Task Execution Engine
The Task Execution Engine in Flyte is the core infrastructure responsible for managing the lifecycle of individual task executions. It is built around the TaskAction Custom Resource Definition (CRD) and a specialized controller, the TaskActionReconciler, which orchestrates plugin execution, data movement, and state persistence.
The TaskAction Resource
The TaskAction resource (defined in executor/api/v1/taskaction_types.go) serves as the primary interface between the Flyte control plane and the execution engine. It encapsulates everything needed to run a task and tracks its progress through a series of phases.
Specification
The TaskActionSpec contains the desired state for a task execution:
- TaskType: Identifies the plugin responsible for the task (e.g., container, spark, ray).
- TaskTemplate: A proto-serialized core.TaskTemplate stored inline, containing the task's configuration.
- InputURI: The location of the input data (typically an inputs.pb file in remote storage).
- RunOutputBase: The base directory where the task should write its outputs and checkpoints.
- CacheKey: An optional key used for catalog lookups to enable task discovery and memoization.
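As a rough illustration of the fields listed above, the following sketch mirrors them in a plain Go struct with a small validation helper. The struct name, the field types, and the choice of which fields are required are assumptions made for this example; only CacheKey is described as optional in the text, and the actual definition lives in executor/api/v1/taskaction_types.go.

```go
package main

import "errors"

// taskActionSpec is a hypothetical mirror of the documented fields.
// Field types are assumptions, not the CRD's actual Go definition.
type taskActionSpec struct {
	TaskType      string // plugin selector, e.g. "container"
	TaskTemplate  []byte // proto-serialized core.TaskTemplate, stored inline
	InputURI      string // typically points at an inputs.pb file
	RunOutputBase string // base directory for outputs and checkpoints
	CacheKey      string // optional; empty means no catalog lookup
}

// validate reports the first missing field. Treating every field except
// CacheKey as required is an assumption of this sketch.
func (s taskActionSpec) validate() error {
	switch {
	case s.TaskType == "":
		return errors.New("TaskType is required")
	case len(s.TaskTemplate) == 0:
		return errors.New("TaskTemplate is required")
	case s.InputURI == "":
		return errors.New("InputURI is required")
	case s.RunOutputBase == "":
		return errors.New("RunOutputBase is required")
	}
	return nil
}
```

Calling validate on a spec with an empty TaskType returns an error, while a spec with all four non-optional fields set returns nil.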
Status Tracking
The TaskActionStatus tracks the observed state:
- PluginPhase: The current phase reported by the plugin (e.g., Queued, Initializing, Executing).
- PluginState: A Gob-encoded blob where plugins persist their internal state between reconciliation loops.
- Attempts: The current attempt number, used to manage retries.
- SystemFailures: A counter for consecutive system-level errors, used to prevent infinite loops on infrastructure failures.
- PhaseHistory: An append-only log of PhaseTransition objects, providing a full timeline of the execution.
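To make the PluginState field more concrete, here is a minimal sketch of a Gob round trip using Go's standard encoding/gob package. The examplePluginState type and the helper names are hypothetical; real plugins define their own state types, and the engine's actual encoding helpers may differ.

```go
package main

import (
	"bytes"
	"encoding/gob"
)

// examplePluginState is a hypothetical plugin state used only for illustration.
type examplePluginState struct {
	JobID string
	Polls int
}

// encodeState serializes a state value into a Gob blob suitable for
// storing in a byte-slice status field.
func encodeState(s examplePluginState) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeState restores state from a blob. An empty blob yields the zero
// value, which models the first reconciliation loop of an attempt.
func decodeState(blob []byte) (examplePluginState, error) {
	var s examplePluginState
	if len(blob) == 0 {
		return s, nil
	}
	err := gob.NewDecoder(bytes.NewReader(blob)).Decode(&s)
	return s, err
}
```

Because the blob is opaque to the engine, a plugin can change what it stores without any change to the TaskAction schema, as long as it can still decode blobs written by earlier loops.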
Reconciliation Lifecycle
The TaskActionReconciler (implemented in executor/pkg/controller/taskaction_controller.go) implements the Kubernetes reconciliation loop for TaskAction resources. Each loop follows a structured sequence:
- Validation and Resolution: The reconciler validates the TaskAction spec and resolves the appropriate plugin from the PluginRegistry based on the TaskType.
- Context Creation: It instantiates a TaskExecutionContext, which provides the plugin with a unified interface to interact with the environment.
- Cache Evaluation: Before invoking the plugin, the engine checks the Catalog for a cache hit. If a hit is found, it downloads the outputs and transitions the task directly to Succeeded.
- Plugin Execution: If no cache hit occurs, the reconciler calls the plugin's Handle method. The plugin returns a Transition indicating the next phase and any updated internal state.
- State Persistence: The engine persists the plugin's state, updates the TaskAction status, and emits events to the EventsProxyServiceClient.
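The sequence above can be sketched as a single function. This is a simplified model under stated assumptions, not the reconciler's actual code: every type here (engine, plugin, transition, taskAction, twoStepPlugin) is a hypothetical stand-in, the catalog is reduced to a map, and context creation and event emission are omitted.

```go
package main

import "errors"

// transition is a hypothetical stand-in for the plugin's returned Transition.
type transition struct {
	Phase string
	State []byte
}

// plugin is a hypothetical, reduced plugin interface.
type plugin interface {
	Handle(state []byte) (transition, error)
}

// taskAction holds only the fields this sketch needs.
type taskAction struct {
	TaskType    string
	CacheKey    string
	PluginPhase string
	PluginState []byte
}

// engine models the plugin registry and the catalog as in-memory maps.
type engine struct {
	registry map[string]plugin // TaskType -> plugin
	cache    map[string]bool   // cache keys that already have outputs
}

// reconcileOnce runs one pass of the documented sequence.
func (e *engine) reconcileOnce(ta *taskAction) error {
	// Validation and resolution.
	if ta.TaskType == "" {
		return errors.New("task type is required")
	}
	p, ok := e.registry[ta.TaskType]
	if !ok {
		return errors.New("no plugin registered for task type " + ta.TaskType)
	}
	// Cache evaluation: a hit short-circuits straight to Succeeded.
	if ta.CacheKey != "" && e.cache[ta.CacheKey] {
		ta.PluginPhase = "Succeeded"
		return nil
	}
	// Plugin execution.
	tr, err := p.Handle(ta.PluginState)
	if err != nil {
		return err
	}
	// State persistence.
	ta.PluginPhase = tr.Phase
	ta.PluginState = tr.State
	return nil
}

// twoStepPlugin is a toy plugin: Queued on the first call, Executing after.
type twoStepPlugin struct{}

func (twoStepPlugin) Handle(state []byte) (transition, error) {
	if len(state) == 0 {
		return transition{Phase: "Queued", State: []byte{1}}, nil
	}
	return transition{Phase: "Executing", State: state}, nil
}
```

Each call to reconcileOnce is stateless apart from what is stored on the taskAction, which is the property that lets a controller resume after a restart.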
Task Execution Context
The TaskExecutionContext (defined in executor/pkg/plugin/task_exec_context.go) abstracts the complexities of the underlying infrastructure from the plugins. It provides several key components:
- InputReader: Accesses the task's inputs from the location specified in InputURI.
- OutputWriter: Manages the writing of task outputs. It uses ComputeActionOutputPath to generate sharded, attempt-specific paths to avoid collisions and S3 hot-spots.
- SecretManager: Provides access to secrets required by the task.
- ResourceManager: Manages external resources (e.g., Qubole clusters or Spark applications).
Storage Sharding
To ensure scalability and avoid storage performance bottlenecks, the engine implements a sharding strategy in ComputeActionOutputPath. It generates a 2-character base-36 prefix derived from the TaskAction namespace and name, inserting it into the output path:
// executor/pkg/plugin/task_exec_context.go
// s3://bucket/org/proj/domain/run/ → s3://bucket/<shard>/org/proj/domain/run/<action>/<attempt>
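One way such a prefix could be computed is sketched below. The hash function (FNV-1a), the function names, and the exact string joined before hashing are assumptions for illustration; the text only states that the prefix is two base-36 characters derived from the namespace and name. The sketch also assumes the base path is a well-formed URI of the form scheme://bucket/prefix.

```go
package main

import (
	"hash/fnv"
	"strconv"
	"strings"
)

// shardPrefix maps a namespace and name to a 2-character base-36 string.
// FNV-1a is an assumed hash, chosen here only because it is in the standard library.
func shardPrefix(namespace, name string) string {
	h := fnv.New32a()
	h.Write([]byte(namespace + "/" + name))
	n := h.Sum32() % (36 * 36) // 1296 possible shards: "00" through "zz"
	s := strconv.FormatUint(uint64(n), 36)
	if len(s) < 2 {
		s = "0" + s // left-pad single-digit values
	}
	return s
}

// outputPath inserts the shard directly after the bucket and appends the
// action name and attempt number, following the path shape shown above.
func outputPath(base, namespace, name string, attempt uint32) string {
	scheme, rest, _ := strings.Cut(base, "://")
	bucket, prefix, _ := strings.Cut(rest, "/")
	parts := []string{bucket, shardPrefix(namespace, name)}
	if prefix = strings.Trim(prefix, "/"); prefix != "" {
		parts = append(parts, prefix)
	}
	parts = append(parts, name, strconv.FormatUint(uint64(attempt), 10))
	return scheme + "://" + strings.Join(parts, "/")
}
```

Placing the shard at the front of the key, rather than at the end, spreads writes across key ranges in object stores that partition by key prefix.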
Caching and Serialized Reservations
Flyte supports both standard and serialized caching. The logic in executor/pkg/controller/taskaction_cache.go manages these interactions:
- Cache Hits: If evaluateCacheBeforeExecution finds an entry in the Catalog, it populates the OutputWriter and short-circuits the execution.
- Serialized Reservations: For tasks marked as cache_serializable, the engine uses GetOrExtendReservation. If another execution is already running for the same cache key, the engine transitions the TaskAction to a WaitingForCache phase and requeues, preventing redundant work.
- Cache Writeback: Upon successful completion, finalizeCacheAfterExecution writes the task's outputs to the Catalog and releases any held reservations.
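The reservation behavior described above can be modeled in memory as follows. This is a sketch under assumptions: the reservations type, its methods, the owner identifier, and the expiry rule are all hypothetical, and times are passed as Unix seconds to keep the example small. It does not reproduce the Catalog's actual GetOrExtendReservation API.

```go
package main

// reservation records which execution currently holds a cache key.
type reservation struct {
	Owner   string
	Expires int64 // Unix seconds
}

// reservations is a hypothetical in-memory stand-in for the catalog's
// reservation store.
type reservations struct {
	held map[string]reservation
}

// getOrExtend grants the key to owner if it is free, expired, or already
// held by owner (extending it). Otherwise it returns the current holder.
func (r *reservations) getOrExtend(cacheKey, owner string, now, ttlSeconds int64) reservation {
	cur, ok := r.held[cacheKey]
	if !ok || cur.Owner == owner || now >= cur.Expires {
		cur = reservation{Owner: owner, Expires: now + ttlSeconds}
		r.held[cacheKey] = cur
	}
	return cur
}

// release drops the reservation, but only if owner actually holds it.
func (r *reservations) release(cacheKey, owner string) {
	if cur, ok := r.held[cacheKey]; ok && cur.Owner == owner {
		delete(r.held, cacheKey)
	}
}

// nextPhase decides whether the caller may run or must wait and requeue.
func nextPhase(res reservation, owner string) string {
	if res.Owner == owner {
		return "Executing"
	}
	return "WaitingForCache"
}
```

The expiry in this model means a crashed holder cannot block other executions forever: once its reservation lapses without being extended, the next caller takes over.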
Failure Handling and Retries
The engine distinguishes between two types of failures:
System Failures
System failures are infrastructure-level issues (e.g., Kubernetes API errors or plugin-reported SYSTEM errors). The reconciler tracks these in Status.SystemFailures. If the count exceeds DefaultMaxSystemFailures (default 3), the task is moved to a PermanentFailure state. This prevents transient infrastructure issues from exhausting user-defined retry budgets.
User Failures and In-place Restarts
When a plugin reports a USER retryable failure, the engine performs an "in-place restart":
- It increments Status.Attempts.
- It clears the Status.PluginState.
- It invokes the plugin's Abort method to clean up resources from the failed attempt.
- The next reconciliation loop starts the task fresh with the new attempt number.
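The restart steps above can be sketched as a small helper. The aborter interface, the status struct, and the recordingPlugin test double are hypothetical, and the real Abort method takes more context than is modeled here.

```go
package main

// aborter is a hypothetical, reduced view of a plugin's Abort method.
type aborter interface {
	Abort() error
}

// status holds only the fields this sketch needs.
type status struct {
	Attempts    uint32
	PluginState []byte
}

// restartInPlace follows the documented order: bump the attempt number,
// clear the plugin state, then ask the plugin to clean up.
func restartInPlace(s *status, p aborter) error {
	s.Attempts++
	s.PluginState = nil
	return p.Abort()
}

// recordingPlugin is a toy plugin that counts Abort calls.
type recordingPlugin struct {
	aborts int
}

func (r *recordingPlugin) Abort() error {
	r.aborts++
	return nil
}
```

Because the state is cleared, the plugin sees an empty blob on the next loop and behaves as it would for a brand-new task, while the higher attempt number steers outputs to a fresh attempt-specific path.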
Garbage Collection
Terminal TaskAction resources are cleaned up by the GarbageCollector (found in executor/pkg/controller/garbage_collector.go). When a task becomes terminal, the reconciler stamps it with GC labels:
- flyte.org/termination-status: Set to terminated.
- flyte.org/completed-time: The UTC time when the task finished.
The GarbageCollector periodically scans for resources with these labels and deletes those that have exceeded the configured maxTTL (default 1 hour).
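The labeling and TTL check can be sketched as follows. The timestamp layout is an assumption: Kubernetes label values cannot contain colons, so this example uses a dotted and dashed layout rather than RFC 3339, and the collector's actual format may differ. The function names are hypothetical, and times are passed as Unix seconds for brevity.

```go
package main

import "time"

const (
	terminationStatusLabel = "flyte.org/termination-status"
	completedTimeLabel     = "flyte.org/completed-time"
	// completedTimeLayout is an assumed, label-safe timestamp layout.
	completedTimeLayout = "2006-01-02.15-04-05"
	// defaultMaxTTL uses the default value stated in the text.
	defaultMaxTTL = time.Hour
)

// stampTerminal adds the two GC labels to a terminal resource's labels.
func stampTerminal(labels map[string]string, completedUnix int64) {
	labels[terminationStatusLabel] = "terminated"
	labels[completedTimeLabel] = time.Unix(completedUnix, 0).UTC().Format(completedTimeLayout)
}

// expired reports whether a labeled resource has outlived maxTTL.
// Resources without valid labels are never considered expired.
func expired(labels map[string]string, nowUnix int64, maxTTL time.Duration) bool {
	if labels[terminationStatusLabel] != "terminated" {
		return false
	}
	done, err := time.Parse(completedTimeLayout, labels[completedTimeLabel])
	if err != nil {
		return false // unparseable timestamp: skip rather than delete
	}
	return time.Unix(nowUnix, 0).Sub(done) > maxTTL
}
```

Storing the completion time as a label, rather than only in the status, lets a collector narrow its scan with a label selector before it inspects individual resources.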