
Configuring Execution Throughput

When many scheduled triggers fire at once, or when the scheduler catches up on runs missed during downtime, it can flood the runs service with CreateRun requests. The TriggerExecutorConfig lets you tune the rate of these calls with a token-bucket rate limiter.

Configuring Rate Limits and Burst Capacity

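To manage the load on the runs service, you configure the QPS (queries per second) and Burst parameters in the TriggerExecutorConfig. QPS is the sustained rate at which the bucket refills with tokens; Burst is the bucket's capacity, that is, how many CreateRun calls can go through back-to-back after an idle period. These settings are typically mapped from the global TriggerSchedulerConfig when the scheduler starts.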

// From runs/scheduler/start.go
exec := executor.NewTriggerExecutor(executor.TriggerExecutorConfig{
	BaseURL:    baseURL,
	QPS:        cfg.ExecutionQPS,   // Default: 10.0
	Burst:      cfg.ExecutionBurst, // Default: 20
	ClientOpts: clientOpts,
})

The TriggerExecutor uses these values to initialize a rate.Limiter (from golang.org/x/time/rate). Every time a scheduled trigger fires, the Execute method calls e.limiter.Wait(ctx) before making the CreateRun RPC:

// From runs/scheduler/executor/trigger_executor.go
func (e *TriggerExecutor) Execute(ctx context.Context, t *models.Trigger, scheduledAt time.Time) error {
	// Blocks until a token is available or context is cancelled
	if err := e.limiter.Wait(ctx); err != nil {
		return err
	}

	// ... proceed to CreateRun
}

Ensuring Idempotency with Deterministic Run Names

When tuning throughput, you may encounter retries or overlapping catchup loops. Flyte prevents duplicate runs for the same scheduled slot by generating a deterministic run name based on the trigger metadata and the scheduled time.

The runName function in runs/scheduler/executor/trigger_executor.go computes a 64-bit FNV-1 hash of the project, domain, task name, trigger name, and the scheduled time at one-second resolution, then prefixes the hex digest with "r":

func runName(t *models.Trigger, scheduledAt time.Time) string {
	h := fnv.New64()
	_, _ = fmt.Fprintf(h, "%s:%s:%s:%s:%d:%d:%d:%d:%d:%d",
		t.Project, t.Domain, t.TaskName, t.Name,
		scheduledAt.Year(), scheduledAt.Month(), scheduledAt.Day(),
		scheduledAt.Hour(), scheduledAt.Minute(), scheduledAt.Second())
	return fmt.Sprintf("r%x", h.Sum64())
}

If the TriggerExecutor tries to create a run that already exists (for example, because an earlier attempt succeeded on the server but the client saw an error such as a timeout), the runs service returns an AlreadyExists error. The executor treats this as a successful outcome so the schedule keeps advancing.

Tuning Client Options

You can pass connect.ClientOption values into the TriggerExecutorConfig to customize the underlying RPC client used for CreateRun calls. This is useful for adding interceptors (including one that enforces a per-call deadline), choosing the wire protocol, or enabling compression.

clientOpts := []connect.ClientOption{
	connect.WithGRPC(), // use the gRPC protocol instead of the default Connect protocol
	// Add interceptors (connect.WithInterceptors) or compression options here
}

config := executor.TriggerExecutorConfig{
	BaseURL:    "http://flyteadmin:8080",
	QPS:        50.0,
	Burst:      100,
	ClientOpts: clientOpts,
}

Troubleshooting Throughput Issues

Context Cancellation

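The Execute method respects the provided context.Context. e.limiter.Wait(ctx) returns an error as soon as the context is cancelled (for example, when the scheduler shuts down), and it fails up front, without blocking, if the context's deadline would pass before a token becomes available. In both cases Execute returns the error without calling CreateRun.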

Catchup Backpressure

During a catchup phase (triggered via sched.CatchupAll), the scheduler may try to fire hundreds of missed runs. If QPS is too low, catchup is throttled and the scheduler takes longer to return to a steady state. Tune MaxCatchupRunsPerLoop together with QPS and Burst so the system recovers from downtime promptly without overloading the runs service.

// From runs/scheduler/start.go
// MaxCatchupRunsPerLoop limits how many runs are queued in one sync cycle
sched.CatchupAll(ctx, triggers, time.Now().UTC(), cfg.MaxCatchupRunsPerLoop)