Workflow & Run Management
Flyte organizes workflow execution around Projects (subdivided into domains), Tasks, and Runs. This section covers the full lifecycle: deploying task definitions, creating and aborting runs, offloading large inputs to cloud storage, and executing scheduled triggers idempotently.
Project and Domain Management
Before you can deploy tasks or create runs, you must organize them within projects and domains. The ProjectService in runs/service/project_service.go manages these top-level entities.
Projects act as the primary isolation boundary, while domains (e.g., development, staging, production) allow you to separate environments within a project. You can manage these via the CreateProject and UpdateProject methods:
// Example of creating a project via ProjectService
resp, err := projectClient.CreateProject(ctx, connect.NewRequest(&project.CreateProjectRequest{
	Project: &project.Project{
		Id:   "my-project",
		Name: "My Flyte Project",
		Domains: []*project.Domain{
			{Id: "development", Name: "Development"},
		},
	},
}))
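Runs are addressed by a project, a domain, and a name (the same Project/Domain/Name triple that common.RunIdentifier carries in the scheduler code). One way to picture the isolation this gives is as a composite key. The following is a minimal in-memory sketch; `RunKey` and `registry` are hypothetical illustrations, not Flyte types.

```go
package main

import "fmt"

// RunKey is a hypothetical composite key mirroring the Project/Domain/Name
// triple of a run identifier. It is an illustration, not Flyte's type.
type RunKey struct {
	Project string
	Domain  string
	Name    string
}

// registry is a hypothetical in-memory store keyed by the full triple, so the
// same run name can exist independently in different projects or domains.
type registry struct {
	runs map[RunKey]string
}

func newRegistry() *registry {
	return &registry{runs: make(map[RunKey]string)}
}

// add records a run and reports false if the exact triple already exists.
func (r *registry) add(k RunKey, status string) bool {
	if _, exists := r.runs[k]; exists {
		return false
	}
	r.runs[k] = status
	return true
}

func main() {
	reg := newRegistry()
	dev := RunKey{Project: "my-project", Domain: "development", Name: "r1"}
	prod := RunKey{Project: "my-project", Domain: "production", Name: "r1"}
	fmt.Println(reg.add(dev, "QUEUED"))  // true
	fmt.Println(reg.add(prod, "QUEUED")) // true: different domain, no collision
	fmt.Println(reg.add(dev, "QUEUED"))  // false: same triple already present
}
```

Because the domain is part of the key, promoting a workload from development to production does not collide with names already used in another environment.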
Task Deployment and Triggers
Tasks are the fundamental units of execution in Flyte. The taskService (found in runs/service/task_service.go) handles the registration of task definitions and their associated triggers.
When you call DeployTask, Flyte performs several critical steps in a single database transaction:
- Validation: It validates the task specification and any cron expressions in the triggers using validateCronExpression.
- Model Transformation: It converts the request into internal models using transformers.NewTaskModel.
- Persistence: It upserts the task definition and inserts new trigger revisions into the repository via s.db.TaskRepo().CreateTask.
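The validation step is the one most likely to reject a deployment. The source does not show how validateCronExpression is implemented, so the following is only a sketch of what checking a standard five-field cron expression involves. `validateCron` and its helpers are hypothetical; they accept numeric fields built from `*`, lists, ranges, and steps, and do not handle month/day names or macros such as `@daily`, which Flyte's real validator may treat differently.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// fieldBounds lists the inclusive numeric range of each of the five standard
// cron fields: minute, hour, day of month, month, day of week.
var fieldBounds = [5][2]int{{0, 59}, {0, 23}, {1, 31}, {1, 12}, {0, 6}}

// validateCron is a hypothetical stand-in for validateCronExpression.
func validateCron(expr string) error {
	fields := strings.Fields(expr)
	if len(fields) != 5 {
		return fmt.Errorf("expected 5 fields, got %d", len(fields))
	}
	for i, f := range fields {
		lo, hi := fieldBounds[i][0], fieldBounds[i][1]
		for _, item := range strings.Split(f, ",") {
			if err := validateItem(item, lo, hi); err != nil {
				return fmt.Errorf("field %d (%q): %w", i+1, f, err)
			}
		}
	}
	return nil
}

// validateItem checks a single list element such as "*", "5", "1-5", or "*/15".
func validateItem(item string, lo, hi int) error {
	base, step, hasStep := strings.Cut(item, "/")
	if hasStep {
		n, err := strconv.Atoi(step)
		if err != nil || n < 1 {
			return fmt.Errorf("invalid step %q", step)
		}
	}
	if base == "*" {
		return nil
	}
	start, end, isRange := strings.Cut(base, "-")
	a, err := parseBounded(start, lo, hi)
	if err != nil {
		return err
	}
	if isRange {
		b, err := parseBounded(end, lo, hi)
		if err != nil {
			return err
		}
		if a > b {
			return fmt.Errorf("range %q is reversed", base)
		}
	}
	return nil
}

// parseBounded parses s as an integer and checks it lies within [lo, hi].
func parseBounded(s string, lo, hi int) (int, error) {
	n, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("not a number: %q", s)
	}
	if n < lo || n > hi {
		return 0, fmt.Errorf("%d out of range [%d, %d]", n, lo, hi)
	}
	return n, nil
}

func main() {
	fmt.Println(validateCron("0 0 * * *") == nil)         // true
	fmt.Println(validateCron("*/15 9-17 * * 1-5") == nil) // true
	fmt.Println(validateCron("60 0 * * *") == nil)        // false: minute 60
}
```

Rejecting a bad expression before anything is written keeps the surrounding transaction from persisting a trigger the scheduler could never fire.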
// Deploying a task with a scheduled trigger
resp, err := taskClient.DeployTask(ctx, connect.NewRequest(&task.DeployTaskRequest{
	TaskId: &task.TaskIdentifier{
		Project: "p1", Domain: "d1", Name: "t1", Version: "v1",
	},
	Spec: &task.TaskSpec{ /* ... */ },
	Triggers: []*task.TaskTrigger{
		{
			Name: "daily-trigger",
			AutomationSpec: &trigger.AutomationSpec{
				Action: &trigger.AutomationSpec_Schedule{
					Schedule: &trigger.Schedule{
						CronExpression: "0 0 * * *", // Daily at midnight
					},
				},
			},
		},
	},
}))
Run Lifecycle Management
The RunService in runs/service/run_service.go is responsible for the execution lifecycle, from creation to abortion.
Creating a Run
When you invoke CreateRun, Flyte does not immediately execute the task. Instead, it prepares the execution environment:
- Identity Resolution: If you provide only a ProjectId, Flyte generates a unique run name using generateRunName(time.Now().UnixNano()).
- Input Offloading: To handle large datasets, Flyte offloads inputs to storage (e.g., S3 or GCS). The RunService uses s.dataStore.WriteProtobuf to save the inputs.pb file to a path constructed by buildInputPrefix.
- Persistence: The run is saved to the database with an initial phase of ACTION_PHASE_QUEUED.
- Enqueuing: The root action (identified as a0) is enqueued to the Actions service via s.actionsClient.Enqueue.
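The four preparation steps can be sketched end to end with in-memory stand-ins. Everything here is hypothetical: `generateName`, `inputPath`, and `createRun` only imitate the roles of generateRunName, buildInputPrefix, and CreateRun; the base-36 naming, the `my-bucket` bucket, and the path layout are invented; and maps and a slice replace the data store, the run repository, and the Actions client.

```go
package main

import (
	"fmt"
	"path"
	"strconv"
	"time"
)

// generateName is a hypothetical stand-in for generateRunName: it base-36
// encodes a nanosecond timestamp. The real naming scheme may differ.
func generateName(nanos int64) string {
	return "r" + strconv.FormatInt(nanos, 36)
}

// inputPath is a hypothetical stand-in for buildInputPrefix plus the
// inputs.pb file name. Bucket and layout are invented for illustration.
func inputPath(bucket, project, domain, runName string) string {
	return "s3://" + path.Join(bucket, project, domain, runName, "inputs.pb")
}

// run is a minimal record of what gets persisted for a new run.
type run struct {
	Name, InputURI, Phase string
}

// createRun performs the four preparation steps in order. The maps and slice
// stand in for the blob store, the run table, and the Actions queue.
func createRun(project, domain, name string, inputs []byte,
	store map[string][]byte, db map[string]run, queue *[]string) run {
	// 1. Identity resolution: generate a name when none was supplied.
	if name == "" {
		name = generateName(time.Now().UnixNano())
	}
	// 2. Input offloading: write the serialized inputs to object storage.
	uri := inputPath("my-bucket", project, domain, name)
	store[uri] = inputs
	// 3. Persistence: record the run in its initial queued phase.
	r := run{Name: name, InputURI: uri, Phase: "ACTION_PHASE_QUEUED"}
	db[path.Join(project, domain, name)] = r
	// 4. Enqueuing: hand only the root action "a0" to the Actions service.
	*queue = append(*queue, name+"/a0")
	return r
}

func main() {
	store := map[string][]byte{}
	db := map[string]run{}
	var queue []string
	r := createRun("p1", "d1", "", []byte("serialized inputs"), store, db, &queue)
	fmt.Println(r.Name, r.Phase, r.InputURI, queue)
}
```

In this ordering, a failure while writing inputs leaves no run row behind, and the database only ever references inputs that were already stored.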
Aborting a Run
If a run needs to be stopped, AbortRun marks the root action a0 as ABORTED in the database. It then hands off the cleanup to the AbortReconciler:
// From runs/service/run_service.go:473
// Mark only the root action ABORTED in DB, then push it to the reconciler.
// The reconciler deletes "a0"'s CRD; K8s cascades deletion to all child CRDs.
if err := s.repo.ActionRepo().AbortRun(ctx, req.Msg.RunId, reason, nil); err != nil {
	return nil, err
}
if s.abortReconciler != nil {
	s.abortReconciler.Push(ctx, &common.ActionIdentifier{Run: req.Msg.RunId, Name: "a0"}, reason)
}
Scheduled Executions and Idempotency
Flyte uses a cron-based scheduler (GoCronScheduler) to monitor active triggers and initiate runs. To prevent duplicate executions (e.g., after a scheduler restart), the TriggerExecutor in runs/scheduler/executor/trigger_executor.go ensures idempotency by generating deterministic run names.
The run name is derived from a hex hash of the trigger details and the scheduled time:
// From runs/scheduler/executor/trigger_executor.go
func (e *TriggerExecutor) Execute(ctx context.Context, t *models.Trigger, scheduledAt time.Time) error {
	runName := runName(t, scheduledAt) // Deterministic name based on trigger and time
	createReq := &workflow.CreateRunRequest{
		Id: &workflow.CreateRunRequest_RunId{
			RunId: &common.RunIdentifier{Project: t.Project, Domain: t.Domain, Name: runName},
		},
		// ...
		Source:       workflow.RunSource_RUN_SOURCE_SCHEDULE_TRIGGER,
		RunStartTime: timestamppb.New(scheduledAt),
	}
	_, err := e.runClient.CreateRun(ctx, connect.NewRequest(createReq))
	return err
}
SDK Compatibility
The RunService includes logic to handle different SDK versions. For example, the run_start_time metadata is only stamped for tasks created with SDK versions >= 2.3.6, as older versions do not support the {{.runStartTime}} placeholder in templates. This check is performed by meetsRunStartTimeSDKVersion(taskSpec) during run creation.
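A version gate like meetsRunStartTimeSDKVersion has to compare versions numerically, since a plain string comparison would rank "2.10.0" below "2.3.6". The sketch below shows one way to do that. `parseVersion`, `atLeast`, and `minRunStartTimeSDK` are hypothetical; where the SDK version is stored in the task spec is not shown in the source, so the function takes a plain string, and pre-release suffixes such as `b1` are simply ignored, which real version ordering would not do.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion extracts up to three numeric components from strings such as
// "2.3.6", "v2.4.0", or "2.3.6b1". Non-digit suffixes on a component are
// dropped, a simplification that ignores pre-release ordering.
func parseVersion(s string) ([3]int, bool) {
	var out [3]int
	parts := strings.SplitN(strings.TrimPrefix(s, "v"), ".", 3)
	for i, p := range parts {
		end := 0
		for end < len(p) && p[end] >= '0' && p[end] <= '9' {
			end++
		}
		if end == 0 {
			return out, false
		}
		n, err := strconv.Atoi(p[:end])
		if err != nil {
			return out, false
		}
		out[i] = n
	}
	return out, true
}

// atLeast reports whether version v is >= floor, component by component.
// Unparseable versions count as too old, the conservative choice.
func atLeast(v string, floor [3]int) bool {
	got, ok := parseVersion(v)
	if !ok {
		return false
	}
	for i := 0; i < 3; i++ {
		if got[i] != floor[i] {
			return got[i] > floor[i]
		}
	}
	return true
}

// minRunStartTimeSDK mirrors the 2.3.6 threshold described above.
var minRunStartTimeSDK = [3]int{2, 3, 6}

func main() {
	for _, v := range []string{"2.3.5", "2.3.6", "2.10.0", "v3.0.0", "unknown"} {
		fmt.Println(v, atLeast(v, minRunStartTimeSDK))
	}
}
```

Treating an unparseable version as too old means the placeholder is simply not stamped, which matches the behavior for older SDKs rather than risking a template the SDK cannot render.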