The Abort Reconciliation System
The Abort Reconciliation System in Flyte ensures that requests to terminate runs or individual actions are reliably delivered to the underlying execution engine (the Actions Service). Because pod termination involves network calls and Kubernetes API interactions that can fail or experience latency, Flyte uses an asynchronous reconciliation pattern rather than a simple synchronous request-response.
The Reliable Abort Philosophy
When a user aborts a run, Flyte must ensure that the corresponding Kubernetes resources (pods, CRDs) are cleaned up so that cluster capacity is released. A synchronous call to the Actions Service during the API request is fragile: if the Actions Service is temporarily unavailable or the Kubernetes API is slow, the abort might fail, leaving "zombie" pods running indefinitely.
Flyte solves this by treating an abort as a state change in the database, which a background process (the AbortReconciler) is responsible for driving to completion.
The Lifecycle of an Abort
The process begins when a user invokes AbortRun or AbortAction via the RunService.
1. Initiation and Persistence
When AbortRun is called in runs/service/run_service.go, the system first updates the database using the ActionRepo.
// From runs/service/run_service.go
if err := s.repo.ActionRepo().AbortRun(ctx, req.Msg.RunId, reason, nil); err != nil {
	return nil, connect.NewError(connect.CodeInternal, err)
}
The AbortRun method in the repository marks the action as ABORTED and sets an abort_requested_at timestamp. This timestamp serves as the authoritative signal for the reconciler. After the database is updated, the RunService pushes a task to the reconciler:
if s.abortReconciler != nil {
	s.abortReconciler.Push(ctx, &common.ActionIdentifier{Run: req.Msg.RunId, Name: "a0"}, reason)
}
2. The Deduplication Queue
The AbortReconciler uses an internal dedupeQueue (defined in runs/service/abort_reconciler.go) to manage pending work. This queue ensures that the system does not waste resources processing multiple abort requests for the same action simultaneously.
The dedupeQueue is backed by a Go channel and a map of active keys:
type dedupeQueue struct {
	mu   sync.Mutex
	keys map[string]struct{}
	ch   chan abortTask
}
When a task is pushed, the queue checks whether its key (formatted as project/domain/run/actionName) is already present. If it is, the push is a no-op. This matters during retries: while a task is waiting out its backoff period, any new push for the same action is ignored until the existing task either succeeds or is removed from the queue.
3. Worker Execution
The AbortReconciler runs a pool of concurrent workers. Each worker pulls an abortTask from the queue and executes processTask.
func (r *AbortReconciler) processTask(ctx context.Context, task abortTask) {
	attemptCount, err := r.repo.ActionRepo().MarkAbortAttempt(ctx, task.actionID)
	// ... error handling ...
	_, abortErr := r.actionsClient.Abort(ctx, connect.NewRequest(&actions.AbortRequest{
		ActionId: task.actionID,
		Reason:   &reason,
	}))
	// ... success/failure logic ...
}
The MarkAbortAttempt call increments an abort_attempts counter in the database. This ensures that even if the Flyte process crashes, the retry count is preserved.
Resilience and Recovery
Startup Scan
To handle crashes or restarts, the AbortReconciler performs a startupScan when it begins its Run loop. It queries the database for any actions where abort_requested_at is set but the abort has not yet succeeded.
func (r *AbortReconciler) startupScan(ctx context.Context) error {
	pending, err := r.repo.ActionRepo().ListPendingAborts(ctx)
	if err != nil {
		return err
	}
	for _, a := range pending {
		// ... create task ...
		r.queue.push(ctx, task)
	}
	return nil
}
The system starts the workers before performing the startup scan. Because workers are already draining the channel, a push during the scan blocks only until a slot frees up, so the scan cannot stall indefinitely when the number of pending aborts exceeds QueueSize.
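The effect of this ordering can be shown with a small sketch in which the channel buffer is much smaller than the backlog. The drainAll function and its parameters are hypothetical and only model the consumer-before-producer ordering, not the reconciler itself.

```go
package main

import "sync"

// drainAll starts the workers first and then enqueues every pending key.
// It returns the number of keys the workers processed.
func drainAll(pending []string, queueSize, workers int) int {
	ch := make(chan string, queueSize)

	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed int
	)

	// Start consumers before producing anything.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range ch {
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}

	// With consumers running, each send blocks only until a slot frees up,
	// even when len(pending) is much larger than queueSize.
	for _, key := range pending {
		ch <- key
	}
	close(ch)
	wg.Wait()
	return processed
}
```

If the loop that sends to the channel ran before any worker existed, it would block forever as soon as the buffer filled.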
Exponential Backoff with Jitter
If the Actions Service returns an error, the reconciler schedules a retry using exponential backoff with "full jitter."
ceiling := r.cfg.InitialDelay * (1 << (attemptCount - 1))
if ceiling > r.cfg.MaxDelay {
	ceiling = r.cfg.MaxDelay
}
delay := time.Duration(rand.Int63n(int64(ceiling) + 1))
r.queue.scheduleRequeue(ctx, task, delay)
The delay is drawn uniformly from the range between zero and the ceiling (via rand.Int63n) rather than set to the ceiling itself. This randomization prevents "thundering herd" scenarios in which many failed aborts retry at the same moment after a service outage.
Handling Edge Cases
Already Terminated Resources
A common scenario is that a pod has already been deleted by Kubernetes, or that a previous abort attempt succeeded but the network response was lost. The reconciler handles this via the isAlreadyTerminated helper.
func isAlreadyTerminated(err error) bool {
	// ...
	if connectErr.Code() == connect.CodeNotFound {
		return true
	}
	// The actions service forwards Kubernetes API "not found" errors with CodeInternal.
	if connectErr.Code() == connect.CodeInternal && strings.Contains(connectErr.Message(), "not found") {
		return true
	}
	return false
}
If the error indicates the resource is already gone, the reconciler treats the operation as a success and clears the abort request from the database.
Max Attempts and Manual Intervention
If an abort keeps failing and the attempt count reaches MaxAttempts (default 10), the reconciler gives up. It logs an error stating that "manual intervention may be required" and clears the abort request from the database so that the permanently failing task is not retried again.
Configuration
The system is tuned via AbortReconcilerConfig, which allows operators to balance between aggressive cleanup and system load.
| Parameter | Default | Description |
|---|---|---|
| Workers | 5 | Number of concurrent goroutines processing aborts. |
| MaxAttempts | 10 | Maximum retries before giving up on an action. |
| QueueSize | 1000 | Buffer size for the in-memory task channel. |
| InitialDelay | 1s | The base duration for the first retry backoff. |
| MaxDelay | 5m | The maximum duration any backoff can reach. |