Creating Your First Scheduled Trigger
Flyte's scheduling system automates task execution with time-based triggers. It is built around three core components: the Scheduler, which manages the lifecycle of jobs; the Job, which wraps a trigger and its schedule; and the Executor, which defines the logic to run when a schedule fires.
In this tutorial, you will implement a custom executor and define a scheduled trigger to automate your logic.
Prerequisites
To follow this tutorial, you need the following packages from the Flyte codebase:
github.com/flyteorg/flyte/v2/runs/scheduler/core: For the scheduler and executor interfaces.
github.com/flyteorg/flyte/v2/runs/repository/models: For the trigger data model.
github.com/flyteorg/flyte/v2/gen/go/flyteidl2/task: For defining automation specs.
Step 1: Implement the Executor Interface
The Executor interface in runs/scheduler/core/job.go is the primary extension point for custom scheduling logic. It requires a single Execute method that Flyte calls every time a schedule fires.
Create a simple executor that logs the scheduled time:
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/flyteorg/flyte/v2/runs/repository/models"
	"github.com/flyteorg/flyte/v2/runs/scheduler/core"
)

// LoggingExecutor implements the core.Executor interface
type LoggingExecutor struct{}

func (e *LoggingExecutor) Execute(ctx context.Context, t *models.Trigger, scheduledAt time.Time) error {
	fmt.Printf("Firing trigger %s/%s/%s scheduled for %s\n",
		t.Project, t.Domain, t.Name, scheduledAt.Format(time.RFC3339))
	return nil
}
In a production environment, you would typically use the TriggerExecutor found in runs/scheduler/executor/trigger_executor.go, which calls the Flyte RunServiceClient to create actual workflow runs.
Step 2: Define a Scheduled Trigger
Flyte triggers use a TriggerAutomationSpec to define their schedule. This spec can handle both standard cron expressions and fixed-rate intervals (e.g., every 5 minutes).
The following code defines a trigger that runs every minute using a cron expression:
import (
	"time"

	"google.golang.org/protobuf/proto"

	"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/task"
	"github.com/flyteorg/flyte/v2/runs/repository/models"
)

func createCronTrigger() *models.Trigger {
	// Define the automation spec with a cron schedule
	spec := &task.TriggerAutomationSpec{
		Type: task.TriggerAutomationSpecType_TYPE_SCHEDULE,
		Automation: &task.TriggerAutomationSpec_Schedule{
			Schedule: &task.Schedule{
				Schedule: &task.Schedule_Cron{
					Cron: &task.CronSchedule{
						Expression: "* * * * *", // Every minute
					},
				},
			},
		},
	}

	// Serialize the spec for the database model.
	// Surface marshaling errors instead of discarding them.
	specBytes, err := proto.Marshal(spec)
	if err != nil {
		panic(err)
	}

	return &models.Trigger{
		Project:        "flytesnacks",
		Domain:         "development",
		Name:           "my-minutely-trigger", // matches the every-minute schedule
		AutomationSpec: specBytes,
		Active:         true,
		UpdatedAt:      time.Now(),
		LatestRevision: 1,
	}
}
Step 3: Initialize and Start the Scheduler
The GoCronScheduler in runs/scheduler/core/scheduler.go manages the mapping between your Trigger models and the underlying robfig/cron engine.
func main() {
	ctx := context.Background()
	executor := &LoggingExecutor{}

	// 1. Initialize the scheduler with your executor
	scheduler := core.NewGoCronScheduler(executor)

	// 2. Define your triggers
	trigger := createCronTrigger()
	activeTriggers := []*models.Trigger{trigger}

	// 3. Sync the triggers with the scheduler
	// This parses the AutomationSpec and creates GoCronJobs internally
	scheduler.UpdateSchedules(ctx, activeTriggers)

	// 4. Start the cron engine (non-blocking)
	scheduler.Start()

	fmt.Println("Scheduler started...")
	select {} // Keep the process alive
}
When UpdateSchedules is called, the scheduler uses ParseSchedule (from runs/scheduler/core/schedule_time.go) to extract the cron expression and wraps the trigger in a GoCronJob.
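The sync step can be pictured as a reconcile loop: compare the desired triggers against what is already registered, then add, replace, or remove jobs accordingly. The sketch below shows one plausible shape for that bookkeeping. It is not taken from the Flyte source: the Trigger struct, the key format, the use of the revision number to detect changes, and the reconcile function are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// Trigger is a hypothetical stand-in carrying only the fields this sketch needs.
type Trigger struct {
	Project, Domain, Name string
	LatestRevision        uint64
}

// key builds a unique identifier for a trigger (format is an assumption).
func key(t Trigger) string {
	return t.Project + "/" + t.Domain + "/" + t.Name
}

// reconcile brings registered (key -> revision) in line with desired and
// reports which keys were added, updated, or removed, each sorted.
func reconcile(registered map[string]uint64, desired []Trigger) (added, updated, removed []string) {
	seen := make(map[string]bool, len(desired))
	for _, t := range desired {
		k := key(t)
		seen[k] = true
		rev, ok := registered[k]
		switch {
		case !ok:
			added = append(added, k)
		case rev != t.LatestRevision:
			updated = append(updated, k)
		}
		registered[k] = t.LatestRevision
	}
	// Anything registered but no longer desired gets dropped.
	for k := range registered {
		if !seen[k] {
			removed = append(removed, k)
			delete(registered, k)
		}
	}
	sort.Strings(added)
	sort.Strings(updated)
	sort.Strings(removed)
	return added, updated, removed
}

func main() {
	registered := map[string]uint64{"p/d/a": 1, "p/d/b": 1}
	desired := []Trigger{
		{Project: "p", Domain: "d", Name: "a", LatestRevision: 2},
		{Project: "p", Domain: "d", Name: "c", LatestRevision: 1},
	}
	added, updated, removed := reconcile(registered, desired)
	fmt.Println(added, updated, removed) // [p/d/c] [p/d/a] [p/d/b]
}
```

Treating the active trigger list as the full desired state means a deactivated trigger disappears from the scheduler on the next sync without a separate delete call.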
Step 4: Handling Missed Runs (Catchup)
If your service is down, you may miss scheduled intervals. The GoCronScheduler provides a CatchupAll method to fire these missed runs. It calculates the difference between the current time and the trigger's TriggeredAt or UpdatedAt timestamps.
// Fire up to 10 missed runs for the active triggers
scheduler.CatchupAll(ctx, activeTriggers, time.Now(), 10)
Best Practice: Idempotency
When implementing a real Executor, ensure your logic is idempotent. The Flyte TriggerExecutor achieves this by generating deterministic run names based on the trigger identity and the scheduledAt time:
// Example of deterministic naming logic used in Flyte
func runName(t *models.Trigger, scheduledAt time.Time) string {
	return fmt.Sprintf("sched-%s-%d", t.Name, scheduledAt.Unix())
}
Because the name is deterministic, if the scheduler attempts to fire the same interval twice (e.g., once during a catchup loop and once from a normal cron fire), the underlying Flyte service returns an AlreadyExists error instead of creating a duplicate run.