
Architecture of the Cron Scheduler

The GoCronScheduler is the central component in Flyte responsible for managing time-based triggers. It bridges the persistent state of triggers in the database and the execution engine. It ensures that scheduled runs fire at the correct times, whether they are defined by cron expressions or by fixed-rate intervals.

Lifecycle and Initialization

The scheduler is typically initialized and started during the application bootstrap process in runs/scheduler/start.go. It requires an Executor implementation, which defines how a single scheduled run is actually triggered (e.g., via a gRPC call to the Flyte service).

```go
// runs/scheduler/start.go

exec := executor.NewTriggerExecutor(executor.TriggerExecutorConfig{
	BaseURL:    baseURL,
	QPS:        cfg.ExecutionQPS,
	Burst:      cfg.ExecutionBurst,
	ClientOpts: clientOpts,
})

sched := core.NewGoCronScheduler(exec)
sched.Start() // Non-blocking start of the underlying cron engine
```

The GoCronScheduler uses the robfig/cron/v3 library internally and configures it to operate strictly in UTC. As a result, schedules are evaluated the same way regardless of each host's local time zone. To shut the scheduler down, call Stop(). It returns a context that is done once all in-flight jobs have finished, so wait on that context before exiting to let those jobs complete gracefully.

State Reconciliation

Because Flyte triggers can be added, updated, or deactivated at any time, the GoCronScheduler does not maintain a static list of jobs. Instead, it uses a reconciliation pattern via the UpdateSchedules method. This method is periodically called by the ScheduleSyncer (defaulting to every 10 seconds) to align the in-memory cron state with the database.

The reconciliation logic in runs/scheduler/core/scheduler.go follows these rules:

  1. Removal: Any job currently in memory that is not present in the provided list of active triggers is removed from the underlying cron engine.
  2. Update Detection: The scheduler compares the LatestRevision field of each models.Trigger. If a trigger already exists in memory but its LatestRevision has changed, the old job is removed and a new one is scheduled.
  3. Addition: New triggers are parsed and added as GoCronJob instances.

```go
// runs/scheduler/core/scheduler.go

func (s *GoCronScheduler) UpdateSchedules(ctx context.Context, triggers []*models.Trigger) {
	// ... (logic to identify desired triggers)

	s.mu.Lock()
	defer s.mu.Unlock()

	// Remove jobs no longer desired
	for key, job := range s.jobs {
		if _, ok := desired[key]; !ok {
			s.cron.Remove(job.entryID)
			delete(s.jobs, key)
		}
	}

	// Add or update jobs
	for key, t := range desired {
		if existing, exists := s.jobs[key]; exists {
			if existing.trigger.LatestRevision == t.LatestRevision {
				continue // No change
			}
			s.cron.Remove(existing.entryID) // Revision changed: drop the stale entry before rescheduling
		}
		// ... (parse and schedule new job)
	}
}
```
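
The elided parts above make that block hard to run on its own. The self-contained sketch below applies the same three rules (removal, update detection, addition) against a fake engine that only hands out entry IDs. `reconciler`, `entry`, `newTrigger`, and the `int64` type of `LatestRevision` are assumptions for illustration. Locking is omitted because the sketch is single-threaded.

```go
package main

import (
	"fmt"
	"sort"
)

// Trigger is a trimmed, hypothetical version of models.Trigger.
type Trigger struct {
	Project, Domain, TaskName, Name string
	LatestRevision                  int64
}

func TriggerKey(t *Trigger) string {
	return t.Project + "/" + t.Domain + "/" + t.TaskName + "/" + t.Name
}

func newTrigger(name string, rev int64) *Trigger {
	return &Trigger{Project: "p", Domain: "d", TaskName: "task", Name: name, LatestRevision: rev}
}

// entry records what the fake engine has scheduled for one trigger.
type entry struct {
	id      int
	trigger *Trigger
}

// reconciler stands in for GoCronScheduler; nextID plays the cron entry ID.
type reconciler struct {
	nextID int
	jobs   map[string]entry
}

func (r *reconciler) Update(triggers []*Trigger) {
	desired := make(map[string]*Trigger, len(triggers))
	for _, t := range triggers {
		desired[TriggerKey(t)] = t
	}
	// Removal: the real code also calls s.cron.Remove here.
	for key := range r.jobs {
		if _, ok := desired[key]; !ok {
			delete(r.jobs, key)
		}
	}
	// Update detection and addition.
	for key, t := range desired {
		if existing, ok := r.jobs[key]; ok && existing.trigger.LatestRevision == t.LatestRevision {
			continue // unchanged revision: keep the existing entry
		}
		r.nextID++ // new or revised trigger: schedule a fresh entry
		r.jobs[key] = entry{id: r.nextID, trigger: t}
	}
}

// keys returns the scheduled trigger keys in sorted order.
func (r *reconciler) keys() []string {
	out := make([]string, 0, len(r.jobs))
	for k := range r.jobs {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func main() {
	r := &reconciler{jobs: map[string]entry{}}
	a, b := newTrigger("a", 1), newTrigger("b", 1)
	r.Update([]*Trigger{a, b})
	idA := r.jobs[TriggerKey(a)].id

	// b gets a new revision and c is activated.
	r.Update([]*Trigger{a, newTrigger("b", 2), newTrigger("c", 1)})
	fmt.Println(r.keys())                        // [p/d/task/a p/d/task/b p/d/task/c]
	fmt.Println(r.jobs[TriggerKey(a)].id == idA) // true

	// a and b are deactivated.
	r.Update([]*Trigger{newTrigger("c", 1)})
	fmt.Println(r.keys()) // [p/d/task/c]
}
```

Comparing a single revision number keeps each sync cheap. Unchanged triggers are skipped without re-parsing their schedules, so the 10-second reconciliation loop does little work when nothing has changed.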

Job Execution and Mapping

Each active trigger is mapped to a GoCronJob (defined in runs/scheduler/core/job.go). This type implements the cron.TimedJob interface, which lets the underlying cron library pass the exact scheduled time to the job's Run method.

The TriggerKey function generates a unique identifier for each job based on the Flyte project, domain, task name, and trigger name:

```go
// runs/scheduler/core/job.go

func TriggerKey(t *models.Trigger) string {
	return t.Project + "/" + t.Domain + "/" + t.TaskName + "/" + t.Name
}

func (j *GoCronJob) Run(scheduledAt time.Time) {
	// The scheduledAt time is passed by the cron library
	if err := j.executor.Execute(j.ctx, j.trigger, scheduledAt.UTC()); err != nil {
		// Log error if execution fails
	}
}
```

Catch-up Mechanism

When the scheduler starts up or a trigger is re-activated, there may be "missed" runs that occurred while the scheduler was offline or the trigger was inactive. Flyte handles this via CatchupAll.

Calculating Missed Runs

The scheduler determines the baseline for missed runs using startTimeFallback in runs/scheduler/core/schedule_time.go. It selects the later of:

  • UpdatedAt: The time the trigger was last modified (e.g., activated).
  • TriggeredAt: The time the trigger last successfully fired.

Using UpdatedAt as a baseline ensures that if you deactivate a trigger for a week and then re-activate it, Flyte does not try to "catch up" on the entire week of missed runs.

Execution Limits

To prevent overwhelming the system during a restart, CatchupAll accepts a maxRunsPerLoop parameter (configured via MaxCatchupRunsPerLoop). It fires at most this many missed runs across all triggers in a single catch-up cycle. Because the budget is consumed in slice order, triggers earlier in the list are served first.

```go
// runs/scheduler/core/scheduler.go

func (s *GoCronScheduler) CatchupAll(ctx context.Context, triggers []*models.Trigger, now time.Time, maxRunsPerLoop int) {
	fired := 0
	for _, t := range triggers {
		if fired >= maxRunsPerLoop {
			break // Budget for this cycle is exhausted
		}
		// Errors from GetCatchUpTimes and Execute are discarded in this snippet.
		times, _ := GetCatchUpTimes(t, now)
		for _, scheduledAt := range times {
			if fired >= maxRunsPerLoop {
				break
			}
			s.executor.Execute(ctx, t, scheduledAt.UTC())
			fired++
		}
	}
}
```

Schedule Parsing

Flyte supports two types of schedules, which are parsed in runs/scheduler/core/schedule_time.go:

  1. Cron Expressions: Standard cron strings (e.g., 0 * * * *). These are parsed using cron.ParseStandard.
  2. Fixed Rates: Intervals defined by a value and a unit (Minutes, Hours, Days). These are converted into cron.ConstantDelaySchedule.

The ParseSchedule function handles the unmarshaling of the TriggerAutomationSpec protobuf to extract these details and return a cron.Schedule compatible with the internal engine.