
The Reconciler Architecture

The TaskActionReconciler is the central control loop of the Flyte executor. It manages the lifecycle of TaskAction custom resources, coordinating between the Flyte control plane, the plugin registry, and the data catalog.

The Reconciliation Lifecycle

When a TaskAction is created or updated, the TaskActionReconciler.Reconcile method in executor/pkg/controller/taskaction_controller.go executes a sequence of steps to observe the current state and move it toward the desired state.

1. Validation and Plugin Resolution

Before any execution logic begins, the reconciler validates the TaskAction spec using validateTaskAction. This ensures that required fields like Project, Domain, and TaskType are present. It then resolves the appropriate plugin from the PluginRegistry.

If validation fails or no plugin is found for the TaskType, the reconciler marks the TaskAction as failed and does not add a finalizer, so the resource can be deleted immediately without waiting for plugin cleanup.
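The validate-then-resolve step can be sketched with plain Go types. This is a minimal illustration, not the executor's code: TaskActionSpec is trimmed to the three fields named above, and validateSpec, registry, and admit are hypothetical names introduced for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskActionSpec is a hypothetical, trimmed-down stand-in for the real spec.
type TaskActionSpec struct {
	Project  string
	Domain   string
	TaskType string
}

// validateSpec rejects specs that are missing a required field.
func validateSpec(s TaskActionSpec) error {
	switch {
	case s.Project == "":
		return errors.New("project is required")
	case s.Domain == "":
		return errors.New("domain is required")
	case s.TaskType == "":
		return errors.New("task type is required")
	}
	return nil
}

// registry is a hypothetical map-backed plugin registry: task type -> plugin ID.
type registry map[string]string

// resolve returns the plugin ID registered for taskType, or an error if none exists.
func (r registry) resolve(taskType string) (string, error) {
	id, ok := r[taskType]
	if !ok {
		return "", fmt.Errorf("no plugin registered for task type %q", taskType)
	}
	return id, nil
}

// admit validates the spec and resolves its plugin. addFinalizer is true only
// when both steps succeed, so a rejected resource never carries a finalizer.
func admit(s TaskActionSpec, r registry) (pluginID string, addFinalizer bool, err error) {
	if err := validateSpec(s); err != nil {
		return "", false, err
	}
	id, err := r.resolve(s.TaskType)
	if err != nil {
		return "", false, err
	}
	return id, true, nil
}

func main() {
	r := registry{"container": "pod-plugin"}
	id, fin, err := admit(TaskActionSpec{Project: "p", Domain: "d", TaskType: "container"}, r)
	fmt.Println(id, fin, err)
	_, fin, err = admit(TaskActionSpec{Project: "p", Domain: "d", TaskType: "spark"}, r)
	fmt.Println(fin, err)
}
```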

2. Cache Evaluation (Pre-execution)

If the task is configured as discoverable, the reconciler invokes evaluateCacheBeforeExecution (defined in executor/pkg/controller/taskaction_cache.go).

  • Cache Hit: If a valid cache entry exists in the Catalog, the reconciler downloads the outputs, writes them to the OutputWriter, and transitions the task directly to PhaseSuccess.
  • Cache Reservation: For tasks marked as serializable, the reconciler attempts to acquire a reservation. If another worker already holds the reservation, the task transitions to PhaseWaitingForCache and requeues, preventing redundant execution of the same work.
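The three possible results of the pre-execution check (reuse outputs, wait for another worker, or run the plugin) can be sketched as a small decision function. The catalog type here is an in-memory stand-in and every name in the block is hypothetical; the real evaluateCacheBeforeExecution talks to the Catalog service and may differ in detail.

```go
package main

import "fmt"

type cacheOutcome string

const (
	outcomeHit     cacheOutcome = "hit"     // outputs reused; task goes straight to success
	outcomeWaiting cacheOutcome = "waiting" // another worker holds the reservation; requeue
	outcomeExecute cacheOutcome = "execute" // no usable entry; run the plugin
)

// catalog is a hypothetical in-memory stand-in for the data catalog.
type catalog struct {
	entries      map[string]bool   // cache key -> a valid entry exists
	reservations map[string]string // cache key -> owner holding the reservation
}

// evaluate decides what to do before execution for the given cache key.
func (c *catalog) evaluate(key, owner string, discoverable, serializable bool) cacheOutcome {
	if !discoverable {
		return outcomeExecute // caching is off for this task
	}
	if c.entries[key] {
		return outcomeHit
	}
	if serializable {
		if holder, held := c.reservations[key]; held && holder != owner {
			return outcomeWaiting
		}
		c.reservations[key] = owner // acquire (or keep) the reservation
	}
	return outcomeExecute
}

func main() {
	c := &catalog{
		entries:      map[string]bool{"k1": true},
		reservations: map[string]string{},
	}
	fmt.Println(c.evaluate("k1", "worker-a", true, false)) // existing entry
	fmt.Println(c.evaluate("k2", "worker-a", true, true))  // first worker reserves
	fmt.Println(c.evaluate("k2", "worker-b", true, true))  // second worker waits
}
```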

3. Plugin Execution

If no cache hit occurs, the reconciler invokes the plugin's Handle method. This is where the actual work (e.g., launching a K8s Pod, calling an external API) happens. The plugin returns a pluginsCore.Transition indicating the new phase of the task.

// From executor/pkg/controller/taskaction_controller.go
if !cacheShortCircuited {
	transition, err = p.Handle(ctx, tCtx)
	if err != nil {
		return r.recordSystemError(ctx, taskAction, originalTaskActionInstance, p.GetID(), err)
	}
}

4. Cache Finalization (Post-execution)

After the plugin completes successfully, finalizeCacheAfterExecution handles writing the task outputs back to the Catalog and releasing any held reservations. If the task failed or was aborted, it ensures that reservations are released so other workers can attempt the task.
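A rough sketch of that post-execution rule: write and release on success, release only on failure or abort, and do nothing while the task is still running. The phase names and the in-memory catalog are assumptions made for the example and are not taken from finalizeCacheAfterExecution itself.

```go
package main

import "fmt"

type phase string

const (
	phaseRunning phase = "Running"
	phaseSuccess phase = "Success"
	phaseFailed  phase = "Failed"
	phaseAborted phase = "Aborted"
)

// catalog is a hypothetical in-memory stand-in for the data catalog.
type catalog struct {
	entries      map[string]string // cache key -> stored outputs
	reservations map[string]string // cache key -> owner holding the reservation
}

// release drops the reservation only if this owner actually holds it.
func (c *catalog) release(key, owner string) {
	if c.reservations[key] == owner {
		delete(c.reservations, key)
	}
}

// finalize applies the post-execution cache rules for a task in phase p.
func (c *catalog) finalize(key, owner string, p phase, outputs string) {
	switch p {
	case phaseSuccess:
		c.entries[key] = outputs // publish outputs for future cache hits
		c.release(key, owner)
	case phaseFailed, phaseAborted:
		c.release(key, owner) // let another worker attempt the task
	}
	// Non-terminal phases keep the reservation untouched.
}

func main() {
	c := &catalog{
		entries:      map[string]string{},
		reservations: map[string]string{"k": "worker-a"},
	}
	c.finalize("k", "worker-a", phaseSuccess, "s3://bucket/outputs")
	fmt.Println(c.entries, c.reservations)
}
```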

5. Status Updates and Event Emission

The reconciler persists the new state to the Kubernetes API and emits an ActionEvent to the Flyte control plane via the eventsClient. It wraps the update in retry.RetryOnConflict so that a write rejected because of a stale resourceVersion (Kubernetes' optimistic concurrency check) is retried against a freshly read copy of the resource.

Error Handling and Retries

Flyte distinguishes between two types of failures: system errors and user errors.

System Failures

System failures are transient issues (e.g., network blips, K8s API timeouts) that are not the fault of the user's code. The reconciler tracks these in Status.SystemFailures.

  • Threshold: The reconciler allows up to MaxSystemFailures system failures per task (default 3, taken from DefaultMaxSystemFailures).
  • Reset: If a system error occurs, the reconciler calls resetPluginResource, which invokes the plugin's Abort method and clears the PluginState to ensure the next attempt starts fresh.
  • Permanent Failure: If the threshold is exceeded, the task is moved to PermanentFailure with the code MaxSystemFailuresExceeded.
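The counting and reset behaviour above can be sketched as follows. The status struct is a hypothetical subset of the real status, the abort callback stands in for the plugin's Abort method, and the strict "greater than" comparison is an assumption based on the wording "up to" and "exceeded".

```go
package main

import "fmt"

// status is a hypothetical subset of the TaskAction status.
type status struct {
	SystemFailures int
	PluginState    []byte
	Phase          string
	ErrorCode      string
}

// recordSystemError counts one system failure, resets the plugin resource so the
// next attempt starts fresh, and fails the task permanently once the count
// exceeds maxFailures (assumed comparison: strictly greater than).
func recordSystemError(st *status, maxFailures int, abort func()) {
	st.SystemFailures++
	abort()              // stop whatever the plugin launched
	st.PluginState = nil // forget plugin-side progress
	if st.SystemFailures > maxFailures {
		st.Phase = "PermanentFailure"
		st.ErrorCode = "MaxSystemFailuresExceeded"
	}
}

func main() {
	st := &status{Phase: "Running", PluginState: []byte("pod-created")}
	aborts := 0
	for i := 0; i < 4; i++ {
		recordSystemError(st, 3, func() { aborts++ })
		fmt.Println(st.SystemFailures, st.Phase, st.ErrorCode)
	}
	fmt.Println("aborts:", aborts)
}
```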

User Retries (In-place Restarts)

When a plugin reports a PhaseRetryableFailure of kind USER, the reconciler performs an "in-place" restart if the maximum number of attempts has not been reached.

  1. The current resource (e.g., a Pod) is aborted via p.Abort.
  2. Status.Attempts is incremented.
  3. Status.PluginState is cleared.
  4. The task is transitioned back to PhaseQueued to be picked up in the next reconciliation cycle.

Finalization and Resource Cleanup

The reconciler uses a finalizer (flyte.org/plugin-finalizer) to ensure that external resources are cleaned up when a TaskAction is deleted. When the DeletionTimestamp is set, handleAbortAndFinalize is called:

  1. It invokes p.Abort to stop any in-flight execution.
  2. It invokes p.Finalize to allow the plugin to perform any necessary cleanup.
  3. It releases any held cache reservations.
  4. It emits an Aborted event to the control plane.
  5. Finally, it removes the finalizer to allow Kubernetes to delete the TaskAction resource.
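The ordering matters: the finalizer is removed last, so cleanup is guaranteed to run before Kubernetes deletes the object. The sketch below models that ordering with hypothetical types (resource, plugin, recorder) and callbacks for the reservation and event steps. Returning early on any error, which keeps the finalizer in place for a later retry, is an assumption about the real handleAbortAndFinalize.

```go
package main

import (
	"errors"
	"fmt"
)

const pluginFinalizer = "flyte.org/plugin-finalizer"

// plugin is a hypothetical two-method view of a Flyte plugin.
type plugin interface {
	Abort() error
	Finalize() error
}

// resource is a hypothetical stand-in for the TaskAction object metadata.
type resource struct {
	Deleting   bool // true once DeletionTimestamp is set
	Finalizers []string
}

// finalize runs cleanup in order and removes the finalizer only if every step
// succeeded (assumed behaviour: an error leaves the finalizer for a retry).
func finalize(res *resource, p plugin, releaseReservation, emitAborted func() error) error {
	if !res.Deleting {
		return nil
	}
	steps := []func() error{p.Abort, p.Finalize, releaseReservation, emitAborted}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	kept := res.Finalizers[:0]
	for _, f := range res.Finalizers {
		if f != pluginFinalizer {
			kept = append(kept, f)
		}
	}
	res.Finalizers = kept
	return nil
}

// recorder is a fake plugin that logs calls and can fail Finalize on demand.
type recorder struct {
	calls        []string
	failFinalize bool
}

func (r *recorder) Abort() error { r.calls = append(r.calls, "abort"); return nil }
func (r *recorder) Finalize() error {
	r.calls = append(r.calls, "finalize")
	if r.failFinalize {
		return errors.New("finalize failed")
	}
	return nil
}

func main() {
	rec := &recorder{}
	res := &resource{Deleting: true, Finalizers: []string{pluginFinalizer}}
	step := func(name string) func() error {
		return func() error { rec.calls = append(rec.calls, name); return nil }
	}
	err := finalize(res, rec, step("release"), step("emit"))
	fmt.Println(err, rec.calls, res.Finalizers)
}
```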

Observability and Metrics

The taskActionMetrics type (in executor/pkg/controller/metrics.go) provides OpenTelemetry instrumentation for the controller.

Efficient Phase Counting

To avoid the overhead of deep-copying every TaskAction resource during metric collection, the reconciler uses cachedPhaseCounter. This function reads the controller-runtime informer cache's indexer directly, so each collection is a single O(N) pass over cached object pointers rather than N full-object copies. Because the pointers refer to shared cache entries, the objects must be treated as read-only.

// From executor/pkg/controller/metrics.go
func cachedPhaseCounter(c ctrlcache.Cache) func(context.Context) map[string]int64 {
	// ...
	indexer = sii.GetIndexer()
	objs := indexer.List() // Returns cached object pointers without deep-copying
	// ...
	return countByPhase(items)
}
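The excerpt elides how the listed objects are turned into counts. One plausible shape, shown here purely as an assumption, is a single read-only pass that type-asserts each cached pointer and tallies its phase. The taskAction type and the countPhases helper are hypothetical and are not the countByPhase function from metrics.go.

```go
package main

import "fmt"

// taskAction is a hypothetical stand-in for the cached TaskAction object.
type taskAction struct {
	Phase string
}

// countPhases tallies objects by phase in one pass. It only reads through the
// pointers it is given and never copies or mutates the underlying objects,
// which is what makes sharing the informer cache's entries safe.
func countPhases(objs []interface{}) map[string]int64 {
	counts := make(map[string]int64)
	for _, obj := range objs {
		ta, ok := obj.(*taskAction)
		if !ok || ta == nil {
			continue // skip anything that is not a TaskAction
		}
		counts[ta.Phase]++
	}
	return counts
}

func main() {
	objs := []interface{}{
		&taskAction{Phase: "Running"},
		&taskAction{Phase: "Running"},
		&taskAction{Phase: "Queued"},
		"not a task action",
	}
	fmt.Println(countPhases(objs))
}
```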

Key Metrics

  • taskaction.active: An asynchronous gauge showing the number of TaskAction resources, labeled by their current plugin_phase.
  • taskaction.k8s.duration: A histogram tracking the latency of get, update, and status_update operations against the Kubernetes API.
  • taskaction.crd.size_bytes: A histogram tracking the serialized JSON size of each TaskAction resource, helping identify potential performance bottlenecks due to large plugin states.