Kubernetes Task Lifecycle
Flyte manages tasks that run as native Kubernetes resources (such as Pods, SparkJobs, or RayJobs) through a specialized lifecycle manager. This ensures that Flyte can launch resources, monitor their progress, capture asynchronous events, and clean up resources reliably across reconciliation rounds.
The Plugin Manager Bridge
The PluginManager class in executor/pkg/plugin/k8s/plugin_manager.go acts as a bridge between Flyte's generic task execution engine and the Kubernetes API. It wraps a specific k8s.Plugin implementation (like the Spark or Pod plugin) and handles the boilerplate of Kubernetes resource management.
When a Kubernetes-based plugin is registered in executor/pkg/plugin/registry.go, Flyte initializes a PluginManager for it:
// From executor/pkg/plugin/registry.go
pm := executorK8s.NewPluginManager(
	entry.ID,
	entry.Plugin,
	r.setupCtx.KubeClient(),
)
if err := pm.InitializeObjectEventWatcher(ctx); err != nil {
	return fmt.Errorf("failed to initialize k8s object event watcher for plugin %s: %w", entry.ID, err)
}
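The division of labor in this bridge can be illustrated with a small, self-contained sketch. The plugin only builds a resource and interprets its status. The manager adds shared metadata and talks to the API. Everything here is hypothetical: `Resource`, the trimmed `Plugin` interface, the in-memory `Client`, and the `managed-by` label key stand in for the real `k8s.Plugin`, `client.Object`, and Kubernetes client, whose signatures differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Resource is a hypothetical stand-in for a Kubernetes object.
type Resource struct {
	Kind   string
	Name   string
	Labels map[string]string
}

// Plugin is a hypothetical, trimmed-down version of the k8s.Plugin contract.
type Plugin interface {
	BuildResource(taskName string) (*Resource, error)
	GetTaskPhase(r *Resource) string
}

// Client is a hypothetical in-memory replacement for the Kubernetes API client.
type Client struct {
	objects map[string]*Resource
}

func (c *Client) Create(r *Resource) error {
	if _, exists := c.objects[r.Name]; exists {
		return errors.New("already exists")
	}
	c.objects[r.Name] = r
	return nil
}

// PluginManager owns the boilerplate and delegates plugin-specific decisions.
type PluginManager struct {
	id     string
	plugin Plugin
	client *Client
}

func NewPluginManager(id string, p Plugin, c *Client) *PluginManager {
	return &PluginManager{id: id, plugin: p, client: c}
}

// Launch asks the plugin for a resource, adds shared metadata, and creates it.
func (pm *PluginManager) Launch(taskName string) error {
	r, err := pm.plugin.BuildResource(taskName)
	if err != nil {
		return fmt.Errorf("plugin %s failed to build resource: %w", pm.id, err)
	}
	if r.Labels == nil {
		r.Labels = map[string]string{}
	}
	r.Labels["managed-by"] = "flyte" // hypothetical label key
	return pm.client.Create(r)
}

// Status fetches the stored object and lets the plugin interpret it.
func (pm *PluginManager) Status(name string) (string, error) {
	r, ok := pm.client.objects[name]
	if !ok {
		return "", fmt.Errorf("resource %q not found", name)
	}
	return pm.plugin.GetTaskPhase(r), nil
}

type podPlugin struct{}

func (podPlugin) BuildResource(taskName string) (*Resource, error) {
	return &Resource{Kind: "Pod", Name: taskName}, nil
}

func (podPlugin) GetTaskPhase(r *Resource) string { return "Running" }

func main() {
	c := &Client{objects: map[string]*Resource{}}
	pm := NewPluginManager("pod", podPlugin{}, c)
	fmt.Println(pm.Launch("task-a"))
	fmt.Println(pm.Status("task-a"))
}
```

Because the plugin never touches the client, every plugin gets the same metadata and error handling for free.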
Lifecycle State Machine
The PluginManager uses a simple state machine to track whether a Kubernetes resource has been created. This state is defined by the PluginPhase and persisted in PluginState.
Plugin Phases
The lifecycle consists of two primary phases defined in executor/pkg/plugin/k8s/plugin_manager.go:
- PluginPhaseNotStarted: The initial state. The manager needs to build and create the Kubernetes resource.
- PluginPhaseStarted: The resource has been successfully submitted to the Kubernetes API. The manager now monitors its status.
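The two-phase machine above is small enough to express directly. This sketch assumes the phases are an integer enum whose zero value is `PluginPhaseNotStarted`, so an empty, freshly created state starts in the correct phase. The `next` helper is hypothetical and only models the single forward transition.

```go
package main

import "fmt"

// PluginPhase mirrors the two phases described above (values are an assumption).
type PluginPhase uint8

const (
	PluginPhaseNotStarted PluginPhase = iota // zero value: nothing created yet
	PluginPhaseStarted                       // resource submitted; monitor it
)

func (p PluginPhase) String() string {
	switch p {
	case PluginPhaseNotStarted:
		return "NotStarted"
	case PluginPhaseStarted:
		return "Started"
	default:
		return fmt.Sprintf("PluginPhase(%d)", uint8(p))
	}
}

// next is hypothetical: a successful create moves NotStarted to Started;
// any other combination leaves the phase unchanged.
func next(p PluginPhase, created bool) PluginPhase {
	if p == PluginPhaseNotStarted && created {
		return PluginPhaseStarted
	}
	return p
}

func main() {
	p := next(PluginPhaseNotStarted, true)
	fmt.Println(p)
}
```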
Persistent State
To survive restarts and multiple reconciliation loops, the manager persists a PluginState object:
type PluginState struct {
	Phase               PluginPhase
	K8sPluginState      k8s.PluginState
	LastEventUpdate     time.Time
	LastEventRecordedAt time.Time
}
The Reconciliation Loop
The Handle method is the core of the lifecycle. It is invoked by the Flyte engine during every reconciliation round for a task.
- State Recovery: It reads the existing PluginState from the TaskExecutionContext. If the stored state version does not match the expected version (hardcoded as pluginStateVersion = 1), it triggers a retryable failure.
- Resource Launch: If the phase is PluginPhaseNotStarted, it calls launchResource. This method uses the underlying plugin's BuildResource method to generate the Kubernetes object, adds Flyte-specific metadata (labels, annotations, and owner references), and calls the Kubernetes API to create it.
- Status Monitoring: If the phase is PluginPhaseStarted, it calls checkResourcePhase. This method fetches the latest resource state from Kubernetes and delegates phase evaluation to the underlying plugin's GetTaskPhase method.
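The three steps above amount to a version check followed by a switch on the persisted phase. The sketch below models one reconciliation round. `Transition`, `reconciler`, and the `launch`/`check` hooks are hypothetical stand-ins for `pluginsCore.Transition`, `launchResource`, and `checkResourcePhase`, and the phase strings (including reporting a new resource as `"Queued"`) are placeholders rather than Flyte's actual values.

```go
package main

import (
	"errors"
	"fmt"
)

const pluginStateVersion = 1

type PluginPhase uint8

const (
	PluginPhaseNotStarted PluginPhase = iota
	PluginPhaseStarted
)

type PluginState struct {
	Phase PluginPhase
}

// Transition is a hypothetical stand-in for pluginsCore.Transition.
type Transition struct {
	Phase  string
	Reason string
}

// reconciler holds hypothetical hooks for launching and checking the resource.
type reconciler struct {
	launch func() error
	check  func() (string, error)
}

// handle runs one round and returns the transition plus the state to persist.
func (r reconciler) handle(version uint8, st PluginState) (Transition, PluginState, error) {
	if version != pluginStateVersion {
		return Transition{Phase: "RetryableFailure", Reason: "plugin state version mismatch"}, st, nil
	}
	switch st.Phase {
	case PluginPhaseNotStarted:
		if err := r.launch(); err != nil {
			return Transition{}, st, err // phase unchanged, so the next round retries the launch
		}
		st.Phase = PluginPhaseStarted
		return Transition{Phase: "Queued"}, st, nil
	case PluginPhaseStarted:
		phase, err := r.check()
		if err != nil {
			return Transition{}, st, err
		}
		return Transition{Phase: phase}, st, nil
	default:
		return Transition{}, st, fmt.Errorf("unknown plugin phase %d", st.Phase)
	}
}

func main() {
	r := reconciler{
		launch: func() error { return nil },
		check:  func() (string, error) { return "Running", nil },
	}
	st := PluginState{}
	for round := 1; round <= 2; round++ {
		tr, next, err := r.handle(pluginStateVersion, st)
		fmt.Println(round, tr.Phase, err)
		st = next
	}
	failing := reconciler{launch: func() error { return errors.New("quota exceeded") }}
	_, _, err := failing.handle(pluginStateVersion, PluginState{})
	fmt.Println(err)
}
```

Returning the updated state instead of mutating shared fields keeps each round a pure function of its input, which suits a loop that may be interrupted and resumed.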
Handling External Deletion
If a resource is deleted from Kubernetes by an external actor (e.g., a user manually deleting a Pod) while the task is still running, PluginManager detects this in checkResourcePhase:
if err := pm.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
	if k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err) {
		failureReason := fmt.Sprintf("resource not found, name [%s]. reason: %s", nsName.String(), err.Error())
		return pluginsCore.DoTransition(pluginsCore.PhaseInfoSystemRetryableFailure("ResourceDeletedExternally", failureReason, nil)), nil
	}
	return pluginsCore.UnknownTransition, err
}
Event Monitoring and Enrichment
Flyte enriches task logs and status messages by watching native Kubernetes events (e.g., Scheduled, Pulling image, FailedScheduling).
The PluginManager uses an objectEventWatcher to asynchronously collect these events. During the Handle loop, it calls attachRecentObjectEvents to append new events to the task's AdditionalReasons.
To prevent duplicate logs, it uses LastEventUpdate and LastEventRecordedAt in the PluginState to filter for only new events since the last reconciliation:
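// Fetch only events newer than the cursor persisted in PluginState.
recentEvents := pm.eventWatcher.List(objectKey, lastEventUpdate, lastEventRecordedAt)
for _, event := range recentEvents {
	info.AdditionalReasons = append(info.AdditionalReasons, pluginsCore.ReasonInfo{
		Reason: event.Message,
		// &event.CreatedAt is safe with Go 1.22+ per-iteration loop variables;
		// older toolchains reuse one variable for every iteration.
		OccurredAt: &event.CreatedAt,
	})
	// Advance the cursor so the next round skips events already reported.
	lastEventUpdate = event.CreatedAt
	lastEventRecordedAt = event.RecordedAt
}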
Cleanup: Abort and Finalize
Flyte ensures that Kubernetes resources do not leak when a task is cancelled or finishes.
Aborting Tasks
When a task is killed, Abort is called. By default, it attempts to delete the Kubernetes resource. However, plugins can override this behavior by implementing k8s.PluginAbortOverride, allowing them to patch the resource (e.g., setting a "cancelled" flag) instead of deleting it immediately.
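An opt-in override like this maps naturally onto Go's optional-interface idiom: the manager type-asserts the plugin and falls back to deletion when the assertion fails. In this sketch, `AbortOverride`, its boolean `OnAbort` signature, `store`, and `jobPlugin` are all hypothetical. The real `k8s.PluginAbortOverride` has a different signature, and the "already gone" result is simply this sketch's choice for a missing resource.

```go
package main

import "fmt"

// Resource is a hypothetical stand-in for a Kubernetes object.
type Resource struct {
	Name      string
	Cancelled bool
}

// Plugin is a hypothetical minimal plugin interface.
type Plugin interface {
	Name() string
}

// AbortOverride is a hypothetical analogue of k8s.PluginAbortOverride.
// OnAbort returns true if it handled the abort in place, false to fall back to deletion.
type AbortOverride interface {
	OnAbort(r *Resource) bool
}

// store is a hypothetical in-memory replacement for the Kubernetes API.
type store struct {
	objects map[string]*Resource
}

// abort deletes the resource unless the plugin opts into its own abort behavior.
func abort(p Plugin, s *store, name string) string {
	r, ok := s.objects[name]
	if !ok {
		return "already gone" // nothing left to clean up
	}
	if o, ok := p.(AbortOverride); ok && o.OnAbort(r) {
		return "patched"
	}
	delete(s.objects, name)
	return "deleted"
}

type podPlugin struct{}

func (podPlugin) Name() string { return "pod" }

// jobPlugin is hypothetical: it marks its resource as cancelled instead of deleting it.
type jobPlugin struct{}

func (jobPlugin) Name() string { return "job" }

func (jobPlugin) OnAbort(r *Resource) bool {
	r.Cancelled = true
	return true
}

func main() {
	s := &store{objects: map[string]*Resource{"a": {Name: "a"}, "b": {Name: "b"}}}
	fmt.Println(abort(podPlugin{}, s, "a"), abort(jobPlugin{}, s, "b"), s.objects["b"].Cancelled)
}
```

Plugins that do not implement the override need no extra code, which keeps deletion the default.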
Finalization
The Finalize method is responsible for the final cleanup:
- Finalizers: If InjectFinalizer is enabled in the K8sPluginConfig, Flyte adds the flyte/flytek8s finalizer to the resource. Finalize is responsible for clearing this finalizer so that Kubernetes can garbage-collect the resource.
- Resource Deletion: If DeleteResourceOnFinalize is configured, the manager issues a delete request for the resource once the task reaches a terminal state, unless the plugin opts out through the DisableDeleteResourceOnFinalize property:
if cfg.DeleteResourceOnFinalize && !pm.plugin.GetProperties().DisableDeleteResourceOnFinalize {
	if err := pm.kubeClient.GetClient().Delete(ctx, o); err != nil {
		// ... handle error ...
	}
}