Managing Task Definitions
Flyte manages task definitions through a combination of versioned task records and content-addressed task specifications. Tasks are identified by a unique combination of project, domain, name, and version, while their underlying specifications can be deduplicated across different versions or runs using cryptographic digests.
Deploying a Task
To deploy a new task or update an existing version, use the taskService.DeployTask method. This operation is atomic; it upserts the task record and manages its associated triggers in a single database transaction.
```go
import (
	"context"
	"fmt"

	"connectrpc.com/connect"
	// task and taskconnect are the generated Flyte IDL packages; their
	// import paths depend on your module setup.
)

// deployMyTask deploys version v1 of a task together with one trigger.
func deployMyTask(ctx context.Context, service taskconnect.TaskServiceHandler) error {
	req := &task.DeployTaskRequest{
		TaskId: &task.TaskIdentifier{
			Project: "flytesnacks",
			Domain:  "development",
			Name:    "core.control_flow.run_merge_sort",
			Version: "v1",
		},
		Spec: &task.TaskSpec{
			Environment: &task.Environment{
				Name:        "production",
				Description: "Production environment for merge sort",
			},
			Documentation: &task.TaskDocumentation{
				ShortDescription: "Sorts a list using merge sort algorithm.",
			},
		},
		Triggers: []*task.TaskTrigger{
			{
				Name: "daily-trigger",
				Spec: &task.TaskTriggerSpec{
					Active: true,
				},
				AutomationSpec: &task.AutomationSpec{
					// Automation details like cron expressions
				},
			},
		},
	}
	if _, err := service.DeployTask(ctx, connect.NewRequest(req)); err != nil {
		return fmt.Errorf("deploy task: %w", err)
	}
	return nil
}
```
When DeployTask is called, Flyte performs several internal steps:
- Validation: It verifies that the project exists using the projectClient.
- Metadata Extraction: It extracts the FunctionName from the task name (e.g., by stripping the environment prefix) using transformers.ExtractFunctionName.
- Truncation: Descriptions that exceed internal limits (such as maxShortDescriptionLength) are automatically truncated.
- Atomic Upsert: The TaskRepo.CreateTask method executes an INSERT ... ON CONFLICT DO UPDATE to ensure the task version is created or updated safely.
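The truncation and name-extraction steps can be sketched in plain Go. This is a minimal sketch, not Flyte's implementation: the limit value, the helper names truncateRunes and functionNameFromTask, and the assumption that task names take the form <environment>.<function> are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// maxShortDescriptionLength is a hypothetical limit chosen for illustration;
// the real constant's value is not documented here.
const maxShortDescriptionLength = 32

// truncateRunes cuts s to at most max characters (runes), so a multi-byte
// UTF-8 character is never split in half.
func truncateRunes(s string, max int) string {
	r := []rune(s)
	if len(r) <= max {
		return s
	}
	return string(r[:max])
}

// functionNameFromTask is a hypothetical stand-in for
// transformers.ExtractFunctionName: it assumes task names look like
// "<environment>.<function>" and strips the environment prefix.
// Names without that prefix are returned unchanged.
func functionNameFromTask(taskName, envName string) string {
	return strings.TrimPrefix(taskName, envName+".")
}

func main() {
	fmt.Println(functionNameFromTask("my_env.sort_list", "my_env"))
	fmt.Println(truncateRunes("Sorts a list using merge sort algorithm.", maxShortDescriptionLength))
}
```

Truncating by runes rather than bytes is a design choice made here for safety with non-ASCII text; whether Flyte counts bytes or characters is not stated.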
Managing Task Specifications
Flyte uses TaskSpec to store the actual logic and configuration of an action. These are stored in a content-addressable manner in the task_specs table, allowing multiple task versions or runs to reference the same specification blob via a digest.
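Content addressing can be illustrated with a small in-memory store. This sketch assumes the digest is a hex-encoded SHA-256 of the serialized spec (the hash Flyte actually uses is not stated here) and uses a hypothetical specStore in place of the task_specs table; put mimics ON CONFLICT (digest) DO NOTHING by leaving an existing digest untouched.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// specStore is a hypothetical in-memory stand-in for the task_specs table.
type specStore struct {
	blobs map[string][]byte // digest -> serialized spec
}

func newSpecStore() *specStore {
	return &specStore{blobs: make(map[string][]byte)}
}

// digestOf returns the hex-encoded SHA-256 of spec (an assumed hash choice).
func digestOf(spec []byte) string {
	sum := sha256.Sum256(spec)
	return hex.EncodeToString(sum[:])
}

// put stores spec under its digest unless that digest already exists,
// mirroring INSERT ... ON CONFLICT (digest) DO NOTHING. It reports the
// digest and whether a new entry was inserted.
func (s *specStore) put(spec []byte) (string, bool) {
	d := digestOf(spec)
	if _, exists := s.blobs[d]; exists {
		return d, false
	}
	// Copy so later changes to the caller's slice cannot alter the stored blob.
	s.blobs[d] = append([]byte(nil), spec...)
	return d, true
}

func main() {
	store := newSpecStore()
	d1, inserted1 := store.put([]byte(`{"name":"merge_sort"}`))
	d2, inserted2 := store.put([]byte(`{"name":"merge_sort"}`))
	fmt.Println(d1 == d2, inserted1, inserted2, len(store.blobs))
}
```

Two task versions or runs that serialize to identical bytes share one digest, so only one blob is kept.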
Creating a Task Specification
You can persist a specification independently of a task deployment, which is useful for deduplication during run creation.
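```go
// saveTaskSpec stores a serialized spec under its digest; the caller computes digest from spec.
func saveTaskSpec(ctx context.Context, repo interfaces.TaskRepo, spec []byte, digest string) error {
	taskSpec := &models.TaskSpec{
		Digest: digest,
		Spec:   spec,
	}
	// CreateTaskSpec uses ON CONFLICT (digest) DO NOTHING, so re-saving an
	// existing spec is a no-op rather than an error.
	return repo.CreateTaskSpec(ctx, taskSpec)
}
```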
Versioning and Listing Tasks
Flyte maintains a full history of task versions. You can retrieve specific versions or list the latest version of all tasks in a project.
Listing the Latest Versions
The ListTasks method in TaskRepo uses a Common Table Expression (CTE) with ROW_NUMBER() to return only the most recent version for each unique task name.
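```go
// listLatestTasks prints the newest version of every task in a project.
func listLatestTasks(ctx context.Context, service taskconnect.TaskServiceHandler, project string) error {
	req := &task.ListTasksRequest{
		ScopeBy: &task.ListTasksRequest_ProjectId{
			ProjectId: project,
		},
	}
	resp, err := service.ListTasks(ctx, connect.NewRequest(req))
	if err != nil {
		// resp is nil on error, so it must not be dereferenced.
		return fmt.Errorf("list tasks: %w", err)
	}
	for _, t := range resp.Msg.Tasks {
		fmt.Printf("Task: %s, Latest Version: %s\n", t.TaskId.Name, t.TaskId.Version)
	}
	return nil
}
```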
Listing All Versions of a Task
To see the history of a specific task, use ListVersions.
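```go
// listTaskHistory prints every deployed version of a single task.
func listTaskHistory(ctx context.Context, service taskconnect.TaskServiceHandler, taskName string) error {
	req := &task.ListVersionsRequest{
		TaskName: taskName,
	}
	resp, err := service.ListVersions(ctx, connect.NewRequest(req))
	if err != nil {
		return fmt.Errorf("list versions of %s: %w", taskName, err)
	}
	for _, v := range resp.Msg.Versions {
		fmt.Printf("Version: %s, Deployed At: %s\n", v.Version, v.DeployedAt.AsTime())
	}
	return nil
}
```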
Handling Triggers during Deployment
When a task is deployed with triggers, Flyte manages the trigger lifecycle alongside the task. The TaskRepo.CreateTask implementation handles:
- Pruning: Triggers that were previously associated with the task but are missing from the new deployment are soft-deleted.
- Versioning: New revisions of triggers are created if their specifications have changed.
- Metadata Refresh: The task record is updated with summary statistics (e.g., total_triggers, active_triggers) after the triggers are processed.
This logic is encapsulated within runs/repository/impl/task.go in the CreateTask method, ensuring that the task's trigger metadata always reflects the current state of its active triggers.
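The three trigger-handling steps can be sketched in memory. This is a minimal, hypothetical model, not the code in task.go: the trigger and triggerSpec types, the integer revision counter, and treating a re-added trigger as a new revision are assumptions made for illustration.

```go
package main

import "fmt"

// triggerSpec is a hypothetical, simplified trigger specification (comparable with ==).
type triggerSpec struct {
	Cron   string
	Active bool
}

// trigger is a hypothetical stored trigger with a revision counter and a soft-delete flag.
type trigger struct {
	Name     string
	Spec     triggerSpec
	Revision int
	Deleted  bool
}

// triggerStats mirrors the total_triggers and active_triggers summary columns.
type triggerStats struct {
	Total  int
	Active int
}

// reconcile applies one deployment's triggers to the existing set in place.
func reconcile(existing map[string]*trigger, incoming []trigger) triggerStats {
	seen := make(map[string]bool, len(incoming))
	for _, in := range incoming {
		seen[in.Name] = true
		cur, ok := existing[in.Name]
		switch {
		case !ok:
			// Brand-new trigger: first revision.
			existing[in.Name] = &trigger{Name: in.Name, Spec: in.Spec, Revision: 1}
		case cur.Deleted || cur.Spec != in.Spec:
			// Changed or re-added trigger: record a new revision.
			cur.Spec = in.Spec
			cur.Revision++
			cur.Deleted = false
		}
		// An unchanged trigger keeps its current revision.
	}
	// Pruning: triggers absent from this deployment are soft-deleted, not removed.
	for name, t := range existing {
		if !seen[name] {
			t.Deleted = true
		}
	}
	// Metadata refresh: recount over the triggers that are still live.
	var stats triggerStats
	for _, t := range existing {
		if t.Deleted {
			continue
		}
		stats.Total++
		if t.Spec.Active {
			stats.Active++
		}
	}
	return stats
}

func main() {
	existing := map[string]*trigger{
		"daily":  {Name: "daily", Spec: triggerSpec{"0 0 * * *", true}, Revision: 1},
		"hourly": {Name: "hourly", Spec: triggerSpec{"0 * * * *", true}, Revision: 1},
	}
	stats := reconcile(existing, []trigger{
		{Name: "daily", Spec: triggerSpec{"0 6 * * *", true}},
		{Name: "weekly", Spec: triggerSpec{"0 0 * * 0", false}},
	})
	fmt.Println(stats.Total, stats.Active, existing["daily"].Revision, existing["hourly"].Deleted)
}
```

Recomputing the counts after pruning and versioning, rather than adjusting them incrementally, keeps the summary consistent with whatever the reconciliation actually changed.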