Task Execution Engine

The Task Execution Engine is the part of Flyte that manages the lifecycle of individual task executions. It is built around the TaskAction Custom Resource Definition (CRD) and a dedicated controller, the TaskActionReconciler, which orchestrates plugin execution, data movement, and state persistence.

The TaskAction Resource

The TaskAction resource (defined in executor/api/v1/taskaction_types.go) serves as the primary interface between the Flyte control plane and the execution engine. It encapsulates everything needed to run a task and tracks its progress through a series of phases.

Specification

The TaskActionSpec contains the desired state for a task execution:

  • TaskType: Identifies the plugin responsible for the task (e.g., container, spark, ray).
  • TaskTemplate: A proto-serialized core.TaskTemplate stored inline, containing the task's configuration.
  • InputURI: The location of the input data (typically an inputs.pb file in remote storage).
  • RunOutputBase: The base directory where the task should write its outputs and checkpoints.
  • CacheKey: An optional key used for catalog lookups to enable task discovery and memoization.

Status Tracking

The TaskActionStatus tracks the observed state:

  • PluginPhase: The current phase reported by the plugin (e.g., Queued, Initializing, Executing).
  • PluginState: A Gob-encoded blob where plugins persist their internal state between reconciliation loops.
  • Attempts: The current attempt number, used to manage retries.
  • SystemFailures: A counter for consecutive system-level errors, used to prevent infinite loops on infrastructure failures.
  • PhaseHistory: An append-only log of PhaseTransition objects, providing a full timeline of the execution.
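
As an illustration of how a Gob-encoded PluginState blob can survive between reconciliation loops, the sketch below round-trips a small struct through Go's standard encoding/gob package. The examplePluginState type and the encodeState/decodeState helpers are hypothetical names chosen for this example; they are not taken from the Flyte codebase.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// examplePluginState is a hypothetical plugin state; real plugins define their own.
type examplePluginState struct {
	JobID     string
	PollCount int
}

// encodeState serializes the state into a Gob blob that could be stored
// in a status field such as PluginState.
func encodeState(state examplePluginState) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(state); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeState restores the state from a blob. An empty blob yields the
// zero value, which models the first reconciliation of a fresh attempt.
func decodeState(blob []byte) (examplePluginState, error) {
	var state examplePluginState
	if len(blob) == 0 {
		return state, nil
	}
	err := gob.NewDecoder(bytes.NewReader(blob)).Decode(&state)
	return state, err
}

func main() {
	blob, err := encodeState(examplePluginState{JobID: "job-123", PollCount: 2})
	if err != nil {
		panic(err)
	}
	restored, err := decodeState(blob)
	if err != nil {
		panic(err)
	}
	restored.PollCount++ // the next loop picks up where the previous one stopped
	fmt.Printf("%+v\n", restored)
}
```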

Reconciliation Lifecycle

The TaskActionReconciler (implemented in executor/pkg/controller/taskaction_controller.go) implements the Kubernetes reconciliation loop for TaskAction resources. Each loop follows a structured sequence:

  1. Validation and Resolution: The reconciler validates the TaskAction spec and resolves the appropriate plugin from the PluginRegistry based on the TaskType.
  2. Context Creation: It instantiates a TaskExecutionContext, which provides the plugin with a unified interface to interact with the environment.
  3. Cache Evaluation: Before invoking the plugin, the engine checks the Catalog for a cache hit. If a hit is found, it downloads the outputs and transitions the task directly to Succeeded.
  4. Plugin Execution: If no cache hit occurs, the reconciler calls the plugin's Handle method. The plugin returns a Transition indicating the next phase and any updated internal state.
  5. State Persistence: The engine persists the plugin's state, updates the TaskAction status, and emits events to the EventsProxyServiceClient.
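
The five steps above can be sketched as a single function. This is a simplified model, not the actual TaskActionReconciler: every type here (engine, taskAction, transition, twoStepPlugin) is a hypothetical stand-in, the Catalog is reduced to a map of cache keys, context creation is omitted, and event emission is modeled as appending to a slice.

```go
package main

import (
	"errors"
	"fmt"
)

type phase string

const (
	phaseQueued    phase = "Queued"
	phaseExecuting phase = "Executing"
	phaseSucceeded phase = "Succeeded"
)

// transition is what a plugin hands back: the next phase plus opaque state.
type transition struct {
	Phase phase
	State []byte
}

type plugin interface {
	Handle(state []byte) (transition, error)
}

type taskAction struct {
	TaskType    string
	CacheKey    string
	Phase       phase
	PluginState []byte
}

// engine holds hypothetical stand-ins for the plugin registry, the
// Catalog (cache key -> hit), and the event sink.
type engine struct {
	registry map[string]plugin
	catalog  map[string]bool
	events   []phase
}

func (e *engine) reconcile(ta *taskAction) error {
	// Step 1: validate the spec and resolve the plugin by task type.
	if ta.TaskType == "" {
		return errors.New("task type is required")
	}
	p, ok := e.registry[ta.TaskType]
	if !ok {
		return fmt.Errorf("no plugin registered for task type %q", ta.TaskType)
	}
	// Step 2 (context creation) is omitted; the plugin only sees its state.
	var next transition
	if ta.CacheKey != "" && e.catalog[ta.CacheKey] {
		// Step 3: a cache hit short-circuits straight to Succeeded.
		next = transition{Phase: phaseSucceeded}
	} else {
		// Step 4: no hit, so the plugin decides the next phase.
		var err error
		if next, err = p.Handle(ta.PluginState); err != nil {
			return err
		}
	}
	// Step 5: persist state, update status, and emit an event.
	ta.Phase, ta.PluginState = next.Phase, next.State
	e.events = append(e.events, next.Phase)
	return nil
}

// twoStepPlugin reports Executing on its first call and Succeeded afterwards.
type twoStepPlugin struct{}

func (twoStepPlugin) Handle(state []byte) (transition, error) {
	if len(state) == 0 {
		return transition{Phase: phaseExecuting, State: []byte{1}}, nil
	}
	return transition{Phase: phaseSucceeded, State: state}, nil
}

func main() {
	e := &engine{registry: map[string]plugin{"container": twoStepPlugin{}}}
	ta := &taskAction{TaskType: "container", Phase: phaseQueued}
	for ta.Phase != phaseSucceeded {
		if err := e.reconcile(ta); err != nil {
			panic(err)
		}
		fmt.Println("phase:", ta.Phase)
	}
}
```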

Task Execution Context

The TaskExecutionContext (defined in executor/pkg/plugin/task_exec_context.go) abstracts the complexities of the underlying infrastructure from the plugins. It provides several key components:

  • InputReader: Accesses the task's inputs from the location specified in InputURI.
  • OutputWriter: Manages the writing of task outputs. It uses ComputeActionOutputPath to generate sharded, attempt-specific paths to avoid collisions and S3 hot-spots.
  • SecretManager: Provides access to secrets required by the task.
  • ResourceManager: Manages external resources (e.g., Qubole clusters or Spark applications).

Storage Sharding

To avoid storage performance bottlenecks at scale, ComputeActionOutputPath shards output paths. It derives a 2-character base-36 prefix from the TaskAction namespace and name and inserts it directly after the bucket in the output path:

// executor/pkg/plugin/task_exec_context.go
// s3://bucket/org/proj/domain/run/ → s3://bucket/<shard>/org/proj/domain/run/<action>/<attempt>
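
A minimal sketch of such a scheme follows. It assumes an FNV-1a hash of "namespace/name" reduced modulo 36², which is one plausible way to obtain a 2-character base-36 prefix; the hash function actually used by ComputeActionOutputPath is not stated here, and shardPrefix and computeOutputPath are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"net/url"
	"strconv"
	"strings"
)

// shardPrefix maps namespace/name to a stable 2-character base-36 string.
// Assumption: FNV-1a is used purely for illustration.
func shardPrefix(namespace, name string) string {
	h := fnv.New32a()
	h.Write([]byte(namespace + "/" + name))
	n := h.Sum32() % (36 * 36) // 1296 possible shards: "00" through "zz"
	s := strconv.FormatUint(uint64(n), 36)
	if len(s) < 2 {
		s = "0" + s // left-pad so the prefix is always two characters
	}
	return s
}

// computeOutputPath inserts the shard right after the bucket and appends
// the action name and attempt number to the base path.
func computeOutputPath(base, namespace, name string, attempt uint32) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", fmt.Errorf("base %q must look like scheme://bucket/prefix", base)
	}
	parts := []string{shardPrefix(namespace, name)}
	if prefix := strings.Trim(u.Path, "/"); prefix != "" {
		parts = append(parts, prefix)
	}
	parts = append(parts, name, strconv.FormatUint(uint64(attempt), 10))
	return u.Scheme + "://" + u.Host + "/" + strings.Join(parts, "/"), nil
}

func main() {
	out, err := computeOutputPath("s3://bucket/org/proj/domain/run/", "flytesnacks-development", "a0", 1)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because the prefix depends only on the namespace and name, every attempt of the same action lands under the same shard, while different actions spread across up to 1296 key prefixes.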

Caching and Serialized Reservations

Flyte supports both standard and serialized caching. The logic in executor/pkg/controller/taskaction_cache.go manages these interactions:

  • Cache Hits: If evaluateCacheBeforeExecution finds an entry in the Catalog, it populates the OutputWriter and short-circuits the execution.
  • Serialized Reservations: For tasks marked as cache_serializable, the engine uses GetOrExtendReservation. If another execution is already running for the same cache key, the engine transitions the TaskAction to a WaitingForCache phase and requeues, preventing redundant work.
  • Cache Writeback: Upon successful completion, finalizeCacheAfterExecution writes the task's outputs to the Catalog and releases any held reservations.
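
The reservation behaviour described above can be modeled with a small in-memory store. This is only a sketch of the idea: the real GetOrExtendReservation is a Catalog call whose signature is not shown here, and the reservationStore type, the 30-second TTL, the fixed epoch clock, and the phaseFor helper are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// reservationTTL and epoch are hypothetical values; epoch is a fixed
// clock so the demo is deterministic.
const reservationTTL = 30 * time.Second

var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

type reservation struct {
	owner     string
	expiresAt time.Time
}

type reservationStore struct {
	mu   sync.Mutex
	held map[string]reservation
}

func newReservationStore() *reservationStore {
	return &reservationStore{held: make(map[string]reservation)}
}

// getOrExtend grants the reservation if it is free, expired, or already
// held by owner (extending its expiry), and returns the current holder.
func (s *reservationStore) getOrExtend(cacheKey, owner string, now time.Time) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.held[cacheKey]
	if !ok || cur.owner == owner || !now.Before(cur.expiresAt) {
		s.held[cacheKey] = reservation{owner: owner, expiresAt: now.Add(reservationTTL)}
		return owner
	}
	return cur.owner
}

// release drops the reservation, but only if owner still holds it.
func (s *reservationStore) release(cacheKey, owner string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.held[cacheKey]; ok && cur.owner == owner {
		delete(s.held, cacheKey)
	}
}

// phaseFor decides whether an execution may run or must wait and requeue.
func phaseFor(s *reservationStore, cacheKey, owner string, now time.Time) string {
	if s.getOrExtend(cacheKey, owner, now) == owner {
		return "Executing"
	}
	return "WaitingForCache"
}

func main() {
	store := newReservationStore()
	fmt.Println("run-a:", phaseFor(store, "key", "run-a", epoch))
	fmt.Println("run-b:", phaseFor(store, "key", "run-b", epoch))
	store.release("key", "run-a") // run-a finished and wrote its outputs back
	fmt.Println("run-b:", phaseFor(store, "key", "run-b", epoch))
}
```

The expiry check matters in this model: if the holder disappears without releasing, a waiting execution can take over once the reservation lapses instead of waiting indefinitely.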

Failure Handling and Retries

The engine distinguishes between two types of failures:

System Failures

System failures are infrastructure-level issues (e.g., Kubernetes API errors or plugin-reported SYSTEM errors). The reconciler counts them in Status.SystemFailures. If the count exceeds DefaultMaxSystemFailures (default 3), the task is moved to a PermanentFailure state. Counting these failures separately from Status.Attempts keeps transient infrastructure issues from exhausting user-defined retry budgets.
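
The counter logic can be sketched as follows. Two details are assumptions rather than facts from the source: "exceeds" is read as strictly greater than the limit, so the fourth consecutive failure is the one that trips it, and the counter is reset on any successful loop because the status field is described as counting consecutive errors. The status type, the observe helper, and errTransient are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// defaultMaxSystemFailures mirrors the documented default of 3.
const defaultMaxSystemFailures = 3

// errTransient is a hypothetical infrastructure error used by the demo.
var errTransient = errors.New("kubernetes API unavailable")

type status struct {
	SystemFailures uint32
	Phase          string
}

// observe updates the status after one reconciliation loop.
// A nil error resets the counter (assumption: only consecutive
// failures count); a non-nil error increments it and may trip the limit.
func observe(st *status, systemErr error) {
	if systemErr == nil {
		st.SystemFailures = 0
		return
	}
	st.SystemFailures++
	if st.SystemFailures > defaultMaxSystemFailures {
		st.Phase = "PermanentFailure"
	}
}

func main() {
	st := &status{Phase: "Executing"}
	for i := 0; i < 4; i++ {
		observe(st, errTransient)
		fmt.Println(st.SystemFailures, st.Phase)
	}
}
```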

User Failures and In-place Restarts

When a plugin reports a retryable USER failure, the engine performs an "in-place restart":

  1. It increments Status.Attempts.
  2. It clears the Status.PluginState.
  3. It invokes the plugin's Abort method to clean up resources from the failed attempt.
  4. The next reconciliation loop starts the task fresh with the new attempt number.
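
The restart sequence can be sketched in a few lines, following the order of the steps listed above. The status type, the aborter interface, and the recordingPlugin are hypothetical; the real plugin Abort method takes additional arguments that are not shown here, and resetting the phase to Queued is an assumption about what "starting fresh" means.

```go
package main

import "fmt"

// aborter is a hypothetical, reduced view of a plugin's Abort method.
type aborter interface {
	Abort() error
}

type status struct {
	Attempts    uint32
	PluginState []byte
	Phase       string
}

// restartInPlace prepares the same resource for a new attempt instead of
// creating a new one.
func restartInPlace(st *status, p aborter) error {
	st.Attempts++        // step 1: move to the next attempt number
	st.PluginState = nil // step 2: drop state left over from the failed attempt
	if err := p.Abort(); err != nil { // step 3: clean up the failed attempt
		return fmt.Errorf("abort of failed attempt: %w", err)
	}
	st.Phase = "Queued" // step 4 (assumption): next loop starts from scratch
	return nil
}

// recordingPlugin counts how often Abort was called.
type recordingPlugin struct{ aborts int }

func (p *recordingPlugin) Abort() error {
	p.aborts++
	return nil
}

func main() {
	st := &status{Attempts: 0, PluginState: []byte("stale"), Phase: "Executing"}
	p := &recordingPlugin{}
	if err := restartInPlace(st, p); err != nil {
		panic(err)
	}
	fmt.Println(st.Attempts, len(st.PluginState), st.Phase, p.aborts)
}
```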

Garbage Collection

Terminal TaskAction resources are cleaned up by the GarbageCollector (found in executor/pkg/controller/garbage_collector.go). When a task becomes terminal, the reconciler stamps it with GC labels:

  • flyte.org/termination-status: Set to terminated.
  • flyte.org/completed-time: The UTC time when the task finished.

The GarbageCollector periodically scans for resources with these labels and deletes those that have exceeded the configured maxTTL (default 1 hour).
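
A minimal sketch of the label-based selection is shown below. The two label keys and the terminated value come from the description above; everything else is assumed for illustration. In particular, the completed time is encoded as Unix seconds because Kubernetes label values cannot contain colons, and the real encoding used by the reconciler may differ. The resource type and the stampTerminal and expired helpers are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

const (
	labelTerminationStatus = "flyte.org/termination-status"
	labelCompletedTime     = "flyte.org/completed-time"
	maxTTL                 = time.Hour // documented default
)

// demoNow is a fixed clock so the example is deterministic.
var demoNow = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

type resource struct {
	Name   string
	Labels map[string]string
}

// stampTerminal applies the GC labels when a task becomes terminal.
// Assumption: the time is stored as Unix seconds in UTC.
func stampTerminal(r *resource, completed time.Time) {
	if r.Labels == nil {
		r.Labels = map[string]string{}
	}
	r.Labels[labelTerminationStatus] = "terminated"
	r.Labels[labelCompletedTime] = strconv.FormatInt(completed.UTC().Unix(), 10)
}

// expired returns the names of terminated resources older than ttl.
// Resources without valid GC labels are skipped rather than deleted.
func expired(resources []resource, now time.Time, ttl time.Duration) []string {
	var names []string
	for _, r := range resources {
		if r.Labels[labelTerminationStatus] != "terminated" {
			continue
		}
		secs, err := strconv.ParseInt(r.Labels[labelCompletedTime], 10, 64)
		if err != nil {
			continue
		}
		if now.Sub(time.Unix(secs, 0)) > ttl {
			names = append(names, r.Name)
		}
	}
	return names
}

func main() {
	old := resource{Name: "old"}
	stampTerminal(&old, demoNow.Add(-2*maxTTL))
	fresh := resource{Name: "fresh"}
	stampTerminal(&fresh, demoNow.Add(-10*time.Minute))
	running := resource{Name: "running"} // not terminal, so no GC labels
	fmt.Println(expired([]resource{old, fresh, running}, demoNow, maxTTL))
}
```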