Scheduling & Automation
Flyte's scheduling and automation system runs workflows on a recurring basis using cron expressions. It is built for reliability: each scheduled slot produces at most one run, even if the scheduler restarts or fires the same slot twice, and slots missed while the service was down are "caught up" when it comes back.
Managing Triggers
The primary interface for managing automation is the triggerService (located in runs/service/trigger_service.go). This service handles the lifecycle of triggers, including deployment, updates, and deletion.
When you deploy a trigger, Flyte validates the cron expression and persists the trigger definition to the database.
Deploying a Trigger
To schedule a workflow, you call the DeployTrigger method. The service validates the request and the cron expression before saving it.
```go
// From runs/service/trigger_service.go
func (s *triggerService) DeployTrigger(
	ctx context.Context,
	req *connect.Request[triggerpb.DeployTriggerRequest],
) (*connect.Response[triggerpb.DeployTriggerResponse], error) {
	request := req.Msg
	if err := request.Validate(); err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}

	// Validates the cron expression using robfig/cron/v3
	if err := validateCronExpression(request.GetName().GetName(), request.GetAutomationSpec()); err != nil {
		return nil, err
	}

	id := &commonpb.TriggerIdentifier{
		Name:     request.GetName(),
		Revision: request.GetRevision(),
	}
	triggerModel, err := transformers.NewTriggerModel(ctx, id, request.GetSpec(), request.GetAutomationSpec())
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	saved, err := s.db.TriggerRepo().SaveTrigger(ctx, triggerModel, request.GetRevision())
	// ... returns the saved trigger details
}
```
Cron Expression Validation
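Flyte parses cron expressions with the ParseStandard function of the robfig/cron/v3 library, which accepts the standard five-field format (minute, hour, day of month, month, day of week) as well as descriptors such as @hourly. If the schedule specifies a timezone, the validator prepends CRON_TZ=<timezone> to the expression before parsing, so the schedule is evaluated in that timezone and an unknown timezone name is rejected along with a malformed expression. If the automation spec has no cron schedule, validation is skipped.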
```go
// From runs/service/trigger_service.go
func validateCronExpression(triggerName string, spec *taskpb.TriggerAutomationSpec) error {
	cronSchedule := spec.GetSchedule().GetCron()
	if cronSchedule == nil {
		// Not a cron-based schedule: nothing to validate here.
		return nil
	}

	expr := cronSchedule.GetExpression()
	if tz := cronSchedule.GetTimezone(); tz != "" {
		// Evaluate the schedule in the requested timezone.
		expr = fmt.Sprintf("CRON_TZ=%s %s", tz, expr)
	}

	if _, err := cron.ParseStandard(expr); err != nil {
		return connect.NewError(connect.CodeInvalidArgument,
			fmt.Errorf("trigger %q has invalid cron expression: %w", triggerName, err))
	}
	return nil
}
```
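The following stdlib-only sketch mirrors how validateCronExpression builds the string it hands to the parser. composeCronSpec is a hypothetical helper, not part of Flyte: it checks only the field count and the timezone name (using time.LoadLocation), and it is not a substitute for cron.ParseStandard, which also validates each field's values.

```go
package main

import (
	"fmt"
	"strings"
	"time"
	_ "time/tzdata" // embed the IANA zone database so LoadLocation works without system tzdata
)

// composeCronSpec is a hypothetical helper mirroring validateCronExpression's
// string construction: when tz is set, the expression is prefixed with CRON_TZ=<tz>.
// It only checks the overall shape and the timezone name; it is NOT a cron parser.
func composeCronSpec(expr, tz string) (string, error) {
	fields := strings.Fields(expr)
	// Descriptors such as "@hourly" or "@every 1h" start with "@";
	// anything else must have the five standard fields.
	isDescriptor := len(fields) > 0 && strings.HasPrefix(fields[0], "@")
	if !isDescriptor && len(fields) != 5 {
		return "", fmt.Errorf("expected 5 cron fields, got %d", len(fields))
	}
	if tz == "" {
		return expr, nil
	}
	if _, err := time.LoadLocation(tz); err != nil {
		return "", fmt.Errorf("bad timezone %q: %w", tz, err)
	}
	return fmt.Sprintf("CRON_TZ=%s %s", tz, expr), nil
}
```

For example, composeCronSpec("0 9 * * 1-5", "America/New_York") yields CRON_TZ=America/New_York 0 9 * * 1-5 (09:00 on weekdays, New York time), while an unknown zone such as Mars/Olympus returns an error.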
Scheduling Architecture
The scheduling system consists of three main components working in tandem: the ScheduleSyncer, the GoCronScheduler, and the TriggerExecutor.
Synchronization Loop
The ScheduleSyncer (in runs/scheduler/core/schedule_syncer.go) is a background worker that reloads active triggers from the database every resyncInterval (30 seconds by default). This keeps the in-memory scheduler consistent with the desired state stored in the database, even when multiple service instances are running or triggers are updated through the API. If listing the triggers fails, the cycle logs the error and leaves the current schedules untouched until the next attempt.
```go
// From runs/scheduler/core/schedule_syncer.go
func (s *ScheduleSyncer) sync(ctx context.Context) {
	triggers, err := s.listActiveScheduleTriggers(ctx)
	if err != nil {
		logger.Errorf(ctx, "scheduler: failed to list active triggers: %v", err)
		return
	}

	// Reconciles the in-memory cron jobs
	s.scheduler.UpdateSchedules(ctx, triggers)
}
```
In-Memory Scheduler
The GoCronScheduler (in runs/scheduler/core/scheduler.go) manages the underlying cron.Cron instance. When UpdateSchedules is called, it compares the desired triggers loaded from the database with the jobs currently running: it adds jobs for new triggers, removes jobs for deleted ones, and restarts jobs whose revision has changed.
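The reconciliation step can be pictured as a pure diff over trigger IDs. This sketch is hypothetical: diffSchedules and plan are illustrative names, and it assumes each trigger is keyed by a string ID and carries an integer revision, which may not match Flyte's actual types.

```go
package main

import "sort"

// plan is a hypothetical summary of the changes UpdateSchedules would apply.
type plan struct {
	Add, Remove, Restart []string
}

// diffSchedules compares desired triggers (ID -> revision, from the database)
// with running jobs (ID -> revision, in memory) and classifies each ID.
func diffSchedules(desired, running map[string]int) plan {
	var p plan
	for id, rev := range desired {
		cur, ok := running[id]
		switch {
		case !ok:
			p.Add = append(p.Add, id) // new trigger: schedule a job for it
		case cur != rev:
			p.Restart = append(p.Restart, id) // revision changed: replace the job
		}
	}
	for id := range running {
		if _, ok := desired[id]; !ok {
			p.Remove = append(p.Remove, id) // trigger deleted or deactivated
		}
	}
	// Map iteration order is random; sort so the plan is deterministic.
	sort.Strings(p.Add)
	sort.Strings(p.Remove)
	sort.Strings(p.Restart)
	return p
}
```

For desired {a:1, b:2, c:1} and running {b:1, c:1, d:3}, the plan adds a, restarts b, removes d, and leaves c alone.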
Execution and Idempotency
When a cron job fires, the TriggerExecutor (in runs/scheduler/executor/trigger_executor.go) is responsible for creating the workflow run.
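To prevent duplicate executions for the same scheduled slot (for example, after a scheduler restart or when multiple instances fire the same job), Flyte derives a deterministic run name from the trigger identity and the scheduled timestamp: a 64-bit FNV-1 hash of the project, domain, task name, trigger name, and the slot's date and time down to the second, formatted as r followed by the hash in hexadecimal.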
```go
// From runs/scheduler/executor/trigger_executor.go
func runName(t *models.Trigger, scheduledAt time.Time) string {
	h := fnv.New64()
	// Same trigger + same scheduled second always hashes to the same name.
	_, _ = fmt.Fprintf(h, "%s:%s:%s:%s:%d:%d:%d:%d:%d:%d",
		t.Project, t.Domain, t.TaskName, t.Name,
		scheduledAt.Year(), scheduledAt.Month(), scheduledAt.Day(),
		scheduledAt.Hour(), scheduledAt.Minute(), scheduledAt.Second())
	return fmt.Sprintf("r%x", h.Sum64())
}
```
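If the CreateRun call returns a CodeAlreadyExists error, the executor treats it as success rather than a failure, because the run for that slot already exists. Combined with the deterministic name, this ensures that a given trigger and slot never produce more than one workflow execution.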
Catchup Logic
If the Flyte service is down for a period, it may miss several scheduled execution windows. Upon restart, the GoCronScheduler performs a "catchup" phase. It calculates the missed slots between the last recorded execution and the current time and triggers them sequentially.
The CatchupAll method (in runs/scheduler/core/scheduler.go) iterates through active triggers and fires missed runs, subject to a configurable limit (MaxCatchupRunsPerLoop) to prevent overwhelming the system on startup.
Configuration
The scheduler's behavior is controlled via the TriggerSchedulerConfig in runs/config/config.go. You can tune performance and reliability using the following parameters:
| Parameter | Default | Description |
|---|---|---|
| enabled | true | Whether the background scheduler worker runs. |
| resyncInterval | 30s | How often the ScheduleSyncer polls the database for trigger changes. |
| maxCatchupRunsPerLoop | 100 | Maximum number of missed runs triggered in a single sync cycle. |
| executionQps | 10.0 | Rate limit, in queries per second, for CreateRun API calls. |
| executionBurst | 20 | Burst capacity for the execution rate limiter. |
These settings allow you to balance the responsiveness of the scheduler against the load placed on the Flyte backend.