
Handling Task State and Errors

Flyte manages task execution by mediating between the Flyte plugin machinery and the Kubernetes resource state. This process involves persisting opaque plugin state, tracking a timeline of phase transitions, and classifying errors to determine whether a task should be retried or marked as a permanent failure.

Task State Persistence

Plugins often need to store internal state (such as job IDs or resource versions) between reconciliation loops. Flyte handles this using the PluginStateManager in executor/pkg/plugin/state_manager.go. This manager uses Gob encoding to serialize plugin-specific data into the TaskActionStatus.PluginState byte slice.
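To make the Gob step concrete, here is a minimal round trip of a plugin-defined state struct through a byte slice. The `jobState` type, its fields, and the `encodeState`/`decodeState` helpers are hypothetical. Only the use of Gob and a `[]byte` destination come from the description above.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// jobState is a hypothetical plugin state; real plugins define their own types.
// Gob only encodes exported fields, so the fields are capitalized.
type jobState struct {
	JobID           string
	ResourceVersion string
	Polls           int
}

// encodeState serializes plugin state with Gob into the kind of byte slice
// that would be stored in TaskActionStatus.PluginState.
func encodeState(s jobState) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeState restores plugin state from the persisted bytes.
func decodeState(b []byte) (jobState, error) {
	var s jobState
	err := gob.NewDecoder(bytes.NewReader(b)).Decode(&s)
	return s, err
}

func main() {
	raw, err := encodeState(jobState{JobID: "job-123", ResourceVersion: "42", Polls: 3})
	if err != nil {
		panic(err)
	}
	restored, err := decodeState(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes -> %+v\n", len(raw), restored)
}
```

The bytes are opaque to the controller. Only the plugin that wrote them knows which Go type to decode them into.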

When the TaskActionReconciler starts a reconciliation loop in executor/pkg/controller/taskaction_controller.go, it initializes the state manager with the existing state:

// Build PluginStateManager from persisted state
stateMgr := plugin.NewPluginStateManager(
	taskAction.Status.PluginState,
	taskAction.Status.PluginStateVersion,
)

During the Handle call, the plugin can read from and write to this manager. If the plugin writes new state, the controller detects this and updates the TaskAction resource:

// Persist new PluginState if it was modified during Handle
if newBytes, newVersion, written := stateMgr.GetNewState(); written {
	taskAction.Status.PluginState = newBytes
	taskAction.Status.PluginStateVersion = newVersion
}
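The controller relies only on GetNewState reporting whether anything was written. A minimal, hypothetical state manager with that contract might look like the following. The `Get`/`Put` method names and the `uint8` version type are assumptions loosely modelled on Flyte's plugin state reader/writer interfaces, not the actual contents of state_manager.go.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// stateManager is a hypothetical, simplified stand-in for PluginStateManager.
// It starts from the persisted bytes and version and remembers whether the
// plugin wrote new state during Handle.
type stateManager struct {
	prevBytes   []byte
	prevVersion uint8
	newBytes    []byte
	newVersion  uint8
	written     bool
}

func newStateManager(b []byte, version uint8) *stateManager {
	return &stateManager{prevBytes: b, prevVersion: version}
}

// Get decodes the previously persisted state into out and returns its version.
// An empty slice means there is no prior state (for example, a fresh attempt).
func (m *stateManager) Get(out any) (uint8, error) {
	if len(m.prevBytes) == 0 {
		return 0, nil
	}
	return m.prevVersion, gob.NewDecoder(bytes.NewReader(m.prevBytes)).Decode(out)
}

// Put Gob-encodes new state and marks it as written.
func (m *stateManager) Put(version uint8, v any) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return err
	}
	m.newBytes, m.newVersion, m.written = buf.Bytes(), version, true
	return nil
}

// GetNewState reports the new bytes and version, plus whether Put was called.
func (m *stateManager) GetNewState() ([]byte, uint8, bool) {
	return m.newBytes, m.newVersion, m.written
}

// pollState is a hypothetical plugin state used for the demo.
type pollState struct{ JobID string }

func main() {
	mgr := newStateManager(nil, 0)
	// Simulated Handle: the plugin records the job it submitted.
	if err := mgr.Put(1, pollState{JobID: "job-123"}); err != nil {
		panic(err)
	}
	if b, v, written := mgr.GetNewState(); written {
		fmt.Printf("persist %d bytes at version %d\n", len(b), v)
	}
}
```

Tracking a `written` flag, rather than comparing old and new bytes, lets the controller skip a status update entirely when Handle did not touch the state.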

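The PluginStateVersion field allows plugins to evolve their state schema over time while maintaining compatibility with existing resources. Because the version is persisted alongside the bytes, a plugin can tell which schema a stored blob was written with before decoding it.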

Tracking Phase Transitions

Flyte maintains an append-only log of task progress in the PhaseHistory field of TaskActionStatus. Unlike standard Kubernetes conditions, which are often updated in place, PhaseHistory preserves the full timeline of the task.

Each entry is a PhaseTransition object (defined in executor/api/v1/taskaction_types.go):

type PhaseTransition struct {
	Phase      string      `json:"phase"`      // e.g., "Queued", "Executing", "Succeeded"
	OccurredAt metav1.Time `json:"occurredAt"` // Timestamp of the transition
	Message    string      `json:"message,omitempty"`
}

The controller appends to this history only when the plugin reports a transition to a new phase name. This history is critical for observability and is used to derive the UpdatedTime for events sent to the Flyte control plane.

Error Handling and Classification

When a task fails, Flyte distinguishes between different failure modes to decide the next course of action. This logic is centered around the ErrorState struct and the SystemFailures counter.

Structured Errors

If a plugin returns a core.ExecutionError, the controller captures it in the ErrorState field of the TaskActionStatus. This ensures that specific error codes (like OOMKilled or ContainerCreating) are preserved and can be surfaced to the SDK.

The mapping is performed by errorStateFromExecError in executor/pkg/controller/taskaction_controller.go:

func errorStateFromExecError(err *core.ExecutionError) *flyteorgv1.ErrorState {
	if err == nil {
		return nil
	}
	kind := ""
	switch err.GetKind() {
	case core.ExecutionError_USER:
		kind = "USER"
	case core.ExecutionError_SYSTEM:
		kind = "SYSTEM"
	}
	return &flyteorgv1.ErrorState{
		Code:    err.GetCode(),
		Kind:    kind,
		Message: err.GetMessage(),
	}
}

Retry Logic: User vs. System Failures

Flyte treats user-level errors and system-level errors differently:

  1. User Failures: If a plugin reports a retryable failure of kind USER, Flyte increments the Attempts counter and clears the PluginState. This triggers an "in-place restart," so the next reconciliation loop starts the task from scratch.
  2. System Failures: If Plugin.Handle returns a Go error (such as a transient Kubernetes API error), or the plugin reports a failure of kind SYSTEM, the controller increments the SystemFailures counter.

Terminal Failures

To prevent infinite retry loops during infrastructure instability, Flyte limits the number of consecutive system failures. The limit is DefaultMaxSystemFailures, defined in executor/pkg/controller/taskaction_controller.go with a default value of 3.

If SystemFailures exceeds this threshold, the TaskAction transitions to a permanent failure state. Any transition that does not involve a system error resets SystemFailures to 0, so only consecutive system issues can trigger the terminal state.
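The counter's behavior is easiest to see as a small state update. `trackSystemFailure` is a hypothetical helper. It takes "exceeds" literally (strictly greater than the limit), so with a limit of 3 the fourth consecutive system failure is terminal. The real controller's comparison may differ.

```go
package main

import "fmt"

// DefaultMaxSystemFailures matches the default of 3 described above.
const DefaultMaxSystemFailures = 3

// trackSystemFailure resets the counter on any outcome without a system error,
// increments it otherwise, and reports a terminal failure once the counter
// exceeds the limit.
func trackSystemFailure(count int, systemErr bool, limit int) (newCount int, terminal bool) {
	if !systemErr {
		return 0, false
	}
	count++
	return count, count > limit
}

func main() {
	count := 0
	// true = this reconciliation hit a system error.
	outcomes := []bool{true, true, false, true, true, true, true}
	for i, sysErr := range outcomes {
		var terminal bool
		count, terminal = trackSystemFailure(count, sysErr, DefaultMaxSystemFailures)
		fmt.Printf("step %d: systemFailures=%d terminal=%v\n", i, count, terminal)
		if terminal {
			break
		}
	}
}
```

In this trace, the successful step in the middle resets the counter, so the two earlier failures do not count toward the limit. Only the final run of four consecutive failures ends the task.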

Observability with StateJSON

For external monitoring and debugging, TaskActionStatus includes a StateJSON field. This field contains a JSON-serialized version of the NodeStatus that was last synchronized with the Flyte State Service. While PluginState is Gob-encoded and opaque, StateJSON provides a human-readable snapshot of the task's high-level status as understood by the broader Flyte system.