TaskActionReconciler

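TaskActionReconciler reconciles TaskAction objects. Its exported methods are Reconcile(), the entry point of the reconciliation loop, and SetupWithManager(), which registers the reconciler with a controller manager. The remaining methods are unexported helpers for caching, failure handling, finalization, and status updates.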

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| Scheme | *runtime.Scheme | Stores the runtime scheme for the TaskAction object, used for type-aware serialization and deserialization. |
| Recorder | events.EventRecorder | Records events related to the TaskAction object, providing an audit trail and debugging information. |
| PluginRegistry | *plugin.Registry | Manages and provides access to registered plugins, allowing the TaskActionReconciler to extend its functionality. |
| DataStore | *storage.DataStore | Provides an interface for storing and retrieving data, enabling persistence for TaskAction-related information. |
| SecretManager | pluginsCore.SecretManager | Manages secrets, allowing the TaskActionReconciler to securely access sensitive information. |
| ResourceManager | pluginsCore.ResourceManager | Manages resources, enabling the TaskActionReconciler to interact with and control external resources. |
| CatalogClient | catalog.AsyncClient | Provides an asynchronous client for interacting with the catalog service, allowing non-blocking catalog operations. |
| Catalog | catalog.Client | Provides a synchronous client for interacting with the catalog service, enabling direct catalog operations. |
| MaxSystemFailures | uint32 | Defines the maximum number of system failures allowed before a TaskAction is considered unrecoverable, influencing retry logic. |

Methods


evaluateCacheBeforeExecution()

@classmethod
def evaluateCacheBeforeExecution(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction,
    tCtx: pluginsCore.TaskExecutionContext
) -> (pluginsCore.Transition, bool, error)

Evaluates the cache before task execution to determine if the task can be short-circuited using cached results. This method checks for existing cached outputs to avoid redundant computation.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The TaskAction object representing the task to be executed, containing its definition and current state. |
| tCtx | pluginsCore.TaskExecutionContext | The task execution context, providing access to task-specific information and utilities. |

Returns

| Type | Description |
| --- | --- |
| (pluginsCore.Transition, bool, error) | A transition object indicating the next state, a boolean indicating if the cache was short-circuited, and an error if any occurred. |
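
The three-value return can be illustrated with a minimal sketch. It is not the actual implementation: `Phase`, `Transition`, `memCatalog`, `errCacheMiss`, and `evaluateCache` are hypothetical stand-ins for the pluginsCore and catalog types, and the in-memory map replaces a real catalog lookup.

```go
package main

import (
	"errors"
	"fmt"
)

// Phase and Transition are hypothetical stand-ins for pluginsCore types.
type Phase string

const (
	PhaseUndefined Phase = "Undefined"
	PhaseSuccess   Phase = "Success"
)

type Transition struct{ Phase Phase }

// errCacheMiss is a hypothetical sentinel meaning "no cached entry".
var errCacheMiss = errors.New("cache miss")

// memCatalog is an in-memory stand-in for a catalog client.
type memCatalog map[string][]byte

func (c memCatalog) Get(key string) ([]byte, error) {
	out, ok := c[key]
	if !ok {
		return nil, errCacheMiss
	}
	return out, nil
}

// evaluateCache returns (transition, shortCircuited, err).
func evaluateCache(c memCatalog, key string, cacheable bool) (Transition, bool, error) {
	if !cacheable {
		return Transition{Phase: PhaseUndefined}, false, nil
	}
	_, err := c.Get(key)
	switch {
	case err == nil:
		// Cached outputs exist: skip execution and report success.
		return Transition{Phase: PhaseSuccess}, true, nil
	case errors.Is(err, errCacheMiss):
		// Nothing cached: the caller should execute the task normally.
		return Transition{Phase: PhaseUndefined}, false, nil
	default:
		return Transition{}, false, err
	}
}

func main() {
	c := memCatalog{"task-a:v1": []byte("42")}
	t, hit, err := evaluateCache(c, "task-a:v1", true)
	fmt.Println(t.Phase, hit, err)
	t, hit, err = evaluateCache(c, "task-b:v1", true)
	fmt.Println(t.Phase, hit, err)
}
```

Treating a cache miss as a normal outcome rather than an error keeps the error return reserved for genuine lookup failures.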

finalizeCacheAfterExecution()

@classmethod
def finalizeCacheAfterExecution(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction,
    tCtx: pluginsCore.TaskExecutionContext,
    transition: pluginsCore.Transition,
    cacheShortCircuited: bool
) -> (pluginsCore.Transition, error)

Finalizes cache operations after task execution, potentially writing new outputs to the cache or updating cache metadata. This ensures that subsequent identical task executions can benefit from caching.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The TaskAction object representing the task that was executed. |
| tCtx | pluginsCore.TaskExecutionContext | The task execution context, providing access to task-specific information and utilities. |
| transition | pluginsCore.Transition | The transition object representing the state change after execution. |
| cacheShortCircuited | bool | A boolean indicating whether the task was short-circuited by the cache. |

Returns

| Type | Description |
| --- | --- |
| (pluginsCore.Transition, error) | A transition object indicating the next state and an error if any occurred during finalization. |
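
As a rough sketch of the decision this method describes, the code below writes outputs only when the task itself ran to success and was not served from the cache. The types, phase names, and the `finalizeCache` helper are hypothetical stand-ins; the real method may apply further conditions that this reference does not state.

```go
package main

import "fmt"

// Phase and Transition are hypothetical stand-ins for pluginsCore types.
type Phase string

const (
	PhaseRunning Phase = "Running"
	PhaseSuccess Phase = "Success"
)

type Transition struct{ Phase Phase }

// memCatalog is an in-memory stand-in for a catalog client.
type memCatalog map[string][]byte

// finalizeCache stores outputs only for a fresh, successful execution.
func finalizeCache(c memCatalog, key string, outputs []byte, t Transition, shortCircuited bool) (Transition, error) {
	if shortCircuited || t.Phase != PhaseSuccess {
		// Outputs came from the cache already, or there is nothing final to store.
		return t, nil
	}
	if key == "" {
		return t, fmt.Errorf("cannot cache outputs: empty key")
	}
	c[key] = outputs
	return t, nil
}

func main() {
	c := memCatalog{}
	_, err := finalizeCache(c, "task-a:v1", []byte("42"), Transition{Phase: PhaseSuccess}, false)
	fmt.Println(len(c), err)
}
```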

writeTaskOutputsToCache()

@classmethod
def writeTaskOutputsToCache(
    ctx: context.Context,
    tCtx: pluginsCore.TaskExecutionContext,
    key: catalog.Key
) -> error

Writes the outputs of a completed task to the cache. This method is used to store results for future reuse, improving efficiency.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| tCtx | pluginsCore.TaskExecutionContext | The task execution context, providing access to task-specific information and outputs. |
| key | catalog.Key | The cache key used to identify and retrieve the task outputs. |

Returns

| Type | Description |
| --- | --- |
| error | An error if the write operation fails, otherwise nil. |

releaseCacheReservation()

@classmethod
def releaseCacheReservation(
    ctx: context.Context,
    cacheCfg: *taskCacheConfig
) -> error

Releases a previously acquired cache reservation. This is important for managing cache resources and preventing deadlocks.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| cacheCfg | *taskCacheConfig | The configuration object for the task cache, containing details about the reservation to be released. |

Returns

| Type | Description |
| --- | --- |
| error | An error if the reservation release fails, otherwise nil. |
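
To show why an unreleased reservation can block other executions, here is a small sketch of reservation semantics. The `reservations` type, its owner IDs, and the idempotent-release behavior are all assumptions made for illustration; the fields of `taskCacheConfig` are not described in this reference.

```go
package main

import (
	"fmt"
	"sync"
)

// reservations is a hypothetical in-memory reservation table: key -> owner ID.
type reservations struct {
	mu     sync.Mutex
	owners map[string]string
}

func newReservations() *reservations {
	return &reservations{owners: map[string]string{}}
}

// acquire reserves key for owner unless another owner already holds it.
func (r *reservations) acquire(key, owner string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cur, ok := r.owners[key]; ok && cur != owner {
		return false
	}
	r.owners[key] = owner
	return true
}

// release drops the reservation if owner holds it; releasing twice is a no-op.
func (r *reservations) release(key, owner string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	cur, ok := r.owners[key]
	if !ok {
		return nil
	}
	if cur != owner {
		return fmt.Errorf("reservation %q is held by %q, not %q", key, cur, owner)
	}
	delete(r.owners, key)
	return nil
}

func main() {
	r := newReservations()
	fmt.Println(r.acquire("task-a:v1", "action-1")) // first owner wins
	fmt.Println(r.acquire("task-a:v1", "action-2")) // blocked until release
	fmt.Println(r.release("task-a:v1", "action-1"))
	fmt.Println(r.acquire("task-a:v1", "action-2"))
}
```

In this sketch the second owner can only proceed after the first releases, which is the situation a missed release would turn into an indefinite wait.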

maxSystemFailures()

@classmethod
def maxSystemFailures() -> uint32

Retrieves the maximum number of system failures allowed for a task before it is marked as permanently failed. This helps in configuring resilience policies.

Returns

| Type | Description |
| --- | --- |
| uint32 | The maximum number of system failures as an unsigned 32-bit integer. |

resetPluginResource()

@classmethod
def resetPluginResource(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction,
    p: pluginsCore.Plugin,
    tCtx: pluginsCore.TaskExecutionContext
)

Resets the resources associated with a plugin for a given task action. This is typically used to clean up or reinitialize plugin-specific state.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The TaskAction object whose plugin resources need to be reset. |
| p | pluginsCore.Plugin | The plugin instance whose resources are to be reset. |
| tCtx | pluginsCore.TaskExecutionContext | The task execution context, providing access to task-specific information. |

recordSystemError()

@classmethod
def recordSystemError(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction,
    original: *flyteorgv1.TaskAction,
    pluginID: string,
    handleErr: error
) -> (ctrl.Result, error)

Records a system error encountered during task execution and updates the TaskAction's state accordingly. This method handles error propagation and state transitions for system-level failures.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The current TaskAction object that encountered the error. |
| original | *flyteorgv1.TaskAction | The original TaskAction object before any modifications, used for comparison. |
| pluginID | string | The identifier of the plugin that reported the error. |
| handleErr | error | The error object that was encountered and needs to be recorded. |

Returns

| Type | Description |
| --- | --- |
| (ctrl.Result, error) | A reconciliation result and an error if the recording process itself fails. |

finalizePermanentFailure()

@classmethod
def finalizePermanentFailure(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction,
    original: *flyteorgv1.TaskAction,
    execErr: *core.ExecutionError
) -> (ctrl.Result, error)

Finalizes the state of a TaskAction when it reaches a permanent failure. This involves updating its status and potentially performing cleanup actions.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The current TaskAction object that has permanently failed. |
| original | *flyteorgv1.TaskAction | The original TaskAction object before any modifications, used for comparison. |
| execErr | *core.ExecutionError | The execution error object detailing the reason for the permanent failure. |

Returns

| Type | Description |
| --- | --- |
| (ctrl.Result, error) | A reconciliation result and an error if the finalization process itself fails. |

Reconcile()

@classmethod
def Reconcile(
    ctx: context.Context,
    req: ctrl.Request
) -> (ctrl.Result, error)

Reconciles the state of a TaskAction object, ensuring it matches the desired state. This is the main entry point for the controller's reconciliation loop.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| req | ctrl.Request | The reconciliation request containing the name and namespace of the TaskAction to reconcile. |

Returns

| Type | Description |
| --- | --- |
| (ctrl.Result, error) | A reconciliation result indicating whether to requeue the request and an error if reconciliation fails. |
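
A reconciliation entry point commonly dispatches on the state of the object it fetched. The sketch below shows one such dispatch order using the helper names from this page as branch labels. The order, the `taskAction` and `store` stand-ins, and the phase names are assumptions and may not match the actual Reconcile() logic.

```go
package main

import "fmt"

// taskAction is a stand-in for *flyteorgv1.TaskAction.
type taskAction struct {
	Deleting bool   // stand-in for a set deletion timestamp
	Phase    string // hypothetical phase names
}

// store is a stand-in for the API server, keyed by "namespace/name".
type store map[string]*taskAction

func isTerminal(phase string) bool {
	return phase == "Succeeded" || phase == "Failed" || phase == "Aborted"
}

// reconcile returns the name of the branch it took, for illustration only.
func reconcile(s store, key string) string {
	ta, ok := s[key]
	switch {
	case !ok:
		return "ignore-not-found" // object is gone; nothing to do
	case ta.Deleting:
		return "handleAbortAndFinalize"
	case isTerminal(ta.Phase):
		return "ensureTerminalLabels"
	default:
		return "run-plugin"
	}
}

func main() {
	s := store{
		"default/a": {Phase: "Running"},
		"default/b": {Phase: "Succeeded"},
		"default/c": {Phase: "Running", Deleting: true},
	}
	for _, k := range []string{"default/a", "default/b", "default/c", "default/gone"} {
		fmt.Println(k, "->", reconcile(s, k))
	}
}
```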

ensureTerminalLabels()

@classmethod
def ensureTerminalLabels(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction
) -> error

Ensures that the TaskAction object has the correct terminal labels applied. This is crucial for proper filtering and state management within the system.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The TaskAction object to which terminal labels should be applied. |

Returns

| Type | Description |
| --- | --- |
| error | An error if updating the labels fails, otherwise nil. |
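
An "ensure" operation is usually idempotent: it changes the object only when the label is missing or wrong, so repeated reconciles do not issue redundant updates. The sketch below illustrates that idea. The label key `example.com/terminal` and the `ensureLabel` helper are hypothetical, since the actual label names are not given here.

```go
package main

import "fmt"

// terminalLabel is a hypothetical label key; the real key is not given in this reference.
const terminalLabel = "example.com/terminal"

// ensureLabel sets key=value and reports whether the labels actually changed.
func ensureLabel(labels map[string]string, key, value string) (map[string]string, bool) {
	if labels[key] == value {
		return labels, false // already correct: no update needed
	}
	if labels == nil {
		labels = map[string]string{}
	}
	labels[key] = value
	return labels, true
}

func main() {
	labels, changed := ensureLabel(nil, terminalLabel, "true")
	fmt.Println(labels, changed)
	labels, changed = ensureLabel(labels, terminalLabel, "true")
	fmt.Println(labels, changed)
}
```

A caller would send an update to the API server only when the second return value is true.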

handleAbortAndFinalize()

@classmethod
def handleAbortAndFinalize(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction
) -> (ctrl.Result, error)

Handles the abort request for a TaskAction and finalizes its state. This method ensures that resources are cleaned up and the task transitions to an aborted state.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The TaskAction object that needs to be aborted and finalized. |

Returns

| Type | Description |
| --- | --- |
| (ctrl.Result, error) | A reconciliation result and an error if the abort or finalization process fails. |

removeFinalizer()

@classmethod
def removeFinalizer(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction
) -> (ctrl.Result, error)

Removes the finalizer from a TaskAction object. This is typically done after all cleanup operations are complete, allowing the object to be garbage collected.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The TaskAction object from which the finalizer should be removed. |

Returns

| Type | Description |
| --- | --- |
| (ctrl.Result, error) | A reconciliation result and an error if the finalizer removal fails. |
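
Finalizers are stored as a list of strings on the object's metadata, and the object can only be deleted once that list is empty. The sketch below shows the list manipulation on a plain slice. The finalizer name and the standalone helper are hypothetical; the real method presumably also persists the change through the API client.

```go
package main

import "fmt"

// removeFinalizer returns the list without name and reports whether anything was removed.
func removeFinalizer(finalizers []string, name string) ([]string, bool) {
	out := make([]string, 0, len(finalizers))
	removed := false
	for _, f := range finalizers {
		if f == name {
			removed = true
			continue
		}
		out = append(out, f)
	}
	return out, removed
}

func main() {
	// "example.com/task-action" is a hypothetical finalizer name.
	fs := []string{"example.com/task-action", "other.io/cleanup"}
	fs, removed := removeFinalizer(fs, "example.com/task-action")
	fmt.Println(fs, removed)
}
```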

updateTaskActionStatus()

@classmethod
def updateTaskActionStatus(
    ctx: context.Context,
    oldTaskAction: *flyteorgv1.TaskAction,
    newTaskAction: *flyteorgv1.TaskAction,
    phaseInfo: pluginsCore.PhaseInfo
) -> error

Updates the status of a TaskAction object based on the provided phase information. This method ensures that the TaskAction's state accurately reflects its current progress.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| oldTaskAction | *flyteorgv1.TaskAction | The previous state of the TaskAction object, used for comparison and optimistic locking. |
| newTaskAction | *flyteorgv1.TaskAction | The updated TaskAction object with the new status. |
| phaseInfo | pluginsCore.PhaseInfo | Information about the current phase of the task, including its state and any associated errors. |

Returns

| Type | Description |
| --- | --- |
| error | An error if the status update fails, otherwise nil. |
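
The old/new pair suggests a compare-before-write pattern, in which the update is skipped when nothing changed. A minimal sketch of that pattern follows. The `status` struct, `fakeClient`, and `updateStatus` are hypothetical stand-ins, and the sketch does not model optimistic locking.

```go
package main

import "fmt"

// status is a stand-in for the TaskAction status sub-resource.
type status struct {
	Phase  string
	Reason string
}

// fakeClient counts writes in place of a real API client.
type fakeClient struct{ updates int }

func (c *fakeClient) writeStatus(s status) error {
	c.updates++
	return nil
}

// updateStatus writes newSt only when it differs from oldSt.
func updateStatus(c *fakeClient, oldSt, newSt status) error {
	if oldSt == newSt {
		return nil // unchanged: avoid a redundant API call
	}
	return c.writeStatus(newSt)
}

func main() {
	c := &fakeClient{}
	running := status{Phase: "Running"}
	done := status{Phase: "Succeeded", Reason: "completed"}
	fmt.Println(updateStatus(c, running, running), c.updates)
	fmt.Println(updateStatus(c, running, done), c.updates)
}
```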

buildActionEvent()

@classmethod
def buildActionEvent(
    ctx: context.Context,
    taskAction: *flyteorgv1.TaskAction,
    phaseInfo: pluginsCore.PhaseInfo
) -> *workflow.ActionEvent

Builds an ActionEvent object from a TaskAction and its current phase information. This event is used for auditing and external system notifications.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ctx | context.Context | The context for the request, used for cancellation and deadlines. |
| taskAction | *flyteorgv1.TaskAction | The TaskAction object for which the event is being built. |
| phaseInfo | pluginsCore.PhaseInfo | Information about the current phase of the task, used to populate event details. |

Returns

| Type | Description |
| --- | --- |
| *workflow.ActionEvent | An ActionEvent object representing the current state and phase of the TaskAction. |

SetupWithManager()

@classmethod
def SetupWithManager(
    mgr: ctrl.Manager
) -> error

Sets up the TaskActionReconciler with a controller manager. This method registers the reconciler to watch for TaskAction events and ensures it is properly initialized within the controller runtime.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| mgr | ctrl.Manager | The controller manager responsible for running controllers and managing their lifecycle. |

Returns

| Type | Description |
| --- | --- |
| error | An error if the setup fails, otherwise nil. |