
Scheduling & Automation

Flyte's scheduling and automation system runs workflows on a recurring basis according to cron expressions. It is built for reliability in two ways: scheduled runs are idempotent, so a given schedule slot produces only one run, and the scheduler "catches up" on slots it missed while the service was down.

Managing Triggers

The primary interface for managing automation is the triggerService (located in runs/service/trigger_service.go). This service handles the lifecycle of triggers, including deployment, updates, and deletion.

When you deploy a trigger, Flyte validates the cron expression and persists the trigger definition to the database.

Deploying a Trigger

To schedule a workflow, you call the DeployTrigger method. The service validates the request and the cron expression before saving it.

```go
// From runs/service/trigger_service.go
func (s *triggerService) DeployTrigger(
	ctx context.Context,
	req *connect.Request[triggerpb.DeployTriggerRequest],
) (*connect.Response[triggerpb.DeployTriggerResponse], error) {
	request := req.Msg

	if err := request.Validate(); err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}
	// Validates the cron expression using robfig/cron/v3
	if err := validateCronExpression(request.GetName().GetName(), request.GetAutomationSpec()); err != nil {
		return nil, err
	}

	id := &commonpb.TriggerIdentifier{
		Name:     request.GetName(),
		Revision: request.GetRevision(),
	}
	triggerModel, err := transformers.NewTriggerModel(ctx, id, request.GetSpec(), request.GetAutomationSpec())
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	saved, err := s.db.TriggerRepo().SaveTrigger(ctx, triggerModel, request.GetRevision())
	// ... returns the saved trigger details
}
```

Cron Expression Validation

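Flyte parses cron expressions with cron.ParseStandard from the robfig/cron/v3 library, which accepts standard five-field expressions as well as descriptors such as @daily. Schedules can be timezone-aware: when the cron schedule sets a timezone, the service prepends CRON_TZ=<timezone> to the expression before parsing it.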

```go
// From runs/service/trigger_service.go
func validateCronExpression(triggerName string, spec *taskpb.TriggerAutomationSpec) error {
	cronSchedule := spec.GetSchedule().GetCron()
	if cronSchedule == nil {
		return nil
	}
	expr := cronSchedule.GetExpression()
	if tz := cronSchedule.GetTimezone(); tz != "" {
		expr = fmt.Sprintf("CRON_TZ=%s %s", tz, expr)
	}
	if _, err := cron.ParseStandard(expr); err != nil {
		return connect.NewError(connect.CodeInvalidArgument,
			fmt.Errorf("trigger %q has invalid cron expression: %w", triggerName, err))
	}
	return nil
}
```

Scheduling Architecture

The scheduling system consists of three main components working in tandem: the ScheduleSyncer, the GoCronScheduler, and the TriggerExecutor.

Synchronization Loop

The ScheduleSyncer (in runs/scheduler/core/schedule_syncer.go) is a background worker that periodically reloads active triggers from the database. This ensures that the in-memory scheduler stays consistent with the desired state stored in the DB, even if multiple instances of the service are running or if triggers are updated via the API.

```go
// From runs/scheduler/core/schedule_syncer.go
func (s *ScheduleSyncer) sync(ctx context.Context) {
	triggers, err := s.listActiveScheduleTriggers(ctx)
	if err != nil {
		logger.Errorf(ctx, "scheduler: failed to list active triggers: %v", err)
		return
	}

	// Reconciles the in-memory cron jobs
	s.scheduler.UpdateSchedules(ctx, triggers)
}
```

In-Memory Scheduler

The GoCronScheduler (in runs/scheduler/core/scheduler.go) manages the actual cron.Cron instance. When UpdateSchedules is called, it compares the "desired" triggers from the database with the "running" jobs. It adds new jobs, removes deleted ones, and restarts jobs whose specifications (revisions) have changed.
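The comparison at the heart of UpdateSchedules can be sketched as a diff of two maps. This assumes each trigger is identified by a string key and versioned by a numeric revision; diffSchedules and plan are hypothetical names, and the real scheduler must additionally start and stop the corresponding cron.Cron entries.

```go
package main

import "sort"

// plan lists the trigger keys a reconciler would act on.
type plan struct {
	Add, Remove, Restart []string
}

// diffSchedules compares desired trigger revisions (from the database) with
// the revisions of the cron jobs currently running in memory.
func diffSchedules(desired, running map[string]uint64) plan {
	var p plan
	for key, rev := range desired {
		runningRev, ok := running[key]
		switch {
		case !ok:
			p.Add = append(p.Add, key) // new trigger: schedule it
		case runningRev != rev:
			p.Restart = append(p.Restart, key) // revision changed: replace the job
		}
	}
	for key := range running {
		if _, ok := desired[key]; !ok {
			p.Remove = append(p.Remove, key) // no longer active: unschedule
		}
	}
	// Map iteration order is random; sort for deterministic output.
	sort.Strings(p.Add)
	sort.Strings(p.Remove)
	sort.Strings(p.Restart)
	return p
}
```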

Execution and Idempotency

When a cron job fires, the TriggerExecutor (in runs/scheduler/executor/trigger_executor.go) is responsible for creating the workflow run.

To prevent duplicate executions for the same scheduled slot (e.g., if the scheduler restarts or multiple instances fire), Flyte generates a deterministic run name based on the trigger identity and the scheduled timestamp.

```go
// From runs/scheduler/executor/trigger_executor.go
func runName(t *models.Trigger, scheduledAt time.Time) string {
	h := fnv.New64()
	_, _ = fmt.Fprintf(h, "%s:%s:%s:%s:%d:%d:%d:%d:%d:%d",
		t.Project, t.Domain, t.TaskName, t.Name,
		scheduledAt.Year(), scheduledAt.Month(), scheduledAt.Day(),
		scheduledAt.Hour(), scheduledAt.Minute(), scheduledAt.Second())
	return fmt.Sprintf("r%x", h.Sum64())
}
```

If the CreateRun call returns a CodeAlreadyExists error, the executor ignores it: a run for that slot already exists, so a slot that fires more than once (after a restart, or on several instances) still yields a single workflow execution.

Catchup Logic

If the Flyte service is down for a period, it may miss several scheduled execution windows. Upon restart, the GoCronScheduler performs a "catchup" phase. It calculates the missed slots between the last recorded execution and the current time and triggers them sequentially.

The CatchupAll method (in runs/scheduler/core/scheduler.go) iterates through active triggers and fires missed runs, subject to a configurable limit (MaxCatchupRunsPerLoop) to prevent overwhelming the system on startup.

Configuration

The scheduler's behavior is controlled via the TriggerSchedulerConfig in runs/config/config.go. You can tune performance and reliability using the following parameters:

| Parameter | Default | Description |
|---|---|---|
| enabled | true | Whether the background scheduler worker should run. |
| resyncInterval | 30s | How often the ScheduleSyncer polls the database for trigger changes. |
| maxCatchupRunsPerLoop | 100 | The maximum number of missed runs to trigger in a single sync cycle. |
| executionQps | 10.0 | Rate limit for CreateRun API calls, in calls per second. |
| executionBurst | 20 | Burst capacity (calls allowed back-to-back) of the execution rate limiter. |

These settings allow you to balance the responsiveness of the scheduler against the load placed on the Flyte backend.