Action Lifecycle and State Management
Flyte manages the execution state of complex workflows by tracking individual actions through a series of phase transitions. This lifecycle is captured via granular events and managed in-memory using a hierarchical tree structure that supports real-time monitoring and filtering.
Tracking Phase Transitions with Action Events
When an action (such as a task or a subworkflow) changes state, Flyte records this transition as an ActionEvent. These events provide a historical record of every phase an action attempt has passed through, from QUEUED to RUNNING and eventually to terminal states like SUCCEEDED or FAILED.
The ActionEvent model in runs/repository/models/action_event.go uses a composite primary key to ensure that duplicate event notifications do not result in redundant database entries:
```go
type ActionEvent struct {
	Project string `db:"project"`
	Domain  string `db:"domain"`
	RunName string `db:"run_name"`
	Name    string `db:"name"`
	Attempt uint32 `db:"attempt"`
	Phase   int32  `db:"phase"` // common.ActionPhase
	Version uint32 `db:"version"`
	// Serialized workflow.ActionEvent proto
	Info []byte `db:"info"`
	// ...
}
```
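The deduplication that the composite key provides can be illustrated with an in-memory stand-in. This is a minimal sketch, not Flyte's repository code: the eventStore type is hypothetical, and the exact set of key columns (the identity fields plus Attempt, Phase, and Version) is an assumption. A real database enforces the same rule through its primary-key constraint.

```go
package main

import "fmt"

// ActionEvent keeps the fields from the model above (db tags omitted).
type ActionEvent struct {
	Project, Domain, RunName, Name string
	Attempt                        uint32
	Phase                          int32
	Version                        uint32
	Info                           []byte
}

// eventKey is a HYPOTHETICAL composite key; which columns the real primary
// key covers is an assumption, not taken from the Flyte schema.
type eventKey struct {
	Project, Domain, RunName, Name string
	Attempt                        uint32
	Phase                          int32
	Version                        uint32
}

func keyOf(e ActionEvent) eventKey {
	return eventKey{e.Project, e.Domain, e.RunName, e.Name, e.Attempt, e.Phase, e.Version}
}

// eventStore emulates an insert that silently ignores primary-key conflicts.
type eventStore struct {
	rows map[eventKey]ActionEvent
}

// insert stores each event at most once and reports how many rows were new.
func (s *eventStore) insert(events []ActionEvent) int {
	inserted := 0
	for _, e := range events {
		k := keyOf(e)
		if _, dup := s.rows[k]; dup {
			continue // redelivered notification: keep the existing row
		}
		s.rows[k] = e
		inserted++
	}
	return inserted
}

func main() {
	s := &eventStore{rows: map[eventKey]ActionEvent{}}
	ev := ActionEvent{Project: "proj", Domain: "dev", RunName: "run-1", Name: "a0", Attempt: 1, Phase: 2, Version: 1}
	fmt.Println(s.insert([]ActionEvent{ev}))     // 1
	fmt.Println(s.insert([]ActionEvent{ev, ev})) // 0
	fmt.Println(len(s.rows))                     // 1
}
```

Delivering the same event twice leaves a single row, while an event for a new phase of the same attempt produces a new key and therefore a new row.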
You interact with these events through the ActionRepo interface. For example, when the system receives a new event from the execution engine, it calls InsertEvents to persist the transition:
```go
// From runs/repository/interfaces/action.go
type ActionRepo interface {
	InsertEvents(ctx context.Context, events []*models.ActionEvent) error
	ListEvents(ctx context.Context, actionID *common.ActionIdentifier, limit int) ([]*models.ActionEvent, error)
	// ...
}
```
In-Memory State Management
While the database remains the source of truth, Flyte uses the runStateManager in runs/service/run_state_manager.go to maintain a live, hierarchical view of a run's state. This view matters most for the UI, which needs to display parent-child relationships and aggregate statistics (such as how many child tasks are currently running).
The Action Tree
The runStateManager builds a tree of node objects. Each node tracks its parent, children, and the current phase counts of its descendants:
```go
type node struct {
	Parent                  *node
	Action                  *models.Action
	Children                []*node
	ChildPhaseCounts        map[common.ActionPhase]int
	MatchingDescendantCount int
}
```
When you update an action via upsertActions, the manager automatically recalculates these aggregates. For instance, if a child task moves from QUEUED to RUNNING, the runStateManager traverses up the tree to update the ChildPhaseCounts of all ancestor nodes.
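The ancestor walk can be sketched as follows. The Phase constants stand in for common.ActionPhase, and the newNode and setPhase helpers are hypothetical; the point is that a single phase change decrements the old phase and increments the new one on every ancestor, so each ChildPhaseCounts map stays a count over all of that node's descendants.

```go
package main

import "fmt"

// Phase is a HYPOTHETICAL stand-in for common.ActionPhase.
type Phase int

const (
	Queued Phase = iota
	Running
	Succeeded
	Failed
)

type node struct {
	Name             string
	Parent           *node
	Phase            Phase
	ChildPhaseCounts map[Phase]int // phase counts over all descendants
}

// newNode attaches a node and counts its phase on every ancestor.
func newNode(name string, parent *node, p Phase) *node {
	n := &node{Name: name, Parent: parent, Phase: p, ChildPhaseCounts: map[Phase]int{}}
	for a := parent; a != nil; a = a.Parent {
		a.ChildPhaseCounts[p]++
	}
	return n
}

// setPhase moves n to phase p and adjusts the counts of every ancestor.
func setPhase(n *node, p Phase) {
	if n.Phase == p {
		return
	}
	for a := n.Parent; a != nil; a = a.Parent {
		a.ChildPhaseCounts[n.Phase]--
		if a.ChildPhaseCounts[n.Phase] == 0 {
			delete(a.ChildPhaseCounts, n.Phase)
		}
		a.ChildPhaseCounts[p]++
	}
	n.Phase = p
}

func main() {
	root := newNode("a0", nil, Running)
	sub := newNode("sub", root, Running)
	task := newNode("task", sub, Queued)
	setPhase(task, Running) // QUEUED -> RUNNING
	fmt.Println(root.ChildPhaseCounts[Queued], root.ChildPhaseCounts[Running]) // 0 2
	fmt.Println(sub.ChildPhaseCounts[Running])                                 // 1
}
```

The cost of an update is proportional to the node's depth rather than to the size of the tree, which is why the aggregates can be kept current on every event.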
Chronological Ordering Requirement
The runStateManager requires that actions are processed in the order they were created. If you attempt to insert a child action before its parent has been registered in the AllNodes map, the manager will return an error because it cannot establish the necessary tree links:
```go
// From runs/service/run_state_manager.go
func (rsm *runStateManager) insertAction(actionNode *node, changed map[string]struct{}) error {
	// ...
	parent := rsm.GetActionTreeNodeByName(parentName)
	if parent == nil {
		return fmt.Errorf("parent node [%s] not found for action [%s]", parentName, actionNode.Action.Name)
	}
	rsm.attachChild(parent, actionNode, changed)
	return nil
}
```
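The ordering requirement is easiest to see with a stripped-down tree. In this sketch the action type, its CreatedAt field, and the tree.insert helper are hypothetical; it assumes a parent action is always created before its children, so sorting a batch by creation time is enough to register every parent first.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// action is a HYPOTHETICAL stand-in for models.Action.
type action struct {
	Name      string
	Parent    string // empty for the run's root action
	CreatedAt time.Time
}

type node struct {
	Action   action
	Parent   *node
	Children []*node
}

type tree struct {
	AllNodes map[string]*node
}

// insert links a node to its parent, failing if the parent is not registered yet.
func (t *tree) insert(a action) error {
	n := &node{Action: a}
	if a.Parent != "" {
		parent, ok := t.AllNodes[a.Parent]
		if !ok {
			return fmt.Errorf("parent node [%s] not found for action [%s]", a.Parent, a.Name)
		}
		n.Parent = parent
		parent.Children = append(parent.Children, n)
	}
	t.AllNodes[a.Name] = n
	return nil
}

func main() {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	batch := []action{
		{Name: "child", Parent: "root", CreatedAt: t0.Add(time.Second)},
		{Name: "root", CreatedAt: t0},
	}

	unsorted := &tree{AllNodes: map[string]*node{}}
	fmt.Println(unsorted.insert(batch[0])) // parent node [root] not found for action [child]

	// Ordering by creation time registers every parent before its children.
	sort.SliceStable(batch, func(i, j int) bool { return batch[i].CreatedAt.Before(batch[j].CreatedAt) })
	ordered := &tree{AllNodes: map[string]*node{}}
	for _, a := range batch {
		if err := ordered.insert(a); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println(len(ordered.AllNodes["root"].Children)) // 1
}
```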
Real-time Monitoring via WatchActions
The primary consumer of the runStateManager is the WatchActions endpoint in the RunService. This service provides a streaming API that allows clients to receive real-time updates as the workflow progresses.
When you call WatchActions, the service performs the following steps:
- Initialize Watch: It starts a database watch via ActionRepo.WatchAllActionUpdates to capture any new events.
- Bootstrap State: It lists all existing actions for the run and populates the runStateManager.
- Stream Updates: As new updates arrive on updatesCh, they are passed to rsm.upsertActions.
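```go
// Simplified logic from runs/service/run_service.go
func (s *RunService) WatchActions(...) error {
	updatesCh := make(chan *models.Action, 50)
	errsCh := make(chan error, 1) // element type assumed for this sketch
	// Start the watch before the initial listing so that no update is missed
	go s.repo.ActionRepo().WatchAllActionUpdates(ctx, runID, updatesCh, errsCh)
	rsm, err := newRunStateManager(req.Msg.GetFilter())
	if err != nil {
		return err
	}
	// Initial sync: load existing actions into the tree and send them
	s.listAndSendAllActions(ctx, runID, rsm, stream)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // client disconnected or request cancelled
		case err := <-errsCh:
			return err // the database watch failed
		case updated := <-updatesCh:
			// Update the tree and get nodes that changed visibility or state
			updates, err := rsm.upsertActions(ctx, []*models.Action{updated})
			if err != nil {
				return err
			}
			s.sendChangedActions(runID, updates, stream)
		}
	}
}
```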
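Because the watch starts before the existing actions are listed, an action can plausibly appear both in the initial listing and on updatesCh. The sketch below shows why an insert-or-update step tolerates that overlap. The state type and its upsert method are hypothetical simplifications of runStateManager.upsertActions, and reporting only actions whose stored value actually changed is an assumption about how redundant sends could be avoided.

```go
package main

import "fmt"

// Action is a HYPOTHETICAL, simplified stand-in for models.Action.
type Action struct {
	Name  string
	Phase string
}

// state is a HYPOTHETICAL stand-in for runStateManager, keyed by action name.
type state struct{ actions map[string]Action }

// upsert inserts new actions, updates existing ones, and returns only the
// names whose stored value actually changed.
func (s *state) upsert(batch []Action) []string {
	var changed []string
	for _, a := range batch {
		if old, ok := s.actions[a.Name]; ok && old == a {
			continue // already known, e.g. from the bootstrap listing
		}
		s.actions[a.Name] = a
		changed = append(changed, a.Name)
	}
	return changed
}

func main() {
	// The watch runs first, so these updates are buffered during the listing.
	updatesCh := make(chan Action, 50)
	updatesCh <- Action{Name: "a0", Phase: "RUNNING"}
	updatesCh <- Action{Name: "a1", Phase: "QUEUED"}
	close(updatesCh) // closed only so this demo's loop terminates

	s := &state{actions: map[string]Action{}}
	// Bootstrap: the listing already contains a0 in RUNNING.
	fmt.Println(s.upsert([]Action{{Name: "a0", Phase: "RUNNING"}})) // [a0]

	for u := range updatesCh {
		fmt.Println(s.upsert([]Action{u})) // [] for the duplicate a0, then [a1]
	}
}
```

The buffered channel absorbs updates that arrive during the initial sync, and the idempotent upsert makes the duplicate a no-op instead of a second send.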
Filtering and Visibility Logic
The runStateManager applies the client's filter so that the frontend receives only the nodes it needs, while keeping the tree structurally intact.
Visibility Rules
A node is considered "visible" if either of the following holds:
- It directly matches the active filter (e.g., its phase is FAILED).
- It has at least one descendant that matches the filter.
This ensures that if you filter for failed tasks, you can still see the parent nodes (such as subworkflows) that lead to those failures, even when the parents themselves are in a RUNNING or SUCCEEDED state.
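The two visibility rules translate into a post-order traversal: a node is visible if it matches on its own or if anything in its subtree is visible. This recursive version is a hypothetical illustration; the node fields and the computeVisible helper are assumptions, and a full recomputation like this only makes the rule concrete without describing how runStateManager schedules the work.

```go
package main

import "fmt"

// node is a HYPOTHETICAL, simplified tree node.
type node struct {
	Name     string
	Phase    string
	Children []*node
}

// computeVisible records every visible node in visible and reports whether n
// itself is visible. All children are visited so the map is complete.
func computeVisible(n *node, matches func(*node) bool, visible map[string]bool) bool {
	anyChildVisible := false
	for _, c := range n.Children {
		if computeVisible(c, matches, visible) {
			anyChildVisible = true
		}
	}
	v := matches(n) || anyChildVisible
	if v {
		visible[n.Name] = true
	}
	return v
}

func main() {
	failedTask := &node{Name: "t2", Phase: "FAILED"}
	sub := &node{Name: "sub", Phase: "RUNNING", Children: []*node{{Name: "t1", Phase: "SUCCEEDED"}, failedTask}}
	root := &node{Name: "a0", Phase: "RUNNING", Children: []*node{sub, {Name: "t3", Phase: "SUCCEEDED"}}}

	visible := map[string]bool{}
	computeVisible(root, func(n *node) bool { return n.Phase == "FAILED" }, visible)
	fmt.Println(visible["a0"], visible["sub"], visible["t2"], visible["t1"], visible["t3"]) // true true true false false
}
```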
Communicating Changes
The runStateManager returns a slice of nodeUpdate objects. This struct tells the streaming service whether a node should be rendered or pruned:
```go
type nodeUpdate struct {
	Node        *node
	MeetsFilter bool // If false, the frontend should remove this node
}
```
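On the receiving side, applying a batch of nodeUpdate values reduces to adding or removing entries in whatever structure the client renders. The view type below is a hypothetical client-side cache, written in Go for consistency with the rest of this page; the actual frontend code is not part of the source.

```go
package main

import (
	"fmt"
	"sort"
)

type node struct{ Name string }

// nodeUpdate mirrors the struct shown above.
type nodeUpdate struct {
	Node        *node
	MeetsFilter bool
}

// view is a HYPOTHETICAL client-side cache of rendered nodes, keyed by name.
type view map[string]*node

// apply renders nodes that meet the filter and prunes those that do not.
func (v view) apply(updates []nodeUpdate) {
	for _, u := range updates {
		if u.MeetsFilter {
			v[u.Node.Name] = u.Node // render or refresh
		} else {
			delete(v, u.Node.Name) // prune; a no-op if absent
		}
	}
}

// names returns the rendered node names in sorted order.
func (v view) names() []string {
	out := make([]string, 0, len(v))
	for name := range v {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	v := view{}
	a0, t1 := &node{"a0"}, &node{"t1"}
	v.apply([]nodeUpdate{{a0, true}, {t1, true}})
	fmt.Println(v.names()) // [a0 t1]
	v.apply([]nodeUpdate{{t1, false}})
	fmt.Println(v.names()) // [a0]
}
```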
Internally, upsertActions reconciles the VisibleNodes and DirectlyMatchingNodes maps. If a node no longer matches the filter and has no matching descendants, it is pruned, and the manager may recursively prune its ancestors if they also no longer meet the visibility criteria.
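Pruning can be made incremental with a per-node counter. This sketch assumes that MatchingDescendantCount counts directly matching descendants and that a node is visible when it matches directly or that counter is positive; the setMatch helper is hypothetical. When a node stops matching, the walk up the tree emits a MeetsFilter: false update for each ancestor whose visibility flips, which is the recursive ancestor pruning described above.

```go
package main

import "fmt"

type node struct {
	Name                    string
	Parent                  *node
	DirectMatch             bool
	MatchingDescendantCount int // ASSUMED: number of directly matching descendants
}

type nodeUpdate struct {
	Node        *node
	MeetsFilter bool
}

func visible(n *node) bool { return n.DirectMatch || n.MatchingDescendantCount > 0 }

// setMatch changes whether n directly matches the filter, adjusts every
// ancestor's counter, and returns an update for each node whose visibility flipped.
func setMatch(n *node, match bool) []nodeUpdate {
	if n.DirectMatch == match {
		return nil
	}
	delta := 1
	if !match {
		delta = -1
	}
	var updates []nodeUpdate
	wasVisible := visible(n)
	n.DirectMatch = match
	if visible(n) != wasVisible {
		updates = append(updates, nodeUpdate{n, visible(n)})
	}
	for a := n.Parent; a != nil; a = a.Parent {
		ancestorWasVisible := visible(a)
		a.MatchingDescendantCount += delta
		if visible(a) != ancestorWasVisible {
			updates = append(updates, nodeUpdate{a, visible(a)})
		}
	}
	return updates
}

func main() {
	root := &node{Name: "a0"}
	sub := &node{Name: "sub", Parent: root}
	task := &node{Name: "t1", Parent: sub}
	setMatch(task, true) // t1, sub, and a0 become visible
	for _, u := range setMatch(task, false) {
		fmt.Println(u.Node.Name, u.MeetsFilter) // t1 false, sub false, a0 false
	}
}
```

An ancestor that still has another matching descendant keeps a positive counter, so it stays visible and produces no update.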