The Reconciler Architecture
The TaskActionReconciler is the central control loop of the Flyte executor. It manages the lifecycle of TaskAction custom resources, coordinating between the Flyte control plane, the plugin registry, and the data catalog.
The Reconciliation Lifecycle
When a TaskAction is created or updated, the TaskActionReconciler.Reconcile method in executor/pkg/controller/taskaction_controller.go executes a sequence of steps to observe the current state and move it toward the desired state.
1. Validation and Plugin Resolution
Before any execution logic begins, the reconciler validates the TaskAction spec using validateTaskAction. This ensures that required fields like Project, Domain, and TaskType are present. It then resolves the appropriate plugin from the PluginRegistry.
If validation fails or no plugin is found for the TaskType, the reconciler marks the TaskAction as failed and does not add a finalizer, so the resource can be deleted immediately without waiting for plugin cleanup.
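The validation and resolution step can be sketched as follows. This is a minimal illustration, not the actual Flyte code: the `TaskActionSpec` struct, the `Plugin` interface, the map-based registry, and the `validateSpec` and `resolve` helpers are hypothetical stand-ins for `validateTaskAction` and the `PluginRegistry`.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskActionSpec is a hypothetical, trimmed-down stand-in for the real spec.
type TaskActionSpec struct {
	Project  string
	Domain   string
	TaskType string
}

// Plugin is a hypothetical minimal plugin interface.
type Plugin interface {
	GetID() string
}

type podPlugin struct{}

func (podPlugin) GetID() string { return "pod" }

// validateSpec mirrors the idea of validateTaskAction: reject specs that
// are missing required fields before any execution logic runs.
func validateSpec(s TaskActionSpec) error {
	switch {
	case s.Project == "":
		return errors.New("missing Project")
	case s.Domain == "":
		return errors.New("missing Domain")
	case s.TaskType == "":
		return errors.New("missing TaskType")
	}
	return nil
}

// resolve validates the spec, then looks up a plugin by task type.
// Keying the registry by task type is an assumption of this sketch.
func resolve(registry map[string]Plugin, s TaskActionSpec) (Plugin, error) {
	if err := validateSpec(s); err != nil {
		return nil, fmt.Errorf("invalid spec: %w", err)
	}
	p, ok := registry[s.TaskType]
	if !ok {
		return nil, fmt.Errorf("no plugin registered for task type %q", s.TaskType)
	}
	return p, nil
}

func main() {
	registry := map[string]Plugin{"container": podPlugin{}}
	spec := TaskActionSpec{Project: "demo", Domain: "dev", TaskType: "container"}
	p, err := resolve(registry, spec)
	if err != nil {
		// In the reconciler, this is the path that marks the TaskAction
		// as failed without adding a finalizer.
		fmt.Println("mark failed, no finalizer added:", err)
		return
	}
	fmt.Println("resolved plugin:", p.GetID())
}
```

Both failure modes return before any plugin is invoked, which is why no finalizer is needed on that path: nothing external has been created yet.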
2. Cache Evaluation (Pre-execution)
If the task is configured as discoverable, the reconciler invokes evaluateCacheBeforeExecution (defined in executor/pkg/controller/taskaction_cache.go).
- Cache Hit: If a valid cache entry exists in the Catalog, the reconciler downloads the outputs, writes them to the OutputWriter, and transitions the task directly to PhaseSuccess.
- Cache Reservation: For tasks marked as serializable, the reconciler attempts to acquire a reservation. If another worker already holds the reservation, the task transitions to PhaseWaitingForCache and requeues, preventing redundant execution of the same work.
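The two outcomes above amount to a small decision procedure, sketched here under stated assumptions. The in-memory `Catalog`, the `evaluateBeforeExecution` function, and the `PhaseProceed` value are hypothetical; only `PhaseSuccess` and `PhaseWaitingForCache` come from the text.

```go
package main

import "fmt"

// Phase is a simplified phase type for this sketch.
type Phase string

const (
	PhaseSuccess         Phase = "PhaseSuccess"
	PhaseWaitingForCache Phase = "PhaseWaitingForCache"
	PhaseProceed         Phase = "Proceed" // hypothetical: continue to plugin execution
)

// Catalog is a hypothetical in-memory stand-in for the data catalog.
type Catalog struct {
	entries      map[string][]byte // cache key -> cached outputs
	reservations map[string]string // cache key -> owner ID
}

// evaluateBeforeExecution sketches the pre-execution cache decision.
func evaluateBeforeExecution(c *Catalog, key, owner string, serializable bool) (Phase, []byte) {
	// Cache hit: return the outputs and short-circuit to success.
	if out, ok := c.entries[key]; ok {
		return PhaseSuccess, out
	}
	if serializable {
		// Another worker holds the reservation: wait and requeue.
		if holder, held := c.reservations[key]; held && holder != owner {
			return PhaseWaitingForCache, nil
		}
		// Acquire (or re-confirm) the reservation for this worker.
		c.reservations[key] = owner
	}
	return PhaseProceed, nil
}

func main() {
	c := &Catalog{entries: map[string][]byte{}, reservations: map[string]string{}}
	first, _ := evaluateBeforeExecution(c, "k1", "worker-a", true)
	second, _ := evaluateBeforeExecution(c, "k1", "worker-b", true)
	fmt.Println(first, second)
}
```

In this sketch the second worker is parked in `PhaseWaitingForCache` until the first either publishes a cache entry or releases its reservation.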
3. Plugin Execution
If no cache hit occurs, the reconciler invokes the plugin's Handle method. This is where the actual work (e.g., launching a K8s Pod, calling an external API) happens. The plugin returns a pluginsCore.Transition indicating the new phase of the task.
// From executor/pkg/controller/taskaction_controller.go
if !cacheShortCircuited {
	transition, err = p.Handle(ctx, tCtx)
	if err != nil {
		return r.recordSystemError(ctx, taskAction, originalTaskActionInstance, p.GetID(), err)
	}
}
4. Cache Finalization (Post-execution)
After the plugin completes successfully, finalizeCacheAfterExecution handles writing the task outputs back to the Catalog and releasing any held reservations. If the task failed or was aborted, it ensures that reservations are released so other workers can attempt the task.
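A minimal sketch of this post-execution step, assuming a hypothetical in-memory `Catalog` and an `Outcome` enum that summarizes how the plugin finished. Neither type is taken from the Flyte source; the function only illustrates the rule "write on success, release in every terminal case".

```go
package main

import "fmt"

// Outcome is a hypothetical summary of how plugin execution ended.
type Outcome int

const (
	Succeeded Outcome = iota
	Failed
	Aborted
)

// Catalog is a hypothetical in-memory stand-in for the data catalog.
type Catalog struct {
	entries      map[string][]byte // cache key -> cached outputs
	reservations map[string]string // cache key -> owner ID
}

// finalizeAfterExecution writes outputs only on success, and releases the
// reservation in every terminal case if this worker holds it.
func finalizeAfterExecution(c *Catalog, key, owner string, o Outcome, outputs []byte) {
	if o == Succeeded {
		c.entries[key] = outputs
	}
	if c.reservations[key] == owner {
		delete(c.reservations, key)
	}
}

func main() {
	c := &Catalog{
		entries:      map[string][]byte{},
		reservations: map[string]string{"k1": "worker-a"},
	}
	// A failed run publishes nothing but still frees the reservation.
	finalizeAfterExecution(c, "k1", "worker-a", Failed, nil)
	_, cached := c.entries["k1"]
	_, reserved := c.reservations["k1"]
	fmt.Println("cached:", cached, "reserved:", reserved)
}
```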
5. Status Updates and Event Emission
The reconciler persists the new state to the Kubernetes API and emits an ActionEvent to the Flyte control plane via the eventsClient. It uses retry.RetryOnConflict to handle optimistic locking issues when multiple reconciliation loops attempt to update the same resource.
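The conflict-retry pattern can be illustrated without a cluster. The sketch below does not use `retry.RetryOnConflict` itself; it reimplements the idea with the standard library only, so `store`, `object`, `errConflict`, and `retryOnConflict` are all hypothetical names. The point it shows is that each attempt must re-read the latest version before writing.

```go
package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("conflict: resource version is stale")

// object is a hypothetical resource carrying a version for optimistic concurrency.
type object struct {
	ResourceVersion int
	Phase           string
}

// store is a hypothetical in-memory stand-in for the Kubernetes API server.
type store struct{ current object }

func (s *store) Get() object { return s.current }

// Update rejects writes that were based on a stale read.
func (s *store) Update(o object) error {
	if o.ResourceVersion != s.current.ResourceVersion {
		return errConflict
	}
	o.ResourceVersion++
	s.current = o
	return nil
}

// retryOnConflict re-runs fn while it reports a conflict, up to maxAttempts times.
func retryOnConflict(maxAttempts int, fn func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = fn(); !errors.Is(err, errConflict) {
			return err
		}
	}
	return err
}

func main() {
	s := &store{current: object{ResourceVersion: 1, Phase: "Queued"}}
	stale := s.Get() // read taken before a concurrent writer updates the object
	_ = s.Update(object{ResourceVersion: 1, Phase: "Initializing"})

	first := true
	err := retryOnConflict(3, func() error {
		obj := s.Get() // re-read the latest version on every attempt
		if first {
			obj, first = stale, false // simulate losing the race once
		}
		obj.Phase = "Running"
		return s.Update(obj)
	})
	fmt.Println(err, s.Get().Phase, s.Get().ResourceVersion)
}
```

The first attempt writes from the stale read and is rejected; the second re-reads and succeeds. Non-conflict errors are returned immediately rather than retried.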
Error Handling and Retries
Flyte distinguishes between two types of failures: system errors and user errors.
System Failures
System failures are transient issues (e.g., network blips, K8s API timeouts) that are not the fault of the user's code. The reconciler tracks these in Status.SystemFailures.
- Threshold: The reconciler allows up to MaxSystemFailures (default 3, configured via DefaultMaxSystemFailures).
- Reset: If a system error occurs, the reconciler calls resetPluginResource, which invokes the plugin's Abort method and clears the PluginState to ensure the next attempt starts fresh.
- Permanent Failure: If the threshold is exceeded, the task is moved to PermanentFailure with the code MaxSystemFailuresExceeded.
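The accounting above can be sketched as a single function. The `Status` struct, the `Plugin` interface, and this `recordSystemError` signature are simplified assumptions rather than the real types. The strict `>` comparison and the choice to reset before checking the threshold are also assumptions of the sketch.

```go
package main

import "fmt"

// DefaultMaxSystemFailures uses the default value stated in the text.
const DefaultMaxSystemFailures = 3

// Plugin is a hypothetical minimal plugin interface.
type Plugin interface {
	Abort() error
}

type noopPlugin struct{}

func (noopPlugin) Abort() error { return nil }

// Status is a hypothetical, trimmed-down TaskAction status.
type Status struct {
	SystemFailures int
	PluginState    []byte
	Phase          string
	ErrorCode      string
}

// recordSystemError counts the failure, resets the plugin resource, and
// moves the task to a permanent failure once the threshold is exceeded.
func recordSystemError(st *Status, p Plugin, maxFailures int) error {
	st.SystemFailures++
	// Reset step: abort the external resource and drop the plugin state
	// so the next attempt starts fresh.
	if err := p.Abort(); err != nil {
		return err
	}
	st.PluginState = nil
	if st.SystemFailures > maxFailures {
		st.Phase = "PermanentFailure"
		st.ErrorCode = "MaxSystemFailuresExceeded"
	}
	return nil
}

func main() {
	st := &Status{Phase: "Running", PluginState: []byte("state")}
	for i := 0; i < DefaultMaxSystemFailures+1; i++ {
		if err := recordSystemError(st, noopPlugin{}, DefaultMaxSystemFailures); err != nil {
			fmt.Println("abort failed:", err)
			return
		}
		fmt.Println(st.SystemFailures, st.Phase)
	}
}
```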
User Retries (In-place Restarts)
When a plugin reports a PhaseRetryableFailure of kind USER, the reconciler performs an "in-place" restart if the maximum number of attempts has not been reached.
- The current resource (e.g., a Pod) is aborted via p.Abort.
- Status.Attempts is incremented.
- Status.PluginState is cleared.
- The task is transitioned back to PhaseQueued to be picked up in the next reconciliation cycle.
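The four steps above can be sketched as follows. The `Status` struct, the `Plugin` interface, and the `restartInPlace` helper are hypothetical. The sketch assumes `Attempts` is the zero-based index of the current attempt and `maxAttempts` is the total number of runs allowed; the real accounting may differ.

```go
package main

import "fmt"

// Plugin is a hypothetical minimal plugin interface.
type Plugin interface {
	Abort() error
}

type noopPlugin struct{}

func (noopPlugin) Abort() error { return nil }

// Status is a hypothetical, trimmed-down TaskAction status.
type Status struct {
	Attempts    int
	PluginState []byte
	Phase       string
}

// restartInPlace performs the in-place restart and reports whether a
// restart happened. It returns false when no attempts remain.
func restartInPlace(st *Status, p Plugin, maxAttempts int) (bool, error) {
	if st.Attempts+1 >= maxAttempts {
		return false, nil // attempts exhausted; caller handles the failure
	}
	if err := p.Abort(); err != nil { // stop the current resource, e.g. a Pod
		return false, err
	}
	st.Attempts++
	st.PluginState = nil
	st.Phase = "PhaseQueued" // picked up again on the next reconcile
	return true, nil
}

func main() {
	st := &Status{Phase: "PhaseRetryableFailure", PluginState: []byte("pod-name")}
	for {
		restarted, err := restartInPlace(st, noopPlugin{}, 3)
		if err != nil || !restarted {
			break
		}
		fmt.Println("restarted, attempt index:", st.Attempts, "phase:", st.Phase)
	}
	fmt.Println("final attempt index:", st.Attempts)
}
```

Because the same TaskAction object is reused, retry history stays on one resource instead of being spread across several.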
Finalization and Resource Cleanup
The reconciler uses a finalizer (flyte.org/plugin-finalizer) to ensure that external resources are cleaned up when a TaskAction is deleted. When the DeletionTimestamp is set, handleAbortAndFinalize is called:
- It invokes p.Abort to stop any in-flight execution.
- It invokes p.Finalize to allow the plugin to perform any necessary cleanup.
- It releases any held cache reservations.
- It emits an Aborted event to the control plane.
- Finally, it removes the finalizer to allow Kubernetes to delete the CRD.
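The ordering of these steps matters: the finalizer is removed last, so a failure in any earlier step leaves it in place and the deletion is retried. The sketch below illustrates that ordering. The `TaskAction` struct, the `Plugin` interface, the `recorder` helper, and this `handleAbortAndFinalize` signature are hypothetical, and returning early on error is an assumption about the real behavior.

```go
package main

import "fmt"

const finalizerName = "flyte.org/plugin-finalizer"

// Plugin is a hypothetical minimal plugin interface.
type Plugin interface {
	Abort() error
	Finalize() error
}

// TaskAction is a hypothetical, trimmed-down resource.
type TaskAction struct {
	Finalizers []string
}

// recorder implements Plugin plus the release and emit hooks, logging each step.
type recorder struct {
	steps     []string
	failAbort bool
}

func (r *recorder) Abort() error {
	if r.failAbort {
		return fmt.Errorf("abort failed")
	}
	r.steps = append(r.steps, "abort")
	return nil
}
func (r *recorder) Finalize() error { r.steps = append(r.steps, "finalize"); return nil }
func (r *recorder) release() error  { r.steps = append(r.steps, "release"); return nil }
func (r *recorder) emit(event string) error {
	r.steps = append(r.steps, "emit:"+event)
	return nil
}

// handleAbortAndFinalize runs the cleanup steps in order and removes the
// finalizer only after every step has succeeded.
func handleAbortAndFinalize(ta *TaskAction, p Plugin, release func() error, emit func(string) error) error {
	if err := p.Abort(); err != nil {
		return err
	}
	if err := p.Finalize(); err != nil {
		return err
	}
	if err := release(); err != nil {
		return err
	}
	if err := emit("Aborted"); err != nil {
		return err
	}
	kept := make([]string, 0, len(ta.Finalizers))
	for _, f := range ta.Finalizers {
		if f != finalizerName {
			kept = append(kept, f)
		}
	}
	ta.Finalizers = kept
	return nil
}

func main() {
	r := &recorder{}
	ta := &TaskAction{Finalizers: []string{finalizerName}}
	err := handleAbortAndFinalize(ta, r, r.release, r.emit)
	fmt.Println(err, r.steps, len(ta.Finalizers))
}
```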
Observability and Metrics
The taskActionMetrics type (in executor/pkg/controller/metrics.go) provides OpenTelemetry instrumentation for the controller.
Efficient Phase Counting
To avoid the overhead of deep-copying every TaskAction CRD during metric collection, the reconciler uses a cachedPhaseCounter. The function it returns accesses the controller-runtime informer cache's indexer directly, so counting $N$ objects costs $N$ pointer reads rather than $N$ full-object deep copies.
// From executor/pkg/controller/metrics.go
func cachedPhaseCounter(c ctrlcache.Cache) func(context.Context) map[string]int64 {
	// ...
	indexer = sii.GetIndexer()
	objs := indexer.List() // Returns cached object pointers without deep-copying
	// ...
	return countByPhase(items)
}
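The body of countByPhase is not shown in the excerpt. One plausible shape, assuming the indexer yields a slice of untyped pointers and that each object exposes its phase as a string, is sketched below. The `TaskAction` struct and the `[]interface{}` parameter type are assumptions of this sketch.

```go
package main

import "fmt"

// TaskAction is a hypothetical, trimmed-down resource.
type TaskAction struct {
	Name  string
	Phase string
}

// countByPhase tallies objects by phase. It only reads through the cached
// pointers and never copies or mutates the objects.
func countByPhase(objs []interface{}) map[string]int64 {
	counts := make(map[string]int64)
	for _, o := range objs {
		ta, ok := o.(*TaskAction)
		if !ok {
			continue // skip anything that is not a TaskAction
		}
		counts[ta.Phase]++
	}
	return counts
}

func main() {
	objs := []interface{}{
		&TaskAction{Name: "a", Phase: "Running"},
		&TaskAction{Name: "b", Phase: "Running"},
		&TaskAction{Name: "c", Phase: "Queued"},
	}
	fmt.Println(countByPhase(objs))
}
```

Objects read this way are shared with the informer cache, so a counter of this kind must treat them as read-only.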
Key Metrics
- taskaction.active: An asynchronous gauge showing the number of TaskAction CRDs labeled by their current plugin_phase.
- taskaction.k8s.duration: A histogram tracking the latency of get, update, and status_update operations against the Kubernetes API.
- taskaction.crd.size_bytes: A histogram tracking the serialized JSON size of the CRDs, helping identify potential performance bottlenecks due to large plugin states.