Enqueuing and Managing Actions

Flyte's Actions Service manages task executions as Kubernetes Custom Resources, so you can trigger a specific task and track its progress through each state. The service provides a gRPC/Connect interface for enqueuing new actions, updating their status, and monitoring state changes via a streaming API.

Enqueuing a New Action

To start a task execution, you enqueue an action using the ActionsService.Enqueue method. This creates a TaskAction Custom Resource (CR) in the configured Kubernetes namespace.

import (
	"context"
	"fmt"

	"connectrpc.com/connect"
	"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/actions"
)

// triggerAction enqueues a single action and wraps any failure with context.
func triggerAction(ctx context.Context, svc *ActionsService, action *actions.Action) error {
	req := connect.NewRequest(&actions.EnqueueRequest{
		Action: action,
		// Optional: RunSpec for specific execution requirements
		RunSpec: nil,
	})

	_, err := svc.Enqueue(ctx, req)
	if err != nil {
		return fmt.Errorf("failed to enqueue action: %w", err)
	}
	return nil
}

The ActionsService delegates the actual creation to the ActionsClient, which translates the actions.Action message into an executorv1.TaskAction Kubernetes object.

Monitoring Action Progress

Flyte provides a streaming WatchForUpdates API to monitor the lifecycle of actions. This is particularly useful for parent actions that need to track the progress of multiple child actions.

The implementation in ActionsService.WatchForUpdates ensures no events are missed by subscribing to updates before retrieving the initial state snapshot:

// From actions/service/actions_service.go (abridged)
func (s *ActionsService) WatchForUpdates(
	ctx context.Context,
	req *connect.Request[actions.WatchForUpdatesRequest],
	stream *connect.ServerStream[actions.WatchForUpdatesResponse],
) error {
	parentActionID := req.Msg.GetParentActionId()

	// 1. Subscribe before listing to avoid missing events between snapshot and watch
	updateCh := s.client.Subscribe(parentActionID.Name)
	defer s.client.Unsubscribe(parentActionID.Name, updateCh)

	// 2. Send initial state snapshot
	childActions, err := s.client.ListChildActions(ctx, parentActionID)
	if err != nil {
		return err
	}
	for _, action := range childActions {
		if err := stream.Send(&actions.WatchForUpdatesResponse{
			Message: &actions.WatchForUpdatesResponse_ActionUpdate{
				ActionUpdate: taskActionToUpdate(action),
			},
		}); err != nil {
			return err
		}
	}

	// 3. Send sentinel to signal end of initial snapshot
	if err := stream.Send(&actions.WatchForUpdatesResponse{
		Message: &actions.WatchForUpdatesResponse_ControlMessage{
			ControlMessage: &workflow.ControlMessage{Sentinel: true},
		},
	}); err != nil {
		return err
	}

	// 4. Stream live updates from the channel
	for {
		select {
		case <-ctx.Done():
			return nil
		case update := <-updateCh:
			// Send update to stream...
		}
	}
}
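Seen from the consumer's side, the protocol above implies a simple rule: fold every update into local state, and treat the sentinel as the point at which the initial snapshot is complete. The following is a minimal sketch of that rule, not the Flyte client API. The watchMsg and childTracker types are hypothetical stand-ins for the generated response types, and a Go channel stands in for the Connect stream.

```go
package main

import "fmt"

// watchMsg is a hypothetical stand-in for actions.WatchForUpdatesResponse:
// it carries either an action update or the end-of-snapshot sentinel.
type watchMsg struct {
	Sentinel bool   // true for the control message that ends the snapshot
	Name     string // action name (unused when Sentinel is true)
	Phase    string // reported phase, e.g. "QUEUED" or "RUNNING"
}

// childTracker holds the latest known phase per child action.
type childTracker struct {
	phases        map[string]string
	snapshotReady bool
}

func newChildTracker() *childTracker {
	return &childTracker{phases: make(map[string]string)}
}

// apply folds one stream message into the tracker. Snapshot entries and
// live updates are handled identically; the sentinel only flips the flag.
func (t *childTracker) apply(m watchMsg) {
	if m.Sentinel {
		t.snapshotReady = true
		return
	}
	t.phases[m.Name] = m.Phase
}

// consume drains the stream until it is closed and returns the final view.
func consume(stream <-chan watchMsg) *childTracker {
	t := newChildTracker()
	for m := range stream {
		t.apply(m)
	}
	return t
}

func main() {
	stream := make(chan watchMsg, 4)
	stream <- watchMsg{Name: "child-a", Phase: "QUEUED"}  // snapshot
	stream <- watchMsg{Name: "child-b", Phase: "RUNNING"} // snapshot
	stream <- watchMsg{Sentinel: true}                    // end of snapshot
	stream <- watchMsg{Name: "child-a", Phase: "RUNNING"} // live update
	close(stream)

	t := consume(stream)
	fmt.Println(t.snapshotReady, t.phases["child-a"], t.phases["child-b"])
}
```

Because snapshot entries and live updates share one code path, an event that arrives both in the snapshot and on the subscription (possible when subscribing before listing) simply overwrites the same entry.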

Event Ordering

The ActionsClient maintains event ordering by using a sharded worker pool. Events are sharded by the TaskAction name, ensuring that updates for a specific resource are processed and delivered in the order they occurred.
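To illustrate why sharding by name preserves order, here is a minimal, self-contained sketch of a sharded worker pool. It is not the ActionsClient implementation: the event and shardedPool types, the FNV hash, and the worker and buffer sizes are all assumptions chosen for the example. The key property is that every event for a given name lands on the same queue, and each queue is drained by exactly one goroutine.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// event is a hypothetical stand-in for a watch event on a TaskAction.
type event struct {
	Name string // TaskAction name, used as the shard key
	Seq  int    // position in that resource's event history
}

// shardFor maps a name to one of n shards; equal names always map to
// the same shard, which is what preserves per-resource ordering.
func shardFor(name string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(name))
	return int(h.Sum32() % uint32(n))
}

// shardedPool runs one goroutine per shard, each draining its own queue.
type shardedPool struct {
	queues []chan event
	wg     sync.WaitGroup
}

func newShardedPool(workers, buffer int, handle func(event)) *shardedPool {
	p := &shardedPool{queues: make([]chan event, workers)}
	for i := range p.queues {
		q := make(chan event, buffer)
		p.queues[i] = q
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for ev := range q {
				handle(ev)
			}
		}()
	}
	return p
}

// dispatch routes an event to the shard that owns its name.
func (p *shardedPool) dispatch(ev event) {
	p.queues[shardFor(ev.Name, len(p.queues))] <- ev
}

// shutdown stops accepting events and waits for the workers to drain.
func (p *shardedPool) shutdown() {
	for _, q := range p.queues {
		close(q)
	}
	p.wg.Wait()
}

func main() {
	var mu sync.Mutex
	seen := map[string][]int{}
	pool := newShardedPool(4, 16, func(ev event) {
		mu.Lock()
		seen[ev.Name] = append(seen[ev.Name], ev.Seq)
		mu.Unlock()
	})
	for seq := 0; seq < 5; seq++ {
		pool.dispatch(event{Name: "action-a", Seq: seq})
		pool.dispatch(event{Name: "action-b", Seq: seq})
	}
	pool.shutdown()
	fmt.Println(seen["action-a"], seen["action-b"])
}
```

Events for different names may interleave in any order across shards; only the per-name sequence is guaranteed.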

Updating Action Status

As a task progresses, its status is updated via the Update endpoint. This is typically called by the executor or the task itself to report phase changes (e.g., from QUEUED to RUNNING).

func updateStatus(ctx context.Context, svc *ActionsService, actionID *common.ActionIdentifier, status *workflow.ActionStatus) error {
	req := connect.NewRequest(&actions.UpdateRequest{
		ActionId: actionID,
		Attempt:  1,
		Status:   status,
	})

	_, err := svc.Update(ctx, req)
	return err
}

The ActionsClient.PutStatus method handles the Kubernetes patch operation to update the TaskAction status and conditions.

Aborting Actions

Aborting an action stops its execution and triggers a cascading cleanup of any descendant actions. This is implemented using Kubernetes OwnerReferences, so deleting or aborting a parent action automatically cleans up its children.

func abortTask(ctx context.Context, svc *ActionsService, actionID *common.ActionIdentifier) error {
	reason := "User requested abort"
	req := connect.NewRequest(&actions.AbortRequest{
		ActionId: actionID,
		Reason:   &reason,
	})

	_, err := svc.Abort(ctx, req)
	return err
}
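The cascading effect of owner references can be modeled in a few lines. The sketch below is an in-memory illustration only: in a real cluster the Kubernetes garbage collector performs this traversal, and the resource type and cascadeDelete function here are hypothetical names that do not appear in Flyte or client-go.

```go
package main

import (
	"fmt"
	"sort"
)

// resource is a hypothetical, simplified stand-in for a TaskAction CR.
// Owner plays the role of a Kubernetes OwnerReference ("" means no owner).
type resource struct {
	Name  string
	Owner string
}

// cascadeDelete removes root and every resource that transitively names it
// as owner, returning the deleted names in sorted order.
func cascadeDelete(store map[string]resource, root string) []string {
	if _, ok := store[root]; !ok {
		return nil
	}
	var deleted []string
	pending := []string{root}
	for len(pending) > 0 {
		name := pending[0]
		pending = pending[1:]
		delete(store, name)
		deleted = append(deleted, name)
		// Queue every remaining resource owned by the one just deleted.
		for _, r := range store {
			if r.Owner == name {
				pending = append(pending, r.Name)
			}
		}
	}
	sort.Strings(deleted)
	return deleted
}

func main() {
	store := map[string]resource{
		"parent":     {Name: "parent"},
		"child-1":    {Name: "child-1", Owner: "parent"},
		"child-2":    {Name: "child-2", Owner: "parent"},
		"grandchild": {Name: "grandchild", Owner: "child-1"},
		"unrelated":  {Name: "unrelated"},
	}
	fmt.Println(cascadeDelete(store, "parent"), len(store))
}
```

Resources outside the ownership tree, such as "unrelated" in the example, are left untouched.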

Configuration Requirements

To use the Actions Service, ensure the following configurations are set in your Flyte deployment:

| Parameter | Default | Description |
| --- | --- | --- |
| actions.kubernetes.namespace | flyte | The namespace where TaskAction CRs are stored. |
| actions.watchWorkers | 10 | Number of parallel workers for processing watch events. |
| actions.watchBufferSize | 100 | Buffer size for the internal update channels. |

The service is initialized in actions/setup.go by creating an ActionsClient and passing it to NewActionsService. The client must start its watcher via StartWatching(ctx) before the service can process updates.