Automating Workflows with Triggers

Flyte automates task execution through a versioned trigger system that supports schedules and external events. Because the trigger definition is decoupled from the task itself, you can manage automation lifecycles independently and keep a full audit trail of every schedule change and every change in activation state.

Managing Triggers via the Service API

When you need to automate a task, you interact with the triggerService (found in runs/service/trigger_service.go). This service handles the deployment, retrieval, and lifecycle management of triggers.

Deploying a Trigger

To create or update a trigger, you call DeployTrigger. This method validates the automation specification—such as ensuring a cron expression is valid—and persists the state.

// Excerpt showing how triggerService validates and saves a trigger
func (s *triggerService) DeployTrigger(
	ctx context.Context,
	req *connect.Request[triggerpb.DeployTriggerRequest],
) (*connect.Response[triggerpb.DeployTriggerResponse], error) {
	request := req.Msg

	// Validates the cron expression, including timezone support
	if err := validateCronExpression(request.GetName().GetName(), request.GetAutomationSpec()); err != nil {
		return nil, err
	}

	// ... (construction of id from the request is elided in this excerpt)

	// Transforms the proto request into a database model
	triggerModel, err := transformers.NewTriggerModel(ctx, id, request.GetSpec(), request.GetAutomationSpec())
	if err != nil {
		return nil, err
	}

	// Persists the trigger with optimistic locking using the provided revision
	saved, err := s.db.TriggerRepo().SaveTrigger(ctx, triggerModel, request.GetRevision())
	if err != nil {
		return nil, err
	}

	// ... returns the saved trigger details
}

Flyte supports timezones in cron expressions by prepending a CRON_TZ=<timezone> prefix to the expression before validation and execution. If you provide a timezone in the Schedule proto, validateCronExpression checks that the prefixed expression is accepted by the robfig/cron parser:

if tz := cronSchedule.GetTimezone(); tz != "" {
	expr = fmt.Sprintf("CRON_TZ=%s %s", tz, expr)
}
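The prefixing step can be sketched with the standard library alone. The helper name withTimezone is hypothetical, and time.LoadLocation stands in for the check that the real code delegates to the robfig/cron parser, so treat this as an illustration under those assumptions rather than Flyte's implementation.

```go
package main

import (
	"fmt"
	"time"
	_ "time/tzdata" // embed the IANA database so lookups work without system zoneinfo
)

// withTimezone is a hypothetical helper: it rejects unknown IANA timezone
// names and prepends the CRON_TZ prefix to the cron expression.
func withTimezone(expr, tz string) (string, error) {
	if tz == "" {
		return expr, nil // no timezone requested; leave the expression as-is
	}
	if _, err := time.LoadLocation(tz); err != nil {
		return "", fmt.Errorf("invalid timezone %q: %w", tz, err)
	}
	return fmt.Sprintf("CRON_TZ=%s %s", tz, expr), nil
}

func main() {
	expr, err := withTimezone("0 9 * * 1-5", "America/New_York")
	fmt.Println(expr, err)
}
```

Rejecting an unknown timezone name at deploy time surfaces the mistake to the caller immediately instead of at the first scheduled run.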

The Versioning Model

Flyte implements an "Action and ActionEvent" pattern for triggers to ensure that every change is recorded. This is managed by the TriggerRepo (defined in runs/repository/interfaces/trigger.go) across two primary models:

  1. Trigger: Represents the current, mutable state of a trigger. It contains the LatestRevision counter and denormalized fields like Active and AutomationType for efficient querying.
  2. TriggerRevision: An immutable snapshot created every time a trigger is deployed, activated, deactivated, or deleted.
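The relationship between the two models can be sketched as a mutable row plus an append-only slice of snapshots. The struct fields beyond those named above, the uint64 revision type, the plain-string actions, and the history type are all simplifying assumptions for illustration.

```go
package main

import "fmt"

// Trigger is a simplified stand-in for the mutable model.
type Trigger struct {
	Name           string
	LatestRevision uint64 // assumed type; bumped on every change
	Active         bool   // denormalized for efficient querying
}

// TriggerRevision is a simplified stand-in for the immutable snapshot.
type TriggerRevision struct {
	Name     string
	Revision uint64
	Action   string // plain string here; the real model uses an enum
	Active   bool
}

// history is a hypothetical in-memory pairing of the current row with
// its append-only revision log.
type history struct {
	current   Trigger
	revisions []TriggerRevision
}

// record bumps the revision counter, updates the mutable row, and appends
// a snapshot that is never modified afterwards.
func (h *history) record(action string, active bool) {
	h.current.LatestRevision++
	h.current.Active = active
	h.revisions = append(h.revisions, TriggerRevision{
		Name:     h.current.Name,
		Revision: h.current.LatestRevision,
		Action:   action,
		Active:   active,
	})
}

func main() {
	h := &history{current: Trigger{Name: "nightly"}}
	h.record("deploy", true)
	h.record("deactivate", false)
	fmt.Println(h.current.LatestRevision, len(h.revisions))
}
```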

Optimistic Locking

To prevent concurrent updates from overwriting each other, the SaveTrigger method requires an expectedRevision. If the LatestRevision in the database does not match the expectedRevision provided by the caller, the update fails. This ensures that if two users attempt to update the same schedule simultaneously, only one succeeds.
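A minimal in-memory sketch of this compare-then-write check follows. The memRepo type, the errRevisionConflict error, and the convention that a new trigger starts at revision 0 are assumptions made for the example; the real SaveTrigger works against the database.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRevisionConflict is a hypothetical error for a stale expectedRevision.
var errRevisionConflict = errors.New("revision conflict")

// memRepo is a toy stand-in for the repository: trigger name -> LatestRevision.
type memRepo struct {
	mu     sync.Mutex
	latest map[string]uint64
}

// save applies the update only if the caller saw the latest revision,
// then bumps the counter so any concurrent caller becomes stale.
func (r *memRepo) save(name string, expectedRevision uint64) (uint64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.latest[name] != expectedRevision {
		return r.latest[name], errRevisionConflict
	}
	r.latest[name] = expectedRevision + 1
	return r.latest[name], nil
}

func main() {
	repo := &memRepo{latest: map[string]uint64{}}
	_, first := repo.save("nightly", 0)  // caller A read revision 0 and wins
	_, second := repo.save("nightly", 0) // caller B also read revision 0 and loses
	fmt.Println(first, second)
}
```

The losing caller is expected to re-read the trigger, reapply its change on top of the new state, and retry with the updated revision.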

Soft Deletion

Triggers in Flyte are never hard-deleted from the database. The DeleteTriggers method sets a DeletedAt timestamp and appends a new TriggerRevision with the action TRIGGER_REVISION_ACTION_DELETE. Standard Get and List operations filter out these soft-deleted rows.
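The soft-delete behavior can be illustrated with a toy store. The row and store types are hypothetical, and the Actions slice stands in for the appended TriggerRevision rows.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// row is a simplified trigger record; DeletedAt stays nil while the trigger is live.
type row struct {
	Name      string
	DeletedAt *time.Time
	Actions   []string // stand-in for the TriggerRevision history
}

// store is a hypothetical in-memory table keyed by trigger name.
type store struct{ rows map[string]*row }

// softDelete marks the row as deleted and records the delete action;
// the row itself is never removed from the map.
func (s *store) softDelete(name string) {
	r, ok := s.rows[name]
	if !ok || r.DeletedAt != nil {
		return
	}
	now := time.Now()
	r.DeletedAt = &now
	r.Actions = append(r.Actions, "TRIGGER_REVISION_ACTION_DELETE")
}

// list returns only live triggers, mirroring the filter on Get and List.
func (s *store) list() []string {
	var names []string
	for name, r := range s.rows {
		if r.DeletedAt == nil {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	s := &store{rows: map[string]*row{"a": {Name: "a"}, "b": {Name: "b"}}}
	s.softDelete("a")
	fmt.Println(s.list(), len(s.rows))
}
```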

The Trigger Scheduler

The GoCronScheduler (located in runs/scheduler/core/scheduler.go) is the background worker responsible for executing scheduled tasks. It periodically reconciles the state of the database with its internal in-memory cron registry.

Reconciliation Logic

The scheduler uses the UpdateSchedules method to sync active triggers. It identifies changes by comparing the LatestRevision of the trigger in the database against the revision of the job currently running in memory.

func (s *GoCronScheduler) UpdateSchedules(ctx context.Context, triggers []*models.Trigger) {
	// ... (parsing and filtering logic; builds the desired map)

	for key, t := range desired {
		if existing, exists := s.jobs[key]; exists {
			// If the revision matches, the spec hasn't changed; skip update.
			if existing.trigger.LatestRevision == t.LatestRevision {
				continue
			}
			// Spec changed: remove the old job before re-adding.
			s.cron.Remove(existing.entryID)
			delete(s.jobs, key)
		}

		// Register the new or updated job.
		// sched and startTime come from the elided parsing logic.
		job := NewGoCronJob(ctx, t, s.executor)
		job.entryID = s.cron.ScheduleTimedJob(sched, job, startTime)
		s.jobs[key] = job
	}
}
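The revision comparison in the excerpt above can be reduced to a pure function over two maps, which makes the decision easy to test. The plan function is hypothetical, and the second loop, which removes jobs whose triggers no longer appear in the desired set, is an assumption about the elided logic rather than something the excerpt shows.

```go
package main

import (
	"fmt"
	"sort"
)

// plan compares running jobs with desired triggers (key -> LatestRevision)
// and returns which keys to schedule and which running jobs to remove.
func plan(running, desired map[string]uint64) (schedule, remove []string) {
	for key, rev := range desired {
		old, exists := running[key]
		if exists && old == rev {
			continue // same revision: the job is already up to date
		}
		if exists {
			remove = append(remove, key) // changed: drop the old job first
		}
		schedule = append(schedule, key)
	}
	// Assumed behavior: jobs with no matching active trigger are dropped.
	for key := range running {
		if _, ok := desired[key]; !ok {
			remove = append(remove, key)
		}
	}
	sort.Strings(schedule)
	sort.Strings(remove)
	return schedule, remove
}

func main() {
	running := map[string]uint64{"a": 1, "b": 1, "c": 1}
	desired := map[string]uint64{"a": 1, "b": 2, "d": 1}
	schedule, remove := plan(running, desired)
	fmt.Println(schedule, remove)
}
```

Comparing revisions instead of full specs keeps each sync cycle cheap: an unchanged trigger costs one integer comparison.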

Catchup Runs

When a trigger is activated or the scheduler restarts, Flyte determines the next execution time from the trigger's TriggeredAt and UpdatedAt timestamps. As a result, the scheduler does not fire "catchup" runs for the entire period a trigger was inactive; it resumes at the next valid window after the point of activation.

Configuration

The behavior of the automation engine is controlled via the runs.triggerScheduler configuration block.

| Setting | Default | Description |
| --- | --- | --- |
| enabled | true | Whether the trigger scheduler worker should run. |
| resyncInterval | 30s | How often the worker polls the TriggerRepo for active triggers. |
| maxCatchupRunsPerLoop | 100 | Limits the number of missed runs the scheduler will attempt to trigger in a single sync cycle. |
| executionQps | 10.0 | The rate limit (requests per second) for creating new runs. |
| executionBurst | 20 | The burst capacity for the run creation rate limiter. |
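To show how executionQps and executionBurst interact, here is a generic token-bucket sketch using the default values. The source does not show Flyte's actual limiter, so the bucket type and its methods are assumptions, and time is passed in as seconds to keep the example deterministic.

```go
package main

import (
	"fmt"
	"math"
)

// bucket is a generic token bucket: it refills at qps tokens per second
// and never holds more than burst tokens.
type bucket struct {
	qps, burst, tokens float64
	last               float64 // time of the previous call, in seconds
}

// newBucket starts full, so an initial burst is admitted immediately.
func newBucket(qps float64, burst int) *bucket {
	return &bucket{qps: qps, burst: float64(burst), tokens: float64(burst)}
}

// allow refills for the elapsed time, then spends one token if available.
func (b *bucket) allow(now float64) bool {
	b.tokens = math.Min(b.burst, b.tokens+(now-b.last)*b.qps)
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// admit counts how many of n simultaneous requests pass at time now.
func admit(b *bucket, n int, now float64) int {
	passed := 0
	for i := 0; i < n; i++ {
		if b.allow(now) {
			passed++
		}
	}
	return passed
}

func main() {
	b := newBucket(10.0, 20) // executionQps and executionBurst defaults
	fmt.Println(admit(b, 25, 0)) // 20: the burst capacity
	fmt.Println(admit(b, 25, 1)) // 10: one second of refill at 10 per second
}
```

With these defaults, a backlog of runs is created in an initial batch of up to 20, then at a steady 10 per second.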