Managing Task Definitions

Flyte manages task definitions through a combination of versioned task records and content-addressed task specifications. Tasks are identified by a unique combination of project, domain, name, and version, while their underlying specifications can be deduplicated across different versions or runs using cryptographic digests.

Deploying a Task

To deploy a new task or update an existing version, use the taskService.DeployTask method. This operation is atomic; it upserts the task record and manages its associated triggers in a single database transaction.

// Example of deploying a task using taskService
func deployMyTask(ctx context.Context, service taskconnect.TaskServiceHandler) error {
	req := &task.DeployTaskRequest{
		TaskId: &task.TaskIdentifier{
			Project: "flytesnacks",
			Domain:  "development",
			Name:    "core.control_flow.run_merge_sort",
			Version: "v1",
		},
		Spec: &task.TaskSpec{
			Environment: &task.Environment{
				Name:        "production",
				Description: "Production environment for merge sort",
			},
			Documentation: &task.TaskDocumentation{
				ShortDescription: "Sorts a list using merge sort algorithm.",
			},
		},
		Triggers: []*task.TaskTrigger{
			{
				Name: "daily-trigger",
				Spec: &task.TaskTriggerSpec{
					Active: true,
				},
				AutomationSpec: &task.AutomationSpec{
					// Automation details like cron expressions
				},
			},
		},
	}

	_, err := service.DeployTask(ctx, connect.NewRequest(req))
	return err
}

When DeployTask is called, Flyte performs several internal steps:

  1. Validation: It verifies that the project exists using the projectClient.
  2. Metadata Extraction: It extracts the FunctionName from the task name (e.g., stripping the environment prefix) using transformers.ExtractFunctionName.
  3. Truncation: Descriptions that exceed internal limits (like maxShortDescriptionLength) are automatically truncated.
  4. Atomic Upsert: The TaskRepo.CreateTask method executes an INSERT ... ON CONFLICT DO UPDATE to ensure the task version is created or updated safely.
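The truncation and upsert steps above can be sketched in memory. This is an illustration only, not Flyte's implementation: the limit value, the byte-based truncation rule, and the names `truncate`, `taskKey`, `taskStore`, and `upsert` are all assumptions made for the example. The source states only that over-long descriptions are truncated and that a task version is created or updated by its identifying key.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// maxShortDescriptionLength is a placeholder; the real limit is internal
// to Flyte and is not stated in this document.
const maxShortDescriptionLength = 20

// truncate cuts s to at most max bytes without splitting a multi-byte
// UTF-8 character. Whether Flyte counts bytes or runes is an assumption.
func truncate(s string, max int) string {
	if max <= 0 {
		return ""
	}
	if len(s) <= max {
		return s
	}
	cut := max
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}

// taskKey mirrors the identifying tuple: project, domain, name, version.
type taskKey struct{ Project, Domain, Name, Version string }

// taskStore is a hypothetical in-memory stand-in for the tasks table.
type taskStore map[taskKey]string

// upsert models INSERT ... ON CONFLICT DO UPDATE: it writes the (truncated)
// description and reports whether the key was newly created.
func (s taskStore) upsert(k taskKey, shortDesc string) bool {
	_, exists := s[k]
	s[k] = truncate(shortDesc, maxShortDescriptionLength)
	return !exists
}

func main() {
	store := taskStore{}
	key := taskKey{"flytesnacks", "development", "core.control_flow.run_merge_sort", "v1"}
	fmt.Println(store.upsert(key, "Sorts a list using merge sort algorithm."))
	fmt.Println(store.upsert(key, "Sorts a list."))
	fmt.Println(len(store), store[key])
}
```

Deploying the same key twice leaves a single record holding the most recent values, which is the behavior the upsert is meant to guarantee.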

Managing Task Specifications

Flyte uses TaskSpec to store the actual logic and configuration of an action. These are stored in a content-addressable manner in the task_specs table, allowing multiple task versions or runs to reference the same specification blob via a digest.
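As a rough illustration of content addressing, the digest for a specification can be derived from its serialized bytes. The sketch below assumes SHA-256 with hex encoding; the source says only that a cryptographic digest is used, so the hash function, the encoding, and the helper name `specDigest` are assumptions.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// specDigest returns a hex-encoded SHA-256 digest of the serialized spec.
// SHA-256 is an assumption; Flyte may use a different digest.
func specDigest(spec []byte) string {
	sum := sha256.Sum256(spec)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := specDigest([]byte("serialized spec bytes"))
	b := specDigest([]byte("serialized spec bytes"))
	c := specDigest([]byte("a different spec"))

	// Identical bytes always map to the same key; different bytes do not.
	fmt.Println(a == b, a == c)
}
```

Because the key depends only on the bytes, two task versions with byte-identical specifications resolve to the same stored blob.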

Creating a Task Specification

You can persist a specification independently of a task deployment, which is useful for deduplication during run creation.

func saveTaskSpec(ctx context.Context, repo interfaces.TaskRepo, spec []byte, digest string) error {
	taskSpec := &models.TaskSpec{
		Digest: digest,
		Spec:   spec,
	}

	// This uses ON CONFLICT (digest) DO NOTHING to ensure deduplication
	return repo.CreateTaskSpec(ctx, taskSpec)
}
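To make the ON CONFLICT (digest) DO NOTHING behavior concrete, here is a small in-memory model of it. The `specStore` type and its `createTaskSpec` method are hypothetical stand-ins for the task_specs table, not the repository's actual code.

```go
package main

import "fmt"

// specStore is a hypothetical in-memory stand-in for the task_specs table.
type specStore struct {
	byDigest map[string][]byte
}

func newSpecStore() *specStore {
	return &specStore{byDigest: make(map[string][]byte)}
}

// createTaskSpec stores spec under digest unless that digest already exists,
// mirroring ON CONFLICT (digest) DO NOTHING. It reports whether a new row
// was written.
func (s *specStore) createTaskSpec(digest string, spec []byte) bool {
	if _, ok := s.byDigest[digest]; ok {
		return false // conflict: keep the existing row untouched
	}
	s.byDigest[digest] = append([]byte(nil), spec...) // copy to avoid aliasing
	return true
}

func main() {
	store := newSpecStore()
	fmt.Println(store.createTaskSpec("digest-1", []byte("spec")))
	fmt.Println(store.createTaskSpec("digest-1", []byte("spec")))
	fmt.Println(len(store.byDigest))
}
```

A repeated insert is a no-op rather than an error, so callers can save a specification on every run without checking first whether it exists.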

Versioning and Listing Tasks

Flyte maintains a full history of task versions. You can retrieve specific versions or list the latest version of all tasks in a project.

Listing the Latest Versions

The ListTasks method in TaskRepo uses a Common Table Expression (CTE) with ROW_NUMBER() to return only the most recent version for each unique task name.
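The effect of the ROW_NUMBER() query can be modeled in plain Go: group rows by task name and keep the newest one in each group. The `taskRow` type, the `CreatedAt` ordering column, and the SQL shape in the comment are assumptions for illustration; the actual query and column names may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// taskRow is a hypothetical, simplified task record.
type taskRow struct {
	Name      string
	Version   string
	CreatedAt int64 // Unix seconds; the real ordering column is an assumption
}

// latestPerName keeps the newest row for each task name. It is roughly what
// a query of this shape returns (column names are illustrative):
//
//	WITH ranked AS (
//	  SELECT *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY created_at DESC) AS rn
//	  FROM tasks
//	)
//	SELECT * FROM ranked WHERE rn = 1
func latestPerName(rows []taskRow) []taskRow {
	latest := make(map[string]taskRow)
	for _, r := range rows {
		cur, ok := latest[r.Name]
		if !ok || r.CreatedAt > cur.CreatedAt {
			latest[r.Name] = r
		}
	}
	out := make([]taskRow, 0, len(latest))
	for _, r := range latest {
		out = append(out, r)
	}
	// Sort by name so the result is deterministic.
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	rows := []taskRow{
		{"merge_sort", "v1", 100},
		{"merge_sort", "v2", 200},
		{"quick_sort", "v1", 150},
	}
	for _, r := range latestPerName(rows) {
		fmt.Println(r.Name, r.Version)
	}
}
```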

func listLatestTasks(ctx context.Context, service taskconnect.TaskServiceHandler, project string) error {
	req := &task.ListTasksRequest{
		ScopeBy: &task.ListTasksRequest_ProjectId{
			ProjectId: project,
		},
	}

	// Check the error before reading resp; resp is nil when the call fails.
	resp, err := service.ListTasks(ctx, connect.NewRequest(req))
	if err != nil {
		return err
	}
	for _, t := range resp.Msg.Tasks {
		fmt.Printf("Task: %s, Latest Version: %s\n", t.TaskId.Name, t.TaskId.Version)
	}
	return nil
}

Listing All Versions of a Task

To see the history of a specific task, use ListVersions.

func listTaskHistory(ctx context.Context, service taskconnect.TaskServiceHandler, taskName string) error {
	req := &task.ListVersionsRequest{
		TaskName: taskName,
	}

	// Check the error before reading resp; resp is nil when the call fails.
	resp, err := service.ListVersions(ctx, connect.NewRequest(req))
	if err != nil {
		return err
	}
	for _, v := range resp.Msg.Versions {
		fmt.Printf("Version: %s, Deployed At: %s\n", v.Version, v.DeployedAt.AsTime())
	}
	return nil
}

Handling Triggers during Deployment

When a task is deployed with triggers, Flyte manages the trigger lifecycle alongside the task. The TaskRepo.CreateTask implementation handles:

  • Pruning: Triggers that were previously associated with the task but are missing from the new deployment are soft-deleted.
  • Versioning: New revisions of triggers are created if their specifications have changed.
  • Metadata Refresh: The task record is updated with summary statistics (e.g., total_triggers, active_triggers) after the triggers are processed.

This logic is encapsulated within runs/repository/impl/task.go in the CreateTask method, ensuring that the task's trigger metadata always reflects the current state of its active triggers.
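The pruning, versioning, and metadata refresh described above can be sketched as a single reconciliation pass. This is a simplified in-memory model, not the code in CreateTask: the `trigger` and `summary` types, the string-valued `Spec`, and the rule that a revision is bumped whenever the spec or active flag differs are all assumptions.

```go
package main

import "fmt"

// trigger is a hypothetical, simplified trigger record.
type trigger struct {
	Name     string
	Spec     string // stand-in for the serialized trigger specification
	Active   bool
	Revision int
	Deleted  bool // soft-delete marker
}

// summary models the total_triggers / active_triggers counters.
type summary struct{ Total, Active int }

// reconcile applies a deployment's triggers to the existing set: it creates
// new triggers, bumps the revision of changed ones, soft-deletes those that
// are missing from the deployment, and returns refreshed counters.
func reconcile(existing map[string]*trigger, deployed []trigger) summary {
	seen := make(map[string]bool, len(deployed))
	for _, d := range deployed {
		seen[d.Name] = true
		cur, ok := existing[d.Name]
		switch {
		case !ok:
			existing[d.Name] = &trigger{Name: d.Name, Spec: d.Spec, Active: d.Active, Revision: 1}
		case cur.Deleted || cur.Spec != d.Spec || cur.Active != d.Active:
			cur.Spec, cur.Active, cur.Deleted = d.Spec, d.Active, false
			cur.Revision++
		}
	}

	var s summary
	for name, t := range existing {
		if !seen[name] {
			t.Deleted = true // pruning: no longer part of the deployment
		}
		if t.Deleted {
			continue
		}
		s.Total++
		if t.Active {
			s.Active++
		}
	}
	return s
}

func main() {
	existing := map[string]*trigger{
		"daily-trigger": {Name: "daily-trigger", Spec: "cron-a", Active: true, Revision: 1},
		"old-trigger":   {Name: "old-trigger", Spec: "cron-b", Active: true, Revision: 1},
	}
	s := reconcile(existing, []trigger{
		{Name: "daily-trigger", Spec: "cron-c", Active: true},
		{Name: "hourly-trigger", Spec: "cron-d", Active: false},
	})
	fmt.Println(s.Total, s.Active, existing["old-trigger"].Deleted)
}
```

Computing the counters after pruning, in the same pass, is what keeps the summary consistent with the set of live triggers.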