Skip to main content

Trigger Service API Reference

The Trigger Service in Flyte provides a Connect-RPC interface for managing the lifecycle of task triggers. It acts as the primary gateway for deploying, updating, and tracking the history of automations that initiate task executions based on schedules or other events.

Deploying and Updating Triggers

When you need to automate a task, you use the DeployTrigger method. This method handles both the initial creation of a trigger and subsequent updates. If you attempt to deploy a trigger with a cron schedule that has an invalid syntax, the service returns a CodeInvalidArgument error.

Usage Example

To deploy a trigger, you send a DeployTriggerRequest containing the trigger name, the task it should execute, and the automation specification (such as a cron schedule).

// Example of a client call to DeployTrigger
req := &triggerpb.DeployTriggerRequest{
Name: &commonpb.TriggerName{
Project: "flytesnacks",
Domain: "development",
TaskName: "my_task",
Name: "daily-trigger",
},
Spec: &taskpb.TriggerSpec{
// Task execution details
},
AutomationSpec: &taskpb.TriggerAutomationSpec{
Action: &taskpb.TriggerAutomationSpec_Schedule{
Schedule: &taskpb.Schedule{
Cron: &taskpb.CronSchedule{
Expression: "0 0 * * *", // Daily at midnight
Timezone: "UTC",
},
},
},
},
}
resp, err := triggerClient.DeployTrigger(ctx, connect.NewRequest(req))

Internal Validation and Persistence

Internally, the triggerService (located in runs/service/trigger_service.go) performs several steps:

  1. Validation: It calls request.Validate() and then runs a specific validateCronExpression check. This check uses the github.com/robfig/cron/v3 library and supports the CRON_TZ prefix for timezone-aware schedules.
  2. Transformation: It converts the protobuf request into a database model using transformers.NewTriggerModel.
  3. Optimistic Locking: The DeployTrigger method accepts a revision in the request. This revision is passed to s.db.TriggerRepo().SaveTrigger, which uses it to ensure that updates do not overwrite changes made by other concurrent requests.
// From runs/service/trigger_service.go
func (s *triggerService) DeployTrigger(
ctx context.Context,
req *connect.Request[triggerpb.DeployTriggerRequest],
) (*connect.Response[triggerpb.DeployTriggerResponse], error) {
request := req.Msg

if err := request.Validate(); err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
if err := validateCronExpression(request.GetName().GetName(), request.GetAutomationSpec()); err != nil {
return nil, err
}
// ... transformation and saving logic ...
saved, err := s.db.TriggerRepo().SaveTrigger(ctx, triggerModel, request.GetRevision())
// ...
}

Managing Trigger Lifecycle

The service provides standard CRUD operations to manage triggers once they are deployed.

Retrieving and Listing Triggers

  • GetTriggerDetails: Fetches the current state and configuration of a specific trigger. If the trigger does not exist, it returns a CodeNotFound error by checking for sql.ErrNoRows from the repository.
  • ListTriggers: Allows you to query triggers across different scopes. You can filter by ProjectId, TaskId, or TaskName. The service uses impl.NewListResourceInputFromProto to handle pagination and filtering logic consistently with other Flyte services.

Bulk Operations

The service supports bulk updates and deletions through UpdateTriggers and DeleteTriggers. These methods take a list of TriggerName identifiers and apply the requested state change (e.g., activating/deactivating) or removal in the database via s.db.TriggerRepo().

Trigger Revisions and History

Flyte tracks every change made to a trigger's configuration. This allows you to audit changes or revert to previous versions of an automation.

  • GetTriggerRevisionHistory: Returns a paginated list of all revisions for a specific trigger.
  • GetTriggerRevisionDetails: Retrieves the full specification of a specific revision of a trigger.

The triggerService uses transformers.TriggerRevisionModelToTriggerDetails to map historical database records back into the standard trigger detail format used by the API.

Scheduler Integration

The Trigger Service is the management plane, while the trigger-scheduler worker (defined in runs/scheduler) is the execution plane.

In runs/setup.go, the scheduler is initialized if runs.triggerScheduler.enabled is set to true. The scheduler periodically polls the database (using the same TriggerRepo used by the service) to identify active triggers that need to be executed based on their cron expressions.

// From runs/setup.go
if cfg.TriggerScheduler.Enabled {
// ...
worker := scheduler.Start(ctx, repo.TriggerRepo(), cfg.TriggerScheduler, runsURL, connect.WithInterceptors(otelInterceptor))
sc.AddWorker("trigger-scheduler", worker)
}

When you update a trigger's state to inactive via UpdateTriggers, the scheduler will stop executing it during its next resync interval (configured by runs.triggerScheduler.resyncInterval, which defaults to 30 seconds).