Skip to main content

ActionsClient

This class handles all etcd/K8s TaskAction CR operations for the Actions service. It manages watching for updates, deduplicates recorded actions, and uses a worker pool to process events for TaskAction resources while preserving per-resource ordering.

Methods


Enqueue()

@classmethod
def Enqueue(
ctx: context.Context,
action: *actions.Action,
runSpec: *task.RunSpec
) - > error

Enqueues a new action for processing. This method adds an action to the system, making it ready to be executed according to its run specification.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the request, enabling cancellation and carrying request-scoped values.
action*actions.ActionThe action to be enqueued, containing its definition and metadata.
runSpec*task.RunSpecThe specification for how the action should be run, including execution parameters and environment.

Returns

TypeDescription
errorAn error if the action could not be enqueued, otherwise nil.

AbortAction()

@classmethod
def AbortAction(
ctx: context.Context,
actionID: *common.ActionIdentifier,
reason: *string
) - > error

Aborts an action that is currently in progress or pending. This method stops the execution of a specified action and marks it as aborted.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the request, enabling cancellation and carrying request-scoped values.
actionID*common.ActionIdentifierThe unique identifier of the action to be aborted.
reason*stringA string explaining why the action is being aborted.

Returns

TypeDescription
errorAn error if the action could not be aborted, otherwise nil.

PutStatus()

@classmethod
def PutStatus(
ctx: context.Context,
actionID: *common.ActionIdentifier,
attempt: uint32,
status: *workflow.ActionStatus
) - > error

Updates the status of a specific attempt for an action. This method is used to report the current state of an action's execution.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the request, enabling cancellation and carrying request-scoped values.
actionID*common.ActionIdentifierThe unique identifier of the action whose status is being updated.
attemptuint32The specific attempt number for which the status is being reported.
status*workflow.ActionStatusThe new status to set for the action attempt, indicating its current state (e.g., running, completed, failed).

Returns

TypeDescription
errorAn error if the status could not be updated, otherwise nil.

ListRunActions()

@classmethod
def ListRunActions(
ctx: context.Context,
runID: *common.RunIdentifier
) - > ([]*executorv1.TaskAction, error)

Lists all task actions associated with a specific run. This method allows callers to retrieve all actions that are part of a particular execution run.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the request, enabling cancellation and carrying request-scoped values.
runID*common.RunIdentifierThe unique identifier of the run for which to list actions.

Returns

TypeDescription
([]*executorv1.TaskAction, error)A slice of TaskAction objects representing all actions within the specified run, and an error if the listing fails.

ListChildActions()

@classmethod
def ListChildActions(
ctx: context.Context,
parentActionID: *common.ActionIdentifier
) - > ([]*executorv1.TaskAction, error)

Lists all child task actions for a given parent action. This method is used to retrieve actions that are spawned or dependent on a specific parent action.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the request, enabling cancellation and carrying request-scoped values.
parentActionID*common.ActionIdentifierThe unique identifier of the parent action for which to list child actions.

Returns

TypeDescription
([]*executorv1.TaskAction, error)A slice of TaskAction objects representing the child actions of the specified parent, and an error if the listing fails.

GetTaskAction()

@classmethod
def GetTaskAction(
ctx: context.Context,
actionID: *common.ActionIdentifier
) - > (*executorv1.TaskAction, error)

Retrieves a specific task action by its identifier. This method allows callers to fetch detailed information about a particular action.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the request, enabling cancellation and carrying request-scoped values.
actionID*common.ActionIdentifierThe unique identifier of the task action to retrieve.

Returns

TypeDescription
(*executorv1.TaskAction, error)The TaskAction object corresponding to the given ID, and an error if the action is not found or retrieval fails.

Subscribe()

@classmethod
def Subscribe(
parentActionName: string
) - > chan *ActionUpdate

Subscribes to updates for actions related to a specific parent action. This method returns a channel that will receive ActionUpdate messages whenever a relevant action changes state.

Parameters

NameTypeDescription
parentActionNamestringThe name of the parent action for which to receive updates. Updates will be sent for this parent action and its children.

Returns

TypeDescription
chan *ActionUpdateA channel through which ActionUpdate messages are sent, providing real-time notifications about action changes.

Unsubscribe()

@classmethod
def Unsubscribe(
parentActionName: string,
ch: chan *ActionUpdate
)

Unsubscribes a channel from receiving updates for a specific parent action. This method stops sending ActionUpdate messages to the provided channel.

Parameters

NameTypeDescription
parentActionNamestringThe name of the parent action from which to unsubscribe the channel.
chchan *ActionUpdateThe channel to be unsubscribed from receiving action updates.

StartWatching()

@classmethod
def StartWatching(
ctx: context.Context
) - > error

Initiates the watching process for TaskAction resources. This method sets up the necessary informers and workers to monitor changes in TaskAction objects, enabling real-time updates and event processing.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for starting the watching process, allowing for cancellation.

Returns

TypeDescription
errorAn error if the watching process cannot be started, otherwise nil.

setupInformer()

@classmethod
def setupInformer(
ctx: context.Context
) - > error

Sets up the Kubernetes informer for TaskAction resources. This internal method configures the mechanism to watch for changes in TaskAction objects within the Kubernetes cluster.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for setting up the informer, allowing for cancellation.

Returns

TypeDescription
errorAn error if the informer cannot be set up, otherwise nil.

worker()

@classmethod
def worker(
ctx: context.Context,
ch: < -chan watch.Event,
stopCh: < -chan struct{}
)

Processes watch events received from the Kubernetes API. This method runs as a goroutine, consuming events from a channel and dispatching them for further handling, ensuring per-resource ordering.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the worker, allowing for graceful shutdown.
ch< -chan watch.EventA receive-only channel from which watch events are consumed.
stopCh< -chan struct{}A channel that signals when the worker should stop processing events.

dispatchEvent()

@classmethod
def dispatchEvent(
taskAction: *executorv1.TaskAction,
eventType: watch.EventType
)

Dispatches a TaskAction event to the appropriate handlers. This method takes a TaskAction and an event type, then routes it for processing, including notifying subscribers and the run service.

Parameters

NameTypeDescription
taskAction*executorv1.TaskActionThe TaskAction object that is the subject of the event.
eventTypewatch.EventTypeThe type of watch event (e.g., Added, Modified, Deleted) that occurred for the TaskAction.

handleWatchEvent()

@classmethod
def handleWatchEvent(
ctx: context.Context,
event: watch.Event
)

Handles a single watch event received from the Kubernetes API. This method processes the raw watch event, extracts the TaskAction, and dispatches it to the appropriate event handler.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for handling the event, allowing for cancellation.
eventwatch.EventThe watch event containing information about a change to a Kubernetes resource.

handleTaskActionEvent()

@classmethod
def handleTaskActionEvent(
ctx: context.Context,
taskAction: *executorv1.TaskAction,
eventType: watch.EventType
)

Processes a specific TaskAction event. This method determines the necessary actions to take based on the TaskAction's state and the event type, such as notifying subscribers or updating the run service.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for handling the event, allowing for cancellation.
taskAction*executorv1.TaskActionThe TaskAction object that has changed.
eventTypewatch.EventTypeThe type of watch event (e.g., Added, Modified, Deleted) that occurred for the TaskAction.

shouldSkipTaskAction()

@classmethod
def shouldSkipTaskAction(
taskAction: *executorv1.TaskAction
) - > bool

Determines if a given TaskAction should be skipped from further processing. This method applies filtering logic to avoid redundant or unnecessary event handling for certain TaskActions.

Parameters

NameTypeDescription
taskAction*executorv1.TaskActionThe TaskAction object to evaluate for skipping.

Returns

TypeDescription
boolTrue if the TaskAction should be skipped, false otherwise.

notifySubscribers()

@classmethod
def notifySubscribers(
ctx: context.Context,
update: *ActionUpdate
)

Notifies all subscribed channels about an ActionUpdate. This method iterates through all active subscribers for a given parent action and sends them the latest update.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the notification, allowing for cancellation.
update*ActionUpdateThe ActionUpdate message containing the latest information about an action's state change.

notifyRunService()

@classmethod
def notifyRunService(
ctx: context.Context,
taskAction: *executorv1.TaskAction,
update: *ActionUpdate,
eventType: watch.EventType
)

Notifies the run service about changes to a TaskAction. This method communicates action status updates to the central run management service, ensuring consistency across the system.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the notification, allowing for cancellation.
taskAction*executorv1.TaskActionThe TaskAction object whose status has changed.
update*ActionUpdateThe ActionUpdate message containing the latest information about the action's state change.
eventTypewatch.EventTypeThe type of watch event (e.g., Added, Modified, Deleted) that triggered the notification.

StopWatching()

@classmethod
def StopWatching()

Stops the watching process for TaskAction resources. This method gracefully shuts down all informers and worker goroutines, releasing resources and ceasing to monitor for changes.


markTerminalStatusRecorded()

@classmethod
def markTerminalStatusRecorded(
ctx: context.Context,
taskAction: *executorv1.TaskAction
) - > error

Marks a TaskAction's terminal status as recorded to prevent duplicate processing. This method updates the TaskAction resource to indicate that its final state has been handled, typically by adding a finalizer or annotation.

Parameters

NameTypeDescription
ctxcontext.ContextThe context for the request, enabling cancellation and carrying request-scoped values.
taskAction*executorv1.TaskActionThe TaskAction object whose terminal status needs to be marked as recorded.

Returns

TypeDescription
errorAn error if the status could not be marked as recorded, otherwise nil.