
Task State and Transitions

In Flyte, plugins communicate the status and progress of a task execution back to the engine using a structured state machine. This communication is centered around the Transition object, which encapsulates the current execution phase and metadata required by the Flyte control plane.

The Transition Mechanism

When the Flyte engine invokes a plugin's Handle method, it expects a Transition in return. This object tells the framework whether the task is still in progress, has succeeded, or has encountered an error that requires a retry or permanent failure.

A Transition consists of two primary components:

  1. TransitionType: Defines the consistency model for the state update.
  2. PhaseInfo: Contains the detailed state of the task, including the phase, version, and associated metadata.
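To make the two-part structure concrete, here is a minimal Go model of a Transition. It is a simplified sketch, not Flyte's code: the types are trimmed-down stand-ins for those in pluginmachinery/core, and makeTransition is a hypothetical constructor introduced only for this example.

```go
package main

// TransitionType is a simplified stand-in for the consistency model of a state update.
type TransitionType int

const (
	TransitionTypeEphemeral TransitionType = iota
	TransitionTypeBarrier                  // deprecated in Flyte; listed only for completeness
)

// Phase is a reduced stand-in for the task phase enum.
type Phase int

const (
	PhaseUndefined Phase = iota
	PhaseRunning
	PhaseSuccess
)

// PhaseInfo is a hypothetical, trimmed-down version of the detailed task state.
type PhaseInfo struct {
	Phase   Phase
	Version uint32
	Reason  string
}

// Transition pairs a consistency model with the detailed phase information.
type Transition struct {
	ttype TransitionType
	info  PhaseInfo
}

// Type reports how the engine should persist this transition.
func (t Transition) Type() TransitionType { return t.ttype }

// Info returns the phase details carried by this transition.
func (t Transition) Info() PhaseInfo { return t.info }

// makeTransition (hypothetical) builds a Transition from its two components.
func makeTransition(tt TransitionType, info PhaseInfo) Transition {
	return Transition{ttype: tt, info: info}
}
```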

Consistency Models

The TransitionType (defined in flyteplugins/go/tasks/pluginmachinery/core/transition.go) determines how the Flyte engine persists the state:

  • TransitionTypeEphemeral: This is the standard and most performant option. It is eventually consistent, meaning the state written might not be immediately visible in the next Handle call. It is ideal for idempotent plugin logic where the plugin can safely re-derive or re-verify state if needed.
  • TransitionTypeBarrier: This type is deprecated. It was previously used to attempt monotonic consistency, ensuring the latest state was visible for consecutive reads.

Plugins typically use the core.DoTransition(info PhaseInfo) helper, which defaults to TransitionTypeEphemeral.
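Because ephemeral writes may not be visible on the next Handle call, a plugin should be able to recompute its answer from the system that owns the ground truth. The sketch below assumes a hypothetical jobStatusFunc that queries such a system, and uses a local doTransition that behaves as described for core.DoTransition (wrapping the PhaseInfo in an ephemeral transition). All types are simplified stand-ins.

```go
package main

type TransitionType int

const (
	TransitionTypeEphemeral TransitionType = iota
	TransitionTypeBarrier                  // deprecated
)

type Phase int

const (
	PhaseUndefined Phase = iota
	PhaseRunning
	PhaseSuccess
)

type PhaseInfo struct {
	Phase   Phase
	Version uint32
}

// Transition's zero value is an ephemeral transition with an undefined
// phase, analogous to core.UnknownTransition.
type Transition struct {
	Type TransitionType
	Info PhaseInfo
}

// doTransition mirrors the behaviour described for core.DoTransition:
// it wraps the PhaseInfo in an ephemeral transition.
func doTransition(info PhaseInfo) Transition {
	return Transition{Type: TransitionTypeEphemeral, Info: info}
}

// jobStatusFunc is a hypothetical query against the external system that
// owns the ground truth (for example, a remote job API).
type jobStatusFunc func(jobID string) (done bool, err error)

// handle re-derives the phase from the external system on every call instead
// of trusting the last persisted state, so a stale read caused by eventual
// consistency cannot produce a wrong answer.
func handle(jobID string, status jobStatusFunc) (Transition, error) {
	done, err := status(jobID)
	if err != nil {
		return Transition{}, err
	}
	if done {
		return doTransition(PhaseInfo{Phase: PhaseSuccess}), nil
	}
	return doTransition(PhaseInfo{Phase: PhaseRunning}), nil
}
```

Calling handle repeatedly with the same jobID yields the same result as long as the external system reports the same status; that idempotency is what makes the eventually consistent write safe.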

Task Phases

The Phase enum in flyteplugins/go/tasks/pluginmachinery/core/phase.go represents the discrete states of a task's lifecycle. The engine uses these phases to determine if it should continue calling Handle or move the task to a terminal state.

Phase                      Description
PhaseQueued                The task is submitted but has not started executing.
PhaseInitializing          The system is preparing for execution (e.g., pulling container images).
PhaseRunning               The task is actively executing.
PhaseSuccess               The task completed successfully (terminal).
PhaseRetryableFailure      A recoverable error occurred; the engine may retry the task.
PhasePermanentFailure      A non-recoverable error occurred (terminal).
PhaseWaitingForResources   The plugin is waiting for external resources (e.g., quota) before starting.
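The way the engine reacts to each phase in the table can be summarized as a small decision function. This is a hypothetical sketch that follows the table's terminal markings; Action and nextAction are illustrative names, and the numeric values of the Phase constants are arbitrary and do not match Flyte's enum.

```go
package main

// Phase is a simplified stand-in covering the phases listed in the table.
// The numeric values are arbitrary.
type Phase int

const (
	PhaseQueued Phase = iota
	PhaseInitializing
	PhaseRunning
	PhaseSuccess
	PhaseRetryableFailure
	PhasePermanentFailure
	PhaseWaitingForResources
)

// Action is a hypothetical summary of what an engine loop does next.
type Action int

const (
	ActionPoll  Action = iota // call Handle again later
	ActionRetry               // schedule a new attempt, if retries remain
	ActionStop                // terminal: stop calling Handle
)

// nextAction maps a phase to the engine's next step, following the table:
// Success and PermanentFailure are terminal, RetryableFailure may be retried,
// and every other phase keeps the task in the polling loop.
func nextAction(p Phase) Action {
	switch p {
	case PhaseSuccess, PhasePermanentFailure:
		return ActionStop
	case PhaseRetryableFailure:
		return ActionRetry
	default:
		return ActionPoll
	}
}
```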

Detailed State with PhaseInfo

While Phase provides the high-level status, PhaseInfo provides the context. It includes logs, external resource links, and human-readable reasons for the current state.

Versioning and Immutability

A critical aspect of PhaseInfo is the version field. In the Flyte control plane, task events are immutable for a given (Phase, Version) pair. If a plugin needs to update metadata (such as adding new log links) while remaining in PhaseRunning, it must increment the version.

// Example: updating logs while staying in the Running phase.
// core is pluginmachinery/core; idlCore is the flyteidl core package that defines TaskLog.
newInfo := core.PhaseInfoRunning(currentVersion+1, &core.TaskInfo{
	Logs: []*idlCore.TaskLog{{Uri: "http://logs/path", Name: "Main Log"}},
})
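The (Phase, Version) rule can be illustrated with a toy event recorder that keeps only the first event for each key. The eventSink type is hypothetical and stands in for the control plane's event storage; in this model, re-sending PhaseRunning with new log links but an unchanged version is rejected, while bumping the version is accepted.

```go
package main

type Phase int

const (
	PhaseQueued Phase = iota
	PhaseRunning
)

// eventKey identifies a task event; events are immutable per (Phase, Version).
type eventKey struct {
	Phase   Phase
	Version uint32
}

// eventSink is a hypothetical recorder that accepts the first event for each
// (Phase, Version) pair and rejects later ones carrying the same key.
type eventSink struct {
	logs map[eventKey][]string
}

func newEventSink() *eventSink {
	return &eventSink{logs: make(map[eventKey][]string)}
}

// Record stores the log URIs for an event and reports whether it was accepted.
func (s *eventSink) Record(p Phase, version uint32, logURIs []string) bool {
	k := eventKey{Phase: p, Version: version}
	if _, exists := s.logs[k]; exists {
		return false // same (Phase, Version): the update is dropped
	}
	s.logs[k] = logURIs
	return true
}
```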

Helper Functions

The core package provides numerous helper functions to simplify the creation of PhaseInfo objects:

  • PhaseInfoSuccess(info *TaskInfo)
  • PhaseInfoRunning(version uint32, info *TaskInfo)
  • PhaseInfoFailure(code, reason string, info *TaskInfo)
  • PhaseInfoRetryableFailure(code, reason string, info *TaskInfo)
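The following sketch shows what each helper fills in. These are simplified re-implementations that only mirror the signatures listed above, not Flyte's code: the real PhaseInfo keeps its fields unexported and records more detail (such as error kind, timestamps, and logs), and ExecutionError here is a local stand-in.

```go
package main

type Phase int

const (
	PhaseUndefined Phase = iota
	PhaseRunning
	PhaseSuccess
	PhaseRetryableFailure
	PhasePermanentFailure
)

// TaskInfo is a trimmed stand-in; the real struct carries more fields.
type TaskInfo struct {
	LogURIs []string
}

// ExecutionError is a hypothetical container for a failure code and message.
type ExecutionError struct {
	Code    string
	Message string
}

type PhaseInfo struct {
	Phase   Phase
	Version uint32
	Info    *TaskInfo
	Err     *ExecutionError
}

// The constructors below mirror the listed signatures but are simplified.

func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
	return PhaseInfo{Phase: PhaseSuccess, Info: info}
}

func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo {
	return PhaseInfo{Phase: PhaseRunning, Version: version, Info: info}
}

// PhaseInfoFailure produces a permanent (non-retryable) failure.
func PhaseInfoFailure(code, reason string, info *TaskInfo) PhaseInfo {
	return PhaseInfo{Phase: PhasePermanentFailure, Info: info,
		Err: &ExecutionError{Code: code, Message: reason}}
}

// PhaseInfoRetryableFailure produces a failure the engine may retry.
func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
	return PhaseInfo{Phase: PhaseRetryableFailure, Info: info,
		Err: &ExecutionError{Code: code, Message: reason}}
}
```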

Practical Implementation

Simple State Transition

In the Sleep plugin (flyteplugins/go/tasks/plugins/core/sleep/plugin.go), the transition logic is straightforward. It checks the elapsed time and returns either a Running or Success transition.

func (p *Plugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
	// ... logic to check duration ...
	if time.Since(startTime) >= sleepDuration {
		return core.DoTransition(core.PhaseInfoSuccess(nil)), nil
	}

	return core.DoTransition(core.PhaseInfoRunning(core.DefaultPhaseVersion, nil)), nil
}

Mapping External States

For plugins interacting with external services, such as the WebAPI connector (flyteplugins/go/tasks/plugins/webapi/connector/plugin.go), internal resource states are mapped to Flyte phases:

func (p *Plugin) getPhaseInfo(ctx context.Context, taskCtx webapi.StatusContext, resource ResourceWrapper) (core.PhaseInfo, error) {
	switch resource.Phase {
	case flyteIdl.TaskExecution_QUEUED:
		return core.PhaseInfoQueuedWithTaskInfo(time.Now(), core.DefaultPhaseVersion, resource.Message, taskInfo), nil
	case flyteIdl.TaskExecution_SUCCEEDED:
		return core.PhaseInfoSuccess(taskInfo), nil
	// ...
	}
	return core.PhaseInfoUndefined, nil
}
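The same mapping pattern applies to any external service. The status strings below are hypothetical values for an imaginary remote API, and treating THROTTLED as a retryable failure is an illustrative design choice; unknown statuses fall back to PhaseUndefined, as getPhaseInfo does.

```go
package main

type Phase int

const (
	PhaseUndefined Phase = iota
	PhaseQueued
	PhaseRunning
	PhaseSuccess
	PhaseRetryableFailure
	PhasePermanentFailure
)

// externalToPhase translates hypothetical status strings reported by a
// remote service into Flyte-style phases. Unrecognized values map to
// PhaseUndefined rather than guessing.
func externalToPhase(status string) Phase {
	switch status {
	case "PENDING":
		return PhaseQueued
	case "RUNNING":
		return PhaseRunning
	case "SUCCEEDED":
		return PhaseSuccess
	case "THROTTLED":
		return PhaseRetryableFailure
	case "FAILED":
		return PhasePermanentFailure
	default:
		return PhaseUndefined
	}
}
```

Keeping the translation in one switch makes it easy to audit which external states can end a task and which only keep it polling.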

Error Handling

When a plugin encounters an unexpected error during the Handle method that prevents it from determining the task's state, it should return core.UnknownTransition. This is a predefined transition that uses TransitionTypeEphemeral and PhaseInfoUndefined. Returning this signals to the Flyte engine that the state is currently unknown and the operation should be retried or handled as a system error.

if err != nil {
	return core.UnknownTransition, err
}

Additionally, PhaseInfo supports a cleanupOnFailure flag. It is used when a task is marked as failed (e.g., a Kubernetes pod stuck in ImagePullBackOff) but still requires explicit cleanup of resources that would otherwise continue to consume quota or retry indefinitely.
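How an engine might honor the flag can be sketched as a single decision step. finalize and the exported CleanupOnFailure field are hypothetical, and Flyte's real cleanup path is not modeled here; the sketch only shows the rule: clean up when a failure phase carries the flag, and otherwise leave resources alone.

```go
package main

type Phase int

const (
	PhaseRunning Phase = iota
	PhaseRetryableFailure
	PhasePermanentFailure
)

// PhaseInfo is a stand-in carrying only the fields this sketch needs.
type PhaseInfo struct {
	Phase            Phase
	CleanupOnFailure bool
}

// finalize is a hypothetical engine-side step: when a failure is reported
// with CleanupOnFailure set, it invokes the supplied cleanup callback (for
// example, deleting a pod stuck pulling its image) so the resource stops
// consuming quota. It reports whether cleanup completed successfully.
func finalize(info PhaseInfo, cleanup func() error) (bool, error) {
	isFailure := info.Phase == PhaseRetryableFailure || info.Phase == PhasePermanentFailure
	if !isFailure || !info.CleanupOnFailure {
		return false, nil
	}
	if err := cleanup(); err != nil {
		return false, err
	}
	return true, nil
}
```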