Trigger Service API Reference
The Trigger Service in Flyte provides a Connect-RPC interface for managing the lifecycle of task triggers. It acts as the primary gateway for deploying, updating, and tracking the history of automations that initiate task executions based on schedules or other events.
Deploying and Updating Triggers
When you need to automate a task, you use the DeployTrigger method. This method handles both the initial creation of a trigger and subsequent updates. If you attempt to deploy a trigger with a cron schedule that has an invalid syntax, the service returns a CodeInvalidArgument error.
Usage Example
To deploy a trigger, you send a DeployTriggerRequest containing the trigger name, the task it should execute, and the automation specification (such as a cron schedule).
// Example of a client call to DeployTrigger
req := &triggerpb.DeployTriggerRequest{
Name: &commonpb.TriggerName{
Project: "flytesnacks",
Domain: "development",
TaskName: "my_task",
Name: "daily-trigger",
},
Spec: &taskpb.TriggerSpec{
// Task execution details
},
AutomationSpec: &taskpb.TriggerAutomationSpec{
Action: &taskpb.TriggerAutomationSpec_Schedule{
Schedule: &taskpb.Schedule{
Cron: &taskpb.CronSchedule{
Expression: "0 0 * * *", // Daily at midnight
Timezone: "UTC",
},
},
},
},
}
resp, err := triggerClient.DeployTrigger(ctx, connect.NewRequest(req))
Internal Validation and Persistence
Internally, the triggerService (located in runs/service/trigger_service.go) performs several steps:
- Validation: It calls
request.Validate()and then runs a specificvalidateCronExpressioncheck. This check uses thegithub.com/robfig/cron/v3library and supports theCRON_TZprefix for timezone-aware schedules. - Transformation: It converts the protobuf request into a database model using
transformers.NewTriggerModel. - Optimistic Locking: The
DeployTriggermethod accepts arevisionin the request. This revision is passed tos.db.TriggerRepo().SaveTrigger, which uses it to ensure that updates do not overwrite changes made by other concurrent requests.
// From runs/service/trigger_service.go
func (s *triggerService) DeployTrigger(
ctx context.Context,
req *connect.Request[triggerpb.DeployTriggerRequest],
) (*connect.Response[triggerpb.DeployTriggerResponse], error) {
request := req.Msg
if err := request.Validate(); err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
if err := validateCronExpression(request.GetName().GetName(), request.GetAutomationSpec()); err != nil {
return nil, err
}
// ... transformation and saving logic ...
saved, err := s.db.TriggerRepo().SaveTrigger(ctx, triggerModel, request.GetRevision())
// ...
}
Managing Trigger Lifecycle
The service provides standard CRUD operations to manage triggers once they are deployed.
Retrieving and Listing Triggers
- GetTriggerDetails: Fetches the current state and configuration of a specific trigger. If the trigger does not exist, it returns a
CodeNotFounderror by checking forsql.ErrNoRowsfrom the repository. - ListTriggers: Allows you to query triggers across different scopes. You can filter by
ProjectId,TaskId, orTaskName. The service usesimpl.NewListResourceInputFromPrototo handle pagination and filtering logic consistently with other Flyte services.
Bulk Operations
The service supports bulk updates and deletions through UpdateTriggers and DeleteTriggers. These methods take a list of TriggerName identifiers and apply the requested state change (e.g., activating/deactivating) or removal in the database via s.db.TriggerRepo().
Trigger Revisions and History
Flyte tracks every change made to a trigger's configuration. This allows you to audit changes or revert to previous versions of an automation.
- GetTriggerRevisionHistory: Returns a paginated list of all revisions for a specific trigger.
- GetTriggerRevisionDetails: Retrieves the full specification of a specific revision of a trigger.
The triggerService uses transformers.TriggerRevisionModelToTriggerDetails to map historical database records back into the standard trigger detail format used by the API.
Scheduler Integration
The Trigger Service is the management plane, while the trigger-scheduler worker (defined in runs/scheduler) is the execution plane.
In runs/setup.go, the scheduler is initialized if runs.triggerScheduler.enabled is set to true. The scheduler periodically polls the database (using the same TriggerRepo used by the service) to identify active triggers that need to be executed based on their cron expressions.
// From runs/setup.go
if cfg.TriggerScheduler.Enabled {
// ...
worker := scheduler.Start(ctx, repo.TriggerRepo(), cfg.TriggerScheduler, runsURL, connect.WithInterceptors(otelInterceptor))
sc.AddWorker("trigger-scheduler", worker)
}
When you update a trigger's state to inactive via UpdateTriggers, the scheduler will stop executing it during its next resync interval (configured by runs.triggerScheduler.resyncInterval, which defaults to 30 seconds).