Trigger State Reconciliation
Flyte implements a pull-based reconciliation model for scheduled triggers, where the scheduler periodically polls the database to synchronize its in-memory state with the desired state of the system. This design ensures that the scheduler remains eventually consistent with the database without requiring complex distributed locking or event-driven synchronization for every trigger update.
The reconciliation logic is primarily handled by two components: the ScheduleSyncer, which manages the periodic polling loop, and the GoCronScheduler, which manages the lifecycle of individual cron jobs.
Reconciliation Loop
The ScheduleSyncer (defined in runs/scheduler/core/schedule_syncer.go) acts as the control loop for the scheduling system. It runs a ticker based on the resyncInterval configuration (defaulting to 30 seconds) and performs a synchronization step on every tick.
During each sync, the ScheduleSyncer calls ListActiveScheduleTriggers, which queries the database for all triggers that are active and have an automation type of TYPE_SCHEDULE. The query sets a limit of 10,000 rows, which is meant to fetch every active trigger in a single call; this reflects a design assumption that holding that many cron jobs in memory is cheap.
func ListActiveScheduleTriggers(ctx context.Context, repo interfaces.TriggerRepo) ([]*models.Trigger, error) {
	activeFilter := impl.NewEqualFilter("active", true)
	scheduleFilter := impl.NewEqualFilter("automation_type", "TYPE_SCHEDULE")
	filter := activeFilter.And(scheduleFilter)
	return repo.ListTriggers(ctx, interfaces.ListResourceInput{
		Limit:  10000, // fetch all; cron jobs are cheap in-memory
		Filter: filter,
	})
}
State Reconciliation Logic
The GoCronScheduler (defined in runs/scheduler/core/scheduler.go) maintains a map of active jobs, keyed by a unique TriggerKey. The UpdateSchedules method compares the list of triggers fetched from the database with the current in-memory state to perform three actions:
- Removal: Any job currently in memory that is not present in the "desired" list from the database is removed from the underlying cron engine.
- Update: If a trigger exists in both the database and memory, the scheduler checks its LatestRevision. If the revision has changed, the existing job is removed and re-added with the new specification.
- Addition: New triggers are parsed and added as new jobs to the cron engine.
The TriggerKey (defined in runs/scheduler/core/job.go) ensures that jobs are uniquely identified across projects and domains:
func TriggerKey(t *models.Trigger) string {
	return t.Project + "/" + t.Domain + "/" + t.TaskName + "/" + t.Name
}
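To make the removal, update, and addition steps concrete, here is a small sketch that computes the three sets of keys from an in-memory revision map and a desired trigger list. The Trigger struct is a simplified stand-in for models.Trigger, the uint64 revision type is an assumption, and reconcile and plan are hypothetical names that do not appear in the Flyte source.

```go
package main

import "sort"

// Trigger is a simplified, hypothetical stand-in for models.Trigger.
// The revision type is an assumption made for this sketch.
type Trigger struct {
	Project, Domain, TaskName, Name string
	LatestRevision                  uint64
}

// triggerKey builds the same project/domain/task/name key as TriggerKey.
func triggerKey(t Trigger) string {
	return t.Project + "/" + t.Domain + "/" + t.TaskName + "/" + t.Name
}

// plan lists the job keys affected by one reconciliation pass.
type plan struct {
	Remove, Update, Add []string
}

// reconcile compares the in-memory state (key -> revision) with the desired
// triggers and reports which jobs to remove, replace, or add.
func reconcile(current map[string]uint64, desired []Trigger) plan {
	want := make(map[string]uint64, len(desired))
	for _, t := range desired {
		want[triggerKey(t)] = t.LatestRevision
	}

	var p plan
	for key := range current {
		if _, ok := want[key]; !ok {
			p.Remove = append(p.Remove, key) // in memory, no longer desired
		}
	}
	for key, rev := range want {
		old, ok := current[key]
		switch {
		case !ok:
			p.Add = append(p.Add, key) // desired, not yet scheduled
		case old != rev:
			p.Update = append(p.Update, key) // revision changed: remove and re-add
		}
	}

	// Sort for deterministic output; Go map iteration order is randomized.
	sort.Strings(p.Remove)
	sort.Strings(p.Update)
	sort.Strings(p.Add)
	return p
}
```

Keying on the full project/domain/task/name path means two triggers with the same name in different projects never collide, and comparing revisions avoids re-registering jobs whose specification has not changed.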
Bootstrap and Catch-up Execution
When the Flyte scheduler worker starts, it performs a two-phase initialization in runs/scheduler/start.go:
- Bootstrap: It loads all active triggers and populates the GoCronScheduler.
- Catch-up: It calls CatchupAll to identify and execute any runs that were missed while the scheduler was offline.
The catch-up logic uses GetCatchUpTimes (in runs/scheduler/core/schedule_time.go) to calculate missed intervals between the trigger's last execution time (or activation time) and the current time. To prevent overwhelming the system after a long period of downtime, the number of catch-up runs is capped by maxCatchupRunsPerLoop (defaulting to 100).
func (s *GoCronScheduler) CatchupAll(
	ctx context.Context,
	triggers []*models.Trigger,
	now time.Time,
	maxRunsPerLoop int,
) {
	fired := 0
	for _, t := range triggers {
		// Stop once the catch-up budget for this loop is spent.
		if fired >= maxRunsPerLoop {
			break
		}
		times, err := GetCatchUpTimes(t, now)
		// ... execution logic ...
	}
}
Schedule Parsing and Timing
Flyte supports both standard cron expressions and fixed-rate schedules. The ParseSchedule function in runs/scheduler/core/schedule_time.go converts either kind of specification into a cron.Schedule object.
A critical aspect of reconciliation is determining the StartTime for a job. For fixed-rate schedules, Flyte advances the start time past the last execution so that the same time slot is not run twice. The startTimeFallback uses the trigger's UpdatedAt timestamp as the baseline, so re-activating a trigger does not cause catch-up runs for the period in which it was inactive.
Execution Decoupling
The timing logic is decoupled from the actual execution of the task via the Executor interface. The GoCronJob implements the cron.TimedJob interface from the robfig/cron library, allowing it to receive the exact scheduled time from the cron engine.
type GoCronJob struct {
	trigger  *models.Trigger
	executor Executor
	ctx      context.Context
	entryID  cron.EntryID
}

func (j *GoCronJob) Run(scheduledAt time.Time) {
	if err := j.executor.Execute(j.ctx, j.trigger, scheduledAt.UTC()); err != nil {
		// log error
	}
}
This separation allows the scheduler to focus on timing and reconciliation while the TriggerExecutor (typically implemented in runs/scheduler/executor/executor.go) handles the complexities of calling the Flyte API to create new runs, including rate limiting and retries.