Automating Workflows with Triggers
Flyte automates task execution through a versioned trigger system that supports schedules and external events. Because the trigger definition is decoupled from the task itself, you can manage automation lifecycles independently, and every schedule change or activation-state change is recorded in a full audit trail.
Managing Triggers via the Service API
When you need to automate a task, you interact with the triggerService (found in runs/service/trigger_service.go). This service handles the deployment, retrieval, and lifecycle management of triggers.
Deploying a Trigger
To create or update a trigger, you call DeployTrigger. This method validates the automation specification (for example, checking that a cron expression is valid) and then persists the state.
    // Excerpt showing how triggerService validates and saves a trigger.
    func (s *triggerService) DeployTrigger(
        ctx context.Context,
        req *connect.Request[triggerpb.DeployTriggerRequest],
    ) (*connect.Response[triggerpb.DeployTriggerResponse], error) {
        request := req.Msg

        // Validate the cron expression, including timezone support.
        if err := validateCronExpression(request.GetName().GetName(), request.GetAutomationSpec()); err != nil {
            return nil, err
        }

        // Transform the proto request into a database model.
        // (The derivation of id is omitted from this excerpt.)
        triggerModel, err := transformers.NewTriggerModel(ctx, id, request.GetSpec(), request.GetAutomationSpec())
        if err != nil {
            return nil, err
        }

        // Persist the trigger with optimistic locking using the provided revision.
        saved, err := s.db.TriggerRepo().SaveTrigger(ctx, triggerModel, request.GetRevision())
        if err != nil {
            return nil, err
        }

        // ... returns the saved trigger details
    }
Flyte supports timezones in cron expressions by prepending a CRON_TZ=<timezone> prefix to the expression before validation and execution. If you provide a timezone in the Schedule proto, validateCronExpression checks that the resulting expression is accepted by the robfig/cron parser:
    if tz := cronSchedule.GetTimezone(); tz != "" {
        expr = fmt.Sprintf("CRON_TZ=%s %s", tz, expr)
    }
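The prefixing step can be tried in isolation with a small standard-library sketch. The helper name buildCronExpression is hypothetical, and it only checks that the timezone is a known IANA name via time.LoadLocation. It does not parse the cron fields themselves, which the text above attributes to the robfig/cron parser.

```go
package main

import (
	"fmt"
	"time"
	_ "time/tzdata" // embed the IANA database so LoadLocation works without system zoneinfo
)

// buildCronExpression is a hypothetical helper, not part of Flyte.
// It verifies that tz names a known IANA timezone and returns expr with a
// CRON_TZ prefix. An empty tz leaves the expression unchanged.
func buildCronExpression(expr, tz string) (string, error) {
	if tz == "" {
		return expr, nil
	}
	if _, err := time.LoadLocation(tz); err != nil {
		return "", fmt.Errorf("invalid timezone %q: %w", tz, err)
	}
	return fmt.Sprintf("CRON_TZ=%s %s", tz, expr), nil
}

func main() {
	// A weekday 09:00 schedule pinned to New York time.
	expr, err := buildCronExpression("0 9 * * 1-5", "America/New_York")
	fmt.Println(expr, err)

	// An unknown timezone is rejected before the expression is used.
	_, err = buildCronExpression("0 9 * * 1-5", "Mars/Olympus")
	fmt.Println(err)
}
```

Rejecting an unknown timezone at deploy time means a typo surfaces as a validation error rather than as a schedule that silently never fires.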
The Versioning Model
Flyte implements an "Action and ActionEvent" pattern for triggers to ensure that every change is recorded. This is managed by the TriggerRepo (defined in runs/repository/interfaces/trigger.go) across two primary models:
- Trigger: Represents the current, mutable state of a trigger. It contains the LatestRevision counter and denormalized fields like Active and AutomationType for efficient querying.
- TriggerRevision: An immutable snapshot created every time a trigger is deployed, activated, deactivated, or deleted.
Optimistic Locking
To prevent concurrent updates from overwriting each other, the SaveTrigger method requires an expectedRevision. If the LatestRevision in the database does not match the expectedRevision provided by the caller, the update fails. This ensures that if two users attempt to update the same schedule simultaneously, only one succeeds.
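The revision check can be illustrated with a minimal in-memory sketch. The memoryRepo type, the ErrRevisionConflict error, and the convention that a brand-new trigger is saved with expectedRevision 0 are assumptions made for this example. The real TriggerRepo is database-backed and its signature differs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrRevisionConflict is a hypothetical error for a stale expectedRevision.
var ErrRevisionConflict = errors.New("trigger revision conflict")

// Trigger mirrors only the fields needed for this sketch.
type Trigger struct {
	Name           string
	Cron           string
	LatestRevision uint64
}

// memoryRepo is an in-memory stand-in for a trigger repository.
type memoryRepo struct {
	mu       sync.Mutex
	triggers map[string]Trigger
}

func newMemoryRepo() *memoryRepo {
	return &memoryRepo{triggers: make(map[string]Trigger)}
}

// SaveTrigger stores t only if expectedRevision equals the stored
// LatestRevision (0 when the trigger does not exist yet), then increments it.
func (r *memoryRepo) SaveTrigger(t Trigger, expectedRevision uint64) (Trigger, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	current := r.triggers[t.Name] // zero value has LatestRevision 0
	if current.LatestRevision != expectedRevision {
		return Trigger{}, fmt.Errorf("%w: expected %d, found %d",
			ErrRevisionConflict, expectedRevision, current.LatestRevision)
	}
	t.LatestRevision = current.LatestRevision + 1
	r.triggers[t.Name] = t
	return t, nil
}

func main() {
	repo := newMemoryRepo()
	first, _ := repo.SaveTrigger(Trigger{Name: "nightly", Cron: "0 0 * * *"}, 0)

	// Two callers both read the same revision and then try to update.
	_, errA := repo.SaveTrigger(Trigger{Name: "nightly", Cron: "0 1 * * *"}, first.LatestRevision)
	_, errB := repo.SaveTrigger(Trigger{Name: "nightly", Cron: "0 2 * * *"}, first.LatestRevision)
	fmt.Println(errA, errors.Is(errB, ErrRevisionConflict))
}
```

The caller that loses the race is expected to re-read the trigger, reapply its change against the new revision, and retry.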
Soft Deletion
Triggers in Flyte are never hard-deleted from the database. The DeleteTriggers method sets a DeletedAt timestamp and appends a new TriggerRevision with the action TRIGGER_REVISION_ACTION_DELETE. Standard Get and List operations filter out these soft-deleted rows.
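A small in-memory sketch shows how soft deletion and the revision log fit together. The store type and its methods are hypothetical, and incrementing LatestRevision on delete is an assumption. The text above only states that a DeletedAt timestamp is set and a delete revision is appended.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Trigger holds only the fields relevant to soft deletion.
type Trigger struct {
	Name           string
	LatestRevision uint64
	DeletedAt      *time.Time // nil while the trigger is live
}

// TriggerRevision is an immutable audit record.
type TriggerRevision struct {
	TriggerName string
	Revision    uint64
	Action      string
}

// store is a hypothetical in-memory stand-in for the trigger tables.
type store struct {
	triggers  map[string]*Trigger
	revisions []TriggerRevision
}

// DeleteTrigger marks the row as deleted and appends a delete revision.
// It reports false if the trigger is unknown or already deleted.
func (s *store) DeleteTrigger(name string) bool {
	t, ok := s.triggers[name]
	if !ok || t.DeletedAt != nil {
		return false
	}
	now := time.Now().UTC()
	t.DeletedAt = &now
	t.LatestRevision++ // assumption: a delete also advances the counter
	s.revisions = append(s.revisions, TriggerRevision{
		TriggerName: name,
		Revision:    t.LatestRevision,
		Action:      "TRIGGER_REVISION_ACTION_DELETE",
	})
	return true
}

// List returns the names of triggers that have not been soft-deleted.
func (s *store) List() []string {
	var names []string
	for name, t := range s.triggers {
		if t.DeletedAt == nil {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	s := &store{triggers: map[string]*Trigger{
		"nightly": {Name: "nightly", LatestRevision: 1},
		"hourly":  {Name: "hourly", LatestRevision: 3},
	}}
	s.DeleteTrigger("nightly")
	// The row is still stored, but List no longer returns it.
	fmt.Println(s.List(), len(s.triggers), s.revisions)
}
```

Keeping the row and filtering on DeletedAt preserves the revision history for audits while hiding the trigger from normal reads.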
The Trigger Scheduler
The GoCronScheduler (located in runs/scheduler/core/scheduler.go) is the background worker responsible for executing scheduled tasks. It periodically reconciles the state of the database with its internal in-memory cron registry.
Reconciliation Logic
The scheduler uses the UpdateSchedules method to sync active triggers. It identifies changes by comparing the LatestRevision of the trigger in the database against the revision of the job currently running in memory.
    func (s *GoCronScheduler) UpdateSchedules(ctx context.Context, triggers []*models.Trigger) {
        // ... (parsing and filtering logic)
        for key, t := range desired {
            if existing, exists := s.jobs[key]; exists {
                // If the revision matches, the spec hasn't changed; skip the update.
                if existing.trigger.LatestRevision == t.LatestRevision {
                    continue
                }
                // Spec changed: remove the old job before re-adding.
                s.cron.Remove(existing.entryID)
                delete(s.jobs, key)
            }
            // Register the new or updated job.
            job := NewGoCronJob(ctx, t, s.executor)
            job.entryID = s.cron.ScheduleTimedJob(sched, job, startTime)
            s.jobs[key] = job
        }
    }
Catchup Runs
When a trigger is activated or the scheduler restarts, Flyte determines the next execution time based on the TriggeredAt and UpdatedAt timestamps. This prevents the scheduler from firing "catchup" runs for the entire period a trigger was inactive, instead focusing on the next valid window from the point of activation.
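One plausible reading of this rule is that scheduling resumes from whichever timestamp is later. The sketch below illustrates that reading only. The catchupStart function and the "later of the two" rule are assumptions, not the scheduler's confirmed logic.

```go
package main

import (
	"fmt"
	"time"
)

// mustParse is a small demo helper for RFC 3339 timestamps.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

// catchupStart is hypothetical. It returns the later of the last fire time
// (triggeredAt) and the last update time (updatedAt), so that runs missed
// before a reactivation are not replayed. A zero triggeredAt, meaning the
// trigger has never fired, falls through to updatedAt.
func catchupStart(triggeredAt, updatedAt time.Time) time.Time {
	if triggeredAt.After(updatedAt) {
		return triggeredAt
	}
	return updatedAt
}

func main() {
	lastFired := mustParse("2024-03-01T09:00:00Z")
	reactivated := mustParse("2024-03-10T14:30:00Z")

	// The trigger was inactive between the two timestamps, so the next run
	// is computed from the reactivation time, not from the last fire time.
	fmt.Println(catchupStart(lastFired, reactivated).Format(time.RFC3339))
}
```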
Configuration
The behavior of the automation engine is controlled via the runs.triggerScheduler configuration block.
| Setting | Default | Description |
|---|---|---|
| enabled | true | Whether the trigger scheduler worker should run. |
| resyncInterval | 30s | How often the worker polls the TriggerRepo for active triggers. |
| maxCatchupRunsPerLoop | 100 | Limits the number of missed runs the scheduler will attempt to trigger in a single sync cycle. |
| executionQps | 10.0 | The rate limit (requests per second) for creating new runs. |
| executionBurst | 20 | The burst capacity for the run creation rate limiter. |
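To see how executionQps and executionBurst interact, here is a minimal token-bucket sketch using the default values from the table. The tokenBucket type is hypothetical and uses an explicit clock offset so the behavior is deterministic. The source does not say which limiter implementation the scheduler uses.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket is a hypothetical, single-goroutine rate limiter.
type tokenBucket struct {
	qps    float64       // tokens added per second
	burst  float64       // maximum tokens the bucket can hold
	tokens float64       // tokens currently available
	last   time.Duration // clock offset of the most recent refill
}

// newTokenBucket starts with a full bucket, so an initial burst is admitted.
func newTokenBucket(qps float64, burst int) *tokenBucket {
	return &tokenBucket{qps: qps, burst: float64(burst), tokens: float64(burst)}
}

// allow refills the bucket for the time elapsed since the last call and
// reports whether one request may proceed at clock offset at.
func (b *tokenBucket) allow(at time.Duration) bool {
	if at > b.last {
		b.tokens += (at - b.last).Seconds() * b.qps
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = at
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// countAdmitted issues n requests at the same clock offset.
func countAdmitted(b *tokenBucket, n int, at time.Duration) int {
	admitted := 0
	for i := 0; i < n; i++ {
		if b.allow(at) {
			admitted++
		}
	}
	return admitted
}

func main() {
	b := newTokenBucket(10.0, 20) // executionQps and executionBurst defaults

	fmt.Println("admitted at t=0:", countAdmitted(b, 25, 0))
	fmt.Println("admitted at t=1s:", countAdmitted(b, 25, time.Second))
}
```

In this model, 25 simultaneous run requests at startup admit 20 (the burst capacity), and one second later only 10 more are admitted (the per-second rate). Under that reading, executionBurst bounds short spikes and executionQps bounds sustained throughput.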