
Workflow & Run Management

Flyte manages the lifecycle of workflows through a hierarchy of Projects, Tasks, and Runs. The system handles everything from the initial deployment of task definitions to the idempotent execution of scheduled triggers and the offloading of large input data to cloud storage.

Project and Domain Management

Before you can deploy tasks or create runs, you must organize them within projects and domains. The ProjectService in runs/service/project_service.go manages these top-level entities.

Projects act as the primary isolation boundary, while domains (e.g., development, staging, production) allow you to separate environments within a project. You can manage these via the CreateProject and UpdateProject methods:

// Example of creating a project via ProjectService
resp, err := projectClient.CreateProject(ctx, connect.NewRequest(&project.CreateProjectRequest{
	Project: &project.Project{
		Id:   "my-project",
		Name: "My Flyte Project",
		Domains: []*project.Domain{
			{Id: "development", Name: "Development"},
		},
	},
}))

Task Deployment and Triggers

Tasks are the fundamental units of execution in Flyte. The taskService (found in runs/service/task_service.go) handles the registration of task definitions and their associated triggers.

When you call DeployTask, Flyte performs several critical steps in a single database transaction:

  1. Validation: It validates the task specification and any cron expressions in the triggers using validateCronExpression.
  2. Model Transformation: It converts the request into internal models using transformers.NewTaskModel.
  3. Persistence: It upserts the task definition and inserts new trigger revisions into the repository via s.db.TaskRepo().CreateTask.

// Deploying a task with a scheduled trigger
resp, err := taskClient.DeployTask(ctx, connect.NewRequest(&task.DeployTaskRequest{
	TaskId: &task.TaskIdentifier{
		Project: "p1", Domain: "d1", Name: "t1", Version: "v1",
	},
	Spec: &task.TaskSpec{ ... },
	Triggers: []*task.TaskTrigger{
		{
			Name: "daily-trigger",
			AutomationSpec: &trigger.AutomationSpec{
				Action: &trigger.AutomationSpec_Schedule{
					Schedule: &trigger.Schedule{
						CronExpression: "0 0 * * *", // Daily at midnight
					},
				},
			},
		},
	},
}))
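
The validation step can be illustrated with a standalone sketch. The source does not show how validateCronExpression is implemented, so the checkCronFields function below is a hypothetical, standard-library-only approximation: it checks only the shape of a five-field expression, not value ranges or named values such as MON. A production validator would more likely delegate to a cron parsing library.

```go
package main

import (
	"fmt"
	"strings"
)

// checkCronFields is a hypothetical, simplified stand-in for a cron validator.
// It only checks shape: exactly five whitespace-separated fields, each made of
// digits and the characters * , - /. It does not check value ranges and it
// rejects named values (e.g. MON), which many real parsers accept.
func checkCronFields(expr string) error {
	fields := strings.Fields(expr)
	if len(fields) != 5 {
		return fmt.Errorf("expected 5 fields, got %d", len(fields))
	}
	for i, f := range fields {
		for _, r := range f {
			if (r < '0' || r > '9') && !strings.ContainsRune("*,-/", r) {
				return fmt.Errorf("field %d (%q): unexpected character %q", i+1, f, r)
			}
		}
	}
	return nil
}

func main() {
	for _, expr := range []string{"0 0 * * *", "0 0 * *", "0 0 * * x"} {
		fmt.Printf("%q -> %v\n", expr, checkCronFields(expr))
	}
}
```

Rejecting a malformed expression before the transaction opens means a bad trigger never leaves a partially written task behind.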

Run Lifecycle Management

The RunService in runs/service/run_service.go is responsible for the execution lifecycle, from creation to abortion.

Creating a Run

When you invoke CreateRun, Flyte does not immediately execute the task. Instead, it prepares the execution environment:

  1. Identity Resolution: If you only provide a ProjectId, Flyte generates a unique run name using generateRunName(time.Now().UnixNano()).
  2. Input Offloading: To handle large datasets, Flyte offloads inputs to storage (e.g., S3 or GCS). The RunService uses s.dataStore.WriteProtobuf to save the inputs.pb file to a path constructed by buildInputPrefix.
  3. Persistence: The run is saved to the database with an initial phase of ACTION_PHASE_QUEUED.
  4. Enqueuing: The root action (identified as a0) is enqueued to the Actions service via s.actionsClient.Enqueue.
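
The ordering of these four steps can be sketched with in-memory stand-ins. Everything below is an assumption made for illustration: the recorder type, the createRun signature, the generated name format, and the storage path layout are hypothetical and are not the formats produced by generateRunName or buildInputPrefix. Only the step order, the inputs.pb file name, the ACTION_PHASE_QUEUED phase, and the a0 root action come from the description above.

```go
package main

import "fmt"

// recorder is a hypothetical in-memory stand-in for the data store, the
// database, and the Actions client. It only records what was asked of it.
type recorder struct {
	steps []string
}

func (r *recorder) writeInputs(path string, _ []byte) error {
	r.steps = append(r.steps, "write "+path)
	return nil
}

func (r *recorder) saveRun(name, phase string) error {
	r.steps = append(r.steps, "save "+name+" "+phase)
	return nil
}

func (r *recorder) enqueue(name, action string) error {
	r.steps = append(r.steps, "enqueue "+name+"/"+action)
	return nil
}

// createRun walks through the four steps in order. The name format and the
// storage path layout are assumptions made for this sketch.
func createRun(r *recorder, project, domain, name string, nowNanos int64, inputs []byte) (string, error) {
	// 1. Identity resolution: generate a name when the caller gave none.
	if name == "" {
		name = fmt.Sprintf("r%x", nowNanos)
	}
	// 2. Input offloading: write inputs.pb under a per-run prefix.
	path := fmt.Sprintf("%s/%s/%s/inputs.pb", project, domain, name)
	if err := r.writeInputs(path, inputs); err != nil {
		return "", fmt.Errorf("offload inputs: %w", err)
	}
	// 3. Persistence: record the run in its initial phase.
	if err := r.saveRun(name, "ACTION_PHASE_QUEUED"); err != nil {
		return "", fmt.Errorf("persist run: %w", err)
	}
	// 4. Enqueuing: hand the root action to the Actions service.
	if err := r.enqueue(name, "a0"); err != nil {
		return "", fmt.Errorf("enqueue root action: %w", err)
	}
	return name, nil
}

func main() {
	r := &recorder{}
	name, err := createRun(r, "p1", "d1", "", 255, []byte("inputs"))
	fmt.Println(name, err)
	for _, s := range r.steps {
		fmt.Println(s)
	}
}
```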

Aborting a Run

If a run needs to be stopped, AbortRun marks the root action a0 as ABORTED in the database. It then hands off the cleanup to the AbortReconciler:

// From runs/service/run_service.go:473
// Mark only the root action ABORTED in DB, then push it to the reconciler.
// The reconciler deletes "a0"'s CRD; K8s cascades deletion to all child CRDs.
if err := s.repo.ActionRepo().AbortRun(ctx, req.Msg.RunId, reason, nil); err != nil {
	return nil, err
}

if s.abortReconciler != nil {
	s.abortReconciler.Push(ctx, &common.ActionIdentifier{Run: req.Msg.RunId, Name: "a0"}, reason)
}
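
The handoff pattern in the snippet above, where the request handler records the abort and a separate component performs the cleanup, can be sketched as a buffered queue drained by a single worker. The source does not show how AbortReconciler is implemented, so the abortQueue type and its methods below are hypothetical; a real reconciler would delete the root CRD and would probably retry on failure.

```go
package main

import (
	"fmt"
	"sync"
)

// abortRequest identifies the root action to clean up.
type abortRequest struct{ run, action, reason string }

// abortQueue is a hypothetical reconciler. Push returns as soon as the
// request is buffered (it blocks only when the buffer is full), and a single
// worker goroutine performs the cleanup in FIFO order.
type abortQueue struct {
	ch      chan abortRequest
	wg      sync.WaitGroup
	mu      sync.Mutex
	deleted []string
}

func newAbortQueue(size int) *abortQueue {
	q := &abortQueue{ch: make(chan abortRequest, size)}
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for req := range q.ch {
			// A real reconciler would delete the root CRD here.
			// This sketch only records which action was handled.
			q.mu.Lock()
			q.deleted = append(q.deleted, req.run+"/"+req.action)
			q.mu.Unlock()
		}
	}()
	return q
}

// Push queues a cleanup request for the worker.
func (q *abortQueue) Push(run, action, reason string) {
	q.ch <- abortRequest{run, action, reason}
}

// Close stops accepting requests, waits for the worker to drain the queue,
// and returns the actions that were handled.
func (q *abortQueue) Close() []string {
	close(q.ch)
	q.wg.Wait()
	return q.deleted
}

func main() {
	q := newAbortQueue(8)
	q.Push("run-123", "a0", "aborted by user")
	fmt.Println(q.Close())
}
```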

Scheduled Executions and Idempotency

Flyte uses a cron-based scheduler (GoCronScheduler) to monitor active triggers and initiate runs. To prevent duplicate executions (e.g., after a scheduler restart), the TriggerExecutor in runs/scheduler/executor/trigger_executor.go ensures idempotency by generating deterministic run names.

The run name is a hex-encoded hash of the trigger details and the scheduled time, so a retry for the same trigger and schedule slot produces the same name:

// From runs/scheduler/executor/trigger_executor.go
func (e *TriggerExecutor) Execute(ctx context.Context, t *models.Trigger, scheduledAt time.Time) error {
	runName := runName(t, scheduledAt) // Deterministic name based on trigger and time
	createReq := &workflow.CreateRunRequest{
		Id: &workflow.CreateRunRequest_RunId{
			RunId: &common.RunIdentifier{Project: t.Project, Domain: t.Domain, Name: runName},
		},
		// ...
		Source:       workflow.RunSource_RUN_SOURCE_SCHEDULE_TRIGGER,
		RunStartTime: timestamppb.New(scheduledAt),
	}
	_, err := e.runClient.CreateRun(ctx, connect.NewRequest(createReq))
	return err
}
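
The body of the runName helper is not shown above. A minimal sketch of one way to derive such a name follows, assuming SHA-256 over a key built from the trigger's identifying fields and the scheduled Unix timestamp. The deterministicRunName function, the key layout, the "r" prefix, and the truncation to 8 bytes are all assumptions made for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deterministicRunName is a hypothetical helper: the same trigger and
// scheduled time always yield the same name, so a repeated attempt for one
// schedule slot maps onto the same run identifier.
func deterministicRunName(project, domain, task, trigger string, scheduledUnix int64) string {
	key := fmt.Sprintf("%s/%s/%s/%s@%d", project, domain, task, trigger, scheduledUnix)
	sum := sha256.Sum256([]byte(key))
	// Keep the first 8 bytes (16 hex characters) to get a short name.
	return "r" + hex.EncodeToString(sum[:8])
}

func main() {
	first := deterministicRunName("p1", "d1", "t1", "daily-trigger", 1700000000)
	again := deterministicRunName("p1", "d1", "t1", "daily-trigger", 1700000000)
	next := deterministicRunName("p1", "d1", "t1", "daily-trigger", 1700086400)
	fmt.Println(first == again, first == next)
}
```

Because the name is a pure function of its inputs, a scheduler that restarts and replays a slot requests a run identifier that already exists instead of creating a second run.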

SDK Compatibility

The RunService includes logic to handle different SDK versions. For example, the run_start_time metadata is only stamped for tasks created with SDK versions >= 2.3.6, as older versions do not support the {{.runStartTime}} placeholder in templates. This check is performed by meetsRunStartTimeSDKVersion(taskSpec) during run creation.
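
A version gate of this kind can be sketched as a component-wise comparison against 2.3.6. How meetsRunStartTimeSDKVersion extracts the SDK version from the task spec is not shown in the source, so the functions below are hypothetical and take the version as a plain string. Treating an unparseable version as unsupported is also an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion splits "MAJOR.MINOR.PATCH" (optionally prefixed with "v")
// into three integers.
func parseVersion(v string) ([3]int, error) {
	var out [3]int
	parts := strings.Split(strings.TrimPrefix(v, "v"), ".")
	if len(parts) != 3 {
		return out, fmt.Errorf("want MAJOR.MINOR.PATCH, got %q", v)
	}
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil {
			return out, fmt.Errorf("component %q of %q: %w", p, v, err)
		}
		out[i] = n
	}
	return out, nil
}

// supportsRunStartTime is a hypothetical check that reports whether
// sdkVersion is at least 2.3.6. Versions that fail to parse are treated as
// unsupported, which is an assumption of this sketch.
func supportsRunStartTime(sdkVersion string) bool {
	got, err := parseVersion(sdkVersion)
	if err != nil {
		return false
	}
	minimum := [3]int{2, 3, 6}
	for i := range got {
		if got[i] != minimum[i] {
			return got[i] > minimum[i]
		}
	}
	return true // exactly 2.3.6
}

func main() {
	for _, v := range []string{"2.3.5", "2.3.6", "2.10.0", "dev"} {
		fmt.Printf("%s -> %v\n", v, supportsRunStartTime(v))
	}
}
```

Comparing numeric components rather than raw strings matters here: as strings, "2.10.0" sorts before "2.3.6", which would wrongly exclude a newer SDK.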