This class handles all etcd/K8s TaskAction CR operations for the Actions service. It manages watching for updates, deduplicates recorded actions, and uses a worker pool to process events for TaskAction resources while preserving per-resource ordering.
Methods
Enqueue()
@classmethod
def Enqueue(
ctx: context.Context,
action: *actions.Action,
runSpec: *task.RunSpec
) - > error
Enqueues a new action for processing. This method adds an action to the system, making it ready to be executed according to its run specification.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the request, enabling cancellation and carrying request-scoped values. |
| action | *actions.Action | The action to be enqueued, containing its definition and metadata. |
| runSpec | *task.RunSpec | The specification for how the action should be run, including execution parameters and environment. |
Returns
| Type | Description |
|---|
error | An error if the action could not be enqueued, otherwise nil. |
AbortAction()
@classmethod
def AbortAction(
ctx: context.Context,
actionID: *common.ActionIdentifier,
reason: *string
) - > error
Aborts an action that is currently in progress or pending. This method stops the execution of a specified action and marks it as aborted.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the request, enabling cancellation and carrying request-scoped values. |
| actionID | *common.ActionIdentifier | The unique identifier of the action to be aborted. |
| reason | *string | A string explaining why the action is being aborted. |
Returns
| Type | Description |
|---|
error | An error if the action could not be aborted, otherwise nil. |
PutStatus()
@classmethod
def PutStatus(
ctx: context.Context,
actionID: *common.ActionIdentifier,
attempt: uint32,
status: *workflow.ActionStatus
) - > error
Updates the status of a specific attempt for an action. This method is used to report the current state of an action's execution.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the request, enabling cancellation and carrying request-scoped values. |
| actionID | *common.ActionIdentifier | The unique identifier of the action whose status is being updated. |
| attempt | uint32 | The specific attempt number for which the status is being reported. |
| status | *workflow.ActionStatus | The new status to set for the action attempt, indicating its current state (e.g., running, completed, failed). |
Returns
| Type | Description |
|---|
error | An error if the status could not be updated, otherwise nil. |
ListRunActions()
@classmethod
def ListRunActions(
ctx: context.Context,
runID: *common.RunIdentifier
) - > ([]*executorv1.TaskAction, error)
Lists all task actions associated with a specific run. This method allows callers to retrieve all actions that are part of a particular execution run.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the request, enabling cancellation and carrying request-scoped values. |
| runID | *common.RunIdentifier | The unique identifier of the run for which to list actions. |
Returns
| Type | Description |
|---|
([]*executorv1.TaskAction, error) | A slice of TaskAction objects representing all actions within the specified run, and an error if the listing fails. |
ListChildActions()
@classmethod
def ListChildActions(
ctx: context.Context,
parentActionID: *common.ActionIdentifier
) - > ([]*executorv1.TaskAction, error)
Lists all child task actions for a given parent action. This method is used to retrieve actions that are spawned or dependent on a specific parent action.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the request, enabling cancellation and carrying request-scoped values. |
| parentActionID | *common.ActionIdentifier | The unique identifier of the parent action for which to list child actions. |
Returns
| Type | Description |
|---|
([]*executorv1.TaskAction, error) | A slice of TaskAction objects representing the child actions of the specified parent, and an error if the listing fails. |
GetTaskAction()
@classmethod
def GetTaskAction(
ctx: context.Context,
actionID: *common.ActionIdentifier
) - > (*executorv1.TaskAction, error)
Retrieves a specific task action by its identifier. This method allows callers to fetch detailed information about a particular action.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the request, enabling cancellation and carrying request-scoped values. |
| actionID | *common.ActionIdentifier | The unique identifier of the task action to retrieve. |
Returns
| Type | Description |
|---|
(*executorv1.TaskAction, error) | The TaskAction object corresponding to the given ID, and an error if the action is not found or retrieval fails. |
Subscribe()
@classmethod
def Subscribe(
parentActionName: string
) - > chan *ActionUpdate
Subscribes to updates for actions related to a specific parent action. This method returns a channel that will receive ActionUpdate messages whenever a relevant action changes state.
Parameters
| Name | Type | Description |
|---|
| parentActionName | string | The name of the parent action for which to receive updates. Updates will be sent for this parent action and its children. |
Returns
| Type | Description |
|---|
chan *ActionUpdate | A channel through which ActionUpdate messages are sent, providing real-time notifications about action changes. |
Unsubscribe()
@classmethod
def Unsubscribe(
parentActionName: string,
ch: chan *ActionUpdate
)
Unsubscribes a channel from receiving updates for a specific parent action. This method stops sending ActionUpdate messages to the provided channel.
Parameters
| Name | Type | Description |
|---|
| parentActionName | string | The name of the parent action from which to unsubscribe the channel. |
| ch | chan *ActionUpdate | The channel to be unsubscribed from receiving action updates. |
StartWatching()
@classmethod
def StartWatching(
ctx: context.Context
) - > error
Initiates the watching process for TaskAction resources. This method sets up the necessary informers and workers to monitor changes in TaskAction objects, enabling real-time updates and event processing.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for starting the watching process, allowing for cancellation. |
Returns
| Type | Description |
|---|
error | An error if the watching process cannot be started, otherwise nil. |
@classmethod
def setupInformer(
ctx: context.Context
) - > error
Sets up the Kubernetes informer for TaskAction resources. This internal method configures the mechanism to watch for changes in TaskAction objects within the Kubernetes cluster.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for setting up the informer, allowing for cancellation. |
Returns
| Type | Description |
|---|
error | An error if the informer cannot be set up, otherwise nil. |
worker()
@classmethod
def worker(
ctx: context.Context,
ch: < -chan watch.Event,
stopCh: < -chan struct{}
)
Processes watch events received from the Kubernetes API. This method runs as a goroutine, consuming events from a channel and dispatching them for further handling, ensuring per-resource ordering.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the worker, allowing for graceful shutdown. |
| ch | < -chan watch.Event | A receive-only channel from which watch events are consumed. |
| stopCh | < -chan struct{} | A channel that signals when the worker should stop processing events. |
dispatchEvent()
@classmethod
def dispatchEvent(
taskAction: *executorv1.TaskAction,
eventType: watch.EventType
)
Dispatches a TaskAction event to the appropriate handlers. This method takes a TaskAction and an event type, then routes it for processing, including notifying subscribers and the run service.
Parameters
| Name | Type | Description |
|---|
| taskAction | *executorv1.TaskAction | The TaskAction object that is the subject of the event. |
| eventType | watch.EventType | The type of watch event (e.g., Added, Modified, Deleted) that occurred for the TaskAction. |
handleWatchEvent()
@classmethod
def handleWatchEvent(
ctx: context.Context,
event: watch.Event
)
Handles a single watch event received from the Kubernetes API. This method processes the raw watch event, extracts the TaskAction, and dispatches it to the appropriate event handler.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for handling the event, allowing for cancellation. |
| event | watch.Event | The watch event containing information about a change to a Kubernetes resource. |
handleTaskActionEvent()
@classmethod
def handleTaskActionEvent(
ctx: context.Context,
taskAction: *executorv1.TaskAction,
eventType: watch.EventType
)
Processes a specific TaskAction event. This method determines the necessary actions to take based on the TaskAction's state and the event type, such as notifying subscribers or updating the run service.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for handling the event, allowing for cancellation. |
| taskAction | *executorv1.TaskAction | The TaskAction object that has changed. |
| eventType | watch.EventType | The type of watch event (e.g., Added, Modified, Deleted) that occurred for the TaskAction. |
shouldSkipTaskAction()
@classmethod
def shouldSkipTaskAction(
taskAction: *executorv1.TaskAction
) - > bool
Determines if a given TaskAction should be skipped from further processing. This method applies filtering logic to avoid redundant or unnecessary event handling for certain TaskActions.
Parameters
| Name | Type | Description |
|---|
| taskAction | *executorv1.TaskAction | The TaskAction object to evaluate for skipping. |
Returns
| Type | Description |
|---|
bool | True if the TaskAction should be skipped, false otherwise. |
notifySubscribers()
@classmethod
def notifySubscribers(
ctx: context.Context,
update: *ActionUpdate
)
Notifies all subscribed channels about an ActionUpdate. This method iterates through all active subscribers for a given parent action and sends them the latest update.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the notification, allowing for cancellation. |
| update | *ActionUpdate | The ActionUpdate message containing the latest information about an action's state change. |
notifyRunService()
@classmethod
def notifyRunService(
ctx: context.Context,
taskAction: *executorv1.TaskAction,
update: *ActionUpdate,
eventType: watch.EventType
)
Notifies the run service about changes to a TaskAction. This method communicates action status updates to the central run management service, ensuring consistency across the system.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the notification, allowing for cancellation. |
| taskAction | *executorv1.TaskAction | The TaskAction object whose status has changed. |
| update | *ActionUpdate | The ActionUpdate message containing the latest information about the action's state change. |
| eventType | watch.EventType | The type of watch event (e.g., Added, Modified, Deleted) that triggered the notification. |
StopWatching()
@classmethod
def StopWatching()
Stops the watching process for TaskAction resources. This method gracefully shuts down all informers and worker goroutines, releasing resources and ceasing to monitor for changes.
markTerminalStatusRecorded()
@classmethod
def markTerminalStatusRecorded(
ctx: context.Context,
taskAction: *executorv1.TaskAction
) - > error
Marks a TaskAction's terminal status as recorded to prevent duplicate processing. This method updates the TaskAction resource to indicate that its final state has been handled, typically by adding a finalizer or annotation.
Parameters
| Name | Type | Description |
|---|
| ctx | context.Context | The context for the request, enabling cancellation and carrying request-scoped values. |
| taskAction | *executorv1.TaskAction | The TaskAction object whose terminal status needs to be marked as recorded. |
Returns
| Type | Description |
|---|
error | An error if the status could not be marked as recorded, otherwise nil. |