
Core Plugin Architecture

When you need to extend Flyte with custom execution logic—such as integrating with a specific cloud service or implementing a specialized local operator—you implement the core plugin interfaces. These interfaces define how Flyte's engine (Propeller) initializes your code, passes task data, and manages the task lifecycle.

Registering a Plugin

To make Flyte aware of your plugin, you must register a PluginEntry in the global registry. This is typically done within an init() function. The PluginEntry defines which task types your plugin handles and how to instantiate it.

// Example from flyteplugins/go/tasks/plugins/core/sleep/plugin.go
func init() {
	pluginmachinery.PluginRegistry().RegisterCorePlugin(
		core.PluginEntry{
			ID:                  "sleep",
			RegisteredTaskTypes: []core.TaskType{"sleep"},
			LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) {
				return &Plugin{
					taskStartTimes: make(map[string]time.Time),
				}, nil
			},
			IsDefault: false,
		},
	)
}

The LoadPlugin field is a PluginLoader function. Flyte calls it lazily when it encounters a task type your plugin is registered for, and it is guaranteed to run before any of the plugin's lifecycle methods (Handle, Abort, or Finalize) are invoked.
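To make the lazy-loading contract concrete, here is a minimal, self-contained sketch of a registry that runs each loader at most once, on the first lookup of its task type. The types (Plugin, PluginLoader, Registry) are simplified, hypothetical stand-ins rather than Flyte's pluginmachinery API, and caching a failed load's error is a choice made for this sketch, not documented Flyte behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Plugin is a hypothetical, simplified stand-in for core.Plugin.
type Plugin interface {
	GetID() string
}

// PluginLoader builds a plugin on demand (simplified: no SetupContext).
type PluginLoader func(ctx context.Context) (Plugin, error)

type entry struct {
	load   PluginLoader
	once   sync.Once
	plugin Plugin
	err    error
}

// Registry invokes each loader at most once, the first time its task type is used.
type Registry struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func NewRegistry() *Registry {
	return &Registry{entries: make(map[string]*entry)}
}

func (r *Registry) Register(taskType string, load PluginLoader) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.entries[taskType] = &entry{load: load}
}

// PluginFor loads the plugin on first request and returns the cached
// instance (or cached error) on every later request.
func (r *Registry) PluginFor(ctx context.Context, taskType string) (Plugin, error) {
	r.mu.Lock()
	e, ok := r.entries[taskType]
	r.mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("no plugin registered for task type %q", taskType)
	}
	e.once.Do(func() { e.plugin, e.err = e.load(ctx) })
	return e.plugin, e.err
}

type sleepPlugin struct{}

func (sleepPlugin) GetID() string { return "sleep" }

// demo resolves "sleep" twice with a counting loader, then asks for an
// unregistered type; it returns the ID, the load count, and that error.
func demo() (string, int, error) {
	ctx := context.Background()
	loads := 0
	r := NewRegistry()
	r.Register("sleep", func(ctx context.Context) (Plugin, error) {
		loads++
		return sleepPlugin{}, nil
	})
	p, _ := r.PluginFor(ctx, "sleep")
	_, _ = r.PluginFor(ctx, "sleep")
	_, err := r.PluginFor(ctx, "unknown")
	return p.GetID(), loads, err
}

func main() {
	id, loads, err := demo()
	fmt.Println(id, loads, err)
}
```

Using sync.Once per entry keeps concurrent first lookups from running the same loader twice.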

Initializing with SetupContext

The PluginLoader receives a SetupContext (defined in flyteplugins/go/tasks/pluginmachinery/core/setup_context.go), which provides access to infrastructure-level components. You use this context during initialization to set up shared resources:

  • KubeClient(): Access the Kubernetes API if your plugin manages K8s resources.
  • SecretManager(): Retrieve configured secrets required for authentication with external services.
  • ResourceRegistrar(): Register resource quotas (e.g., limiting the number of concurrent external API calls).
  • MetricsScope(): Obtain a promutils.Scope to publish plugin-specific Prometheus metrics.
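The following sketch shows the intended use of these accessors: do the one-time work (reading a credential, registering a quota) inside the loader, before any task runs. SecretManager, ResourceRegistrar, and SetupContext here are narrowed, hypothetical interfaces modeled on the accessor names above, not Flyte's exact signatures; the secret key "api-token", the namespace "external-api", and the quota of 10 are placeholders.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical, simplified stand-ins for the SetupContext accessors.
type SecretManager interface {
	Get(ctx context.Context, key string) (string, error)
}

type ResourceRegistrar interface {
	RegisterResourceQuota(ctx context.Context, namespace string, quota int) error
}

type SetupContext interface {
	SecretManager() SecretManager
	ResourceRegistrar() ResourceRegistrar
}

// apiPlugin holds shared state created once at load time.
type apiPlugin struct {
	token string
}

// loadAPIPlugin reads a credential and caps concurrent calls before any task runs.
func loadAPIPlugin(ctx context.Context, sCtx SetupContext) (*apiPlugin, error) {
	token, err := sCtx.SecretManager().Get(ctx, "api-token")
	if err != nil {
		return nil, fmt.Errorf("reading api token: %w", err)
	}
	if err := sCtx.ResourceRegistrar().RegisterResourceQuota(ctx, "external-api", 10); err != nil {
		return nil, fmt.Errorf("registering quota: %w", err)
	}
	return &apiPlugin{token: token}, nil
}

// fakeSetup is an in-memory SetupContext used for the demo.
type fakeSetup struct {
	secrets map[string]string
	quotas  map[string]int
}

func (f *fakeSetup) SecretManager() SecretManager         { return f }
func (f *fakeSetup) ResourceRegistrar() ResourceRegistrar { return f }

func (f *fakeSetup) Get(_ context.Context, key string) (string, error) {
	v, ok := f.secrets[key]
	if !ok {
		return "", errors.New("secret not found: " + key)
	}
	return v, nil
}

func (f *fakeSetup) RegisterResourceQuota(_ context.Context, ns string, quota int) error {
	f.quotas[ns] = quota
	return nil
}

// tryLoad runs the loader against a fake setup seeded with the given secrets.
func tryLoad(secrets map[string]string) (*apiPlugin, *fakeSetup, error) {
	setup := &fakeSetup{secrets: secrets, quotas: map[string]int{}}
	p, err := loadAPIPlugin(context.Background(), setup)
	return p, setup, err
}

func main() {
	p, setup, err := tryLoad(map[string]string{"api-token": "s3cr3t"})
	fmt.Println(p.token, setup.quotas["external-api"], err)
	_, _, err = tryLoad(nil)
	fmt.Println(err)
}
```

Failing the loader when a secret is missing surfaces misconfiguration at load time instead of on the first task.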

The Plugin Lifecycle

The Plugin interface (in flyteplugins/go/tasks/pluginmachinery/core/plugin.go) governs the execution of every task instance. Besides GetID() and GetProperties() (covered under Plugin Properties below), it defines three lifecycle methods:

Handle

The Handle method contains the core logic. It is invoked repeatedly by the Flyte engine until the task reaches a terminal state (Success or Failure).

Handle(ctx context.Context, tCtx TaskExecutionContext) (Transition, error)

Because Flyte operates on a non-blocking control loop, Handle must return quickly. If your task performs a long-running operation, Handle should initiate the operation and return a Transition to a Running phase. Flyte will then re-invoke Handle later to check the status.
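A rough simulation of that contract, assuming a fake external job: handle submits work on its first call and afterwards only checks status, while runLoop plays the engine's role of re-invoking it until a terminal phase. Phase, fakeJob, handle, and runLoop are illustrative names, not Flyte types.

```go
package main

import "fmt"

// Phase is a simplified stand-in for the phases a Transition can carry.
type Phase int

const (
	PhaseRunning Phase = iota
	PhaseSuccess
	PhaseFailure
)

func (p Phase) IsTerminal() bool { return p == PhaseSuccess || p == PhaseFailure }

// fakeJob simulates an external job that needs several polls to finish.
type fakeJob struct {
	started   bool
	pollsLeft int
}

// handle never waits: it submits the job on the first call, then only checks status.
func handle(job *fakeJob) Phase {
	if !job.started {
		job.started = true // "submit" and return immediately
		return PhaseRunning
	}
	if job.pollsLeft > 0 {
		job.pollsLeft--
		return PhaseRunning
	}
	return PhaseSuccess
}

// runLoop stands in for the engine: it re-invokes handle until a terminal
// phase and reports how many rounds that took.
func runLoop(job *fakeJob) (Phase, int) {
	rounds := 0
	for {
		rounds++
		if p := handle(job); p.IsTerminal() {
			return p, rounds
		}
		// A real engine would requeue the task here and serve other work.
	}
}

func main() {
	phase, rounds := runLoop(&fakeJob{pollsLeft: 2})
	fmt.Println(phase == PhaseSuccess, rounds) // true 4
}
```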

Abort and Finalize

  • Abort: Called when a task is cancelled (e.g., the user aborts the workflow). Use this to clean up external resources or stop running jobs.
  • Finalize: Always called after the task completes, regardless of whether it succeeded, failed, or was aborted. This is the place for final cleanup logic.

Critical Requirement: All three methods must be idempotent. Flyte may call them multiple times for the same task execution due to network retries or engine restarts.
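One common way to make Abort idempotent is to treat "the job is already gone" as success, so a second call converges on the same outcome as the first. This sketch assumes a hypothetical external service whose Cancel returns errNotFound for unknown jobs; the same pattern applies to cleanup in Finalize.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is what the hypothetical external API returns for unknown jobs.
var errNotFound = errors.New("job not found")

// fakeService is an in-memory stand-in for an external job service.
type fakeService struct {
	running map[string]bool
}

func (s *fakeService) Cancel(jobID string) error {
	if !s.running[jobID] {
		return errNotFound
	}
	delete(s.running, jobID)
	return nil
}

// abort cancels the job but treats "already gone" as success, so repeated
// calls for the same task execution are safe.
func abort(s *fakeService, jobID string) error {
	if err := s.Cancel(jobID); err != nil && !errors.Is(err, errNotFound) {
		return fmt.Errorf("cancelling %s: %w", jobID, err)
	}
	return nil
}

func main() {
	s := &fakeService{running: map[string]bool{"job-1": true}}
	fmt.Println(abort(s, "job-1"), abort(s, "job-1"), len(s.running)) // <nil> <nil> 0
}
```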

Interacting with Task Data

Every lifecycle method receives a TaskExecutionContext (flyteplugins/go/tasks/pluginmachinery/core/exec_context.go). This is your primary API for interacting with the specific task instance:

  • Inputs: Use InputReader() to retrieve the data passed to the task.
  • Outputs: Use OutputWriter() to persist the results of the task execution.
  • Metadata: Use TaskExecutionMetadata() to get the task's unique ID, resource overrides, or annotations.
  • Storage: Use DataStore() to interact directly with Flyte's configured storage backend (e.g., S3, GCS).
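Because the context is consumed through accessor methods, a handler can depend on small interfaces and be exercised with in-memory fakes. The sketch below uses hypothetical, heavily simplified InputReader and OutputWriter interfaces with plain map[string]int values; Flyte's real readers and writers work with Flyte literals and storage references, so treat this as an illustration of the shape only. The input names a and b and the output name o0 are placeholders.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical, narrowed stand-ins for the context's input/output accessors.
type InputReader interface {
	Get(ctx context.Context) (map[string]int, error)
}

type OutputWriter interface {
	Put(ctx context.Context, outputs map[string]int) error
}

type TaskContext interface {
	InputReader() InputReader
	OutputWriter() OutputWriter
	TaskID() string
}

// sumTask reads inputs "a" and "b" and writes their sum as output "o0".
func sumTask(ctx context.Context, tCtx TaskContext) error {
	in, err := tCtx.InputReader().Get(ctx)
	if err != nil {
		return fmt.Errorf("task %s: reading inputs: %w", tCtx.TaskID(), err)
	}
	return tCtx.OutputWriter().Put(ctx, map[string]int{"o0": in["a"] + in["b"]})
}

// memCtx is an in-memory TaskContext for tests.
type memCtx struct {
	inputs, outputs map[string]int
}

func (m *memCtx) InputReader() InputReader   { return m }
func (m *memCtx) OutputWriter() OutputWriter { return m }
func (m *memCtx) TaskID() string             { return "task-1" }

func (m *memCtx) Get(context.Context) (map[string]int, error) { return m.inputs, nil }

func (m *memCtx) Put(_ context.Context, out map[string]int) error {
	m.outputs = out
	return nil
}

// runSum executes sumTask against in-memory inputs and returns what it wrote.
func runSum(a, b int) (map[string]int, error) {
	m := &memCtx{inputs: map[string]int{"a": a, "b": b}}
	err := sumTask(context.Background(), m)
	return m.outputs, err
}

func main() {
	out, err := runSum(2, 3)
	fmt.Println(out["o0"], err) // 5 <nil>
}
```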

Managing Internal State

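Since Handle is called multiple times, you often need to persist internal state between calls (e.g., an external job ID). Use PluginStateReader() and PluginStateWriter(), both available on the TaskExecutionContext. Put takes a state version alongside the value, and Get returns the version the stored state was written with, which lets you detect and migrate older state formats: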

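// Reading state
var currentState MyPluginState
if _, err := tCtx.PluginStateReader().Get(&currentState); err != nil {
	return core.UnknownTransition, err
}

// ... update currentState (e.g., record the external job ID) ...

// Writing state (visible in the NEXT Handle call); 0 is the state version
if err := tCtx.PluginStateWriter().Put(0, &currentState); err != nil {
	return core.UnknownTransition, err
}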
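The read/write cycle can be modeled with an in-memory store, assuming (as described above) that Put stages a value which the engine commits after Handle returns, so Get sees it on the following call. stateStore, commit, MyPluginState's fields, and the job ID "job-42" are hypothetical; JSON encoding is used only to keep the example self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MyPluginState is hypothetical state carried between Handle calls.
type MyPluginState struct {
	JobID string `json:"jobId"`
	Polls int    `json:"polls"`
}

// stateStore stages writes and exposes them to readers only after commit.
type stateStore struct {
	version        uint8
	committed      []byte
	pendingVersion uint8
	pending        []byte
}

// Get decodes committed state into v and returns the version it was written with.
func (s *stateStore) Get(v interface{}) (uint8, error) {
	if s.committed == nil {
		return 0, nil // no state yet; v keeps its zero value
	}
	return s.version, json.Unmarshal(s.committed, v)
}

// Put stages v under the given version.
func (s *stateStore) Put(version uint8, v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	s.pendingVersion, s.pending = version, b
	return nil
}

// commit is what the engine would do between Handle rounds.
func (s *stateStore) commit() {
	if s.pending != nil {
		s.version, s.committed, s.pending = s.pendingVersion, s.pending, nil
	}
}

// handle "submits" a job on the first call and counts polls afterwards.
func handle(s *stateStore) (MyPluginState, error) {
	var st MyPluginState
	if _, err := s.Get(&st); err != nil {
		return st, err
	}
	if st.JobID == "" {
		st.JobID = "job-42" // hypothetical ID from an external service
	} else {
		st.Polls++
	}
	err := s.Put(1, &st)
	return st, err
}

func main() {
	s := &stateStore{}
	for i := 0; i < 3; i++ {
		st, err := handle(s)
		if err != nil {
			panic(err)
		}
		s.commit()
		fmt.Println(st.JobID, st.Polls)
	}
}
```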

Driving State with Transitions

The Handle method returns a Transition, which tells the Flyte engine how to progress the task's state machine. You create transitions using core.DoTransition(info PhaseInfo).

Common transitions include:

  • PhaseInfoRunning: The task is still in progress. Flyte will call Handle again after a backoff period.
  • PhaseInfoSuccess: The task completed successfully.
  • PhaseInfoFailure: The task failed. You must provide an error code and message.
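A typical Handle maps an external system's status onto these phases. This sketch uses a simplified, hypothetical PhaseInfo struct and lower-case constructors in place of the core.PhaseInfo* functions, along with invented status strings and error codes. Treating an unrecognized status as a failure, rather than as still running, is one design choice that keeps a task from polling forever.

```go
package main

import "fmt"

// PhaseInfo is a simplified stand-in for core.PhaseInfo.
type PhaseInfo struct {
	Phase   string
	Code    string // set only for failures
	Message string
}

func phaseInfoRunning() PhaseInfo { return PhaseInfo{Phase: "Running"} }
func phaseInfoSuccess() PhaseInfo { return PhaseInfo{Phase: "Success"} }
func phaseInfoFailure(code, msg string) PhaseInfo {
	return PhaseInfo{Phase: "Failure", Code: code, Message: msg}
}

// toPhaseInfo translates a hypothetical external job status into a phase.
func toPhaseInfo(status, detail string) PhaseInfo {
	switch status {
	case "QUEUED", "RUNNING":
		return phaseInfoRunning()
	case "SUCCEEDED":
		return phaseInfoSuccess()
	case "FAILED":
		return phaseInfoFailure("ExternalJobFailed", detail)
	default:
		return phaseInfoFailure("UnknownStatus", fmt.Sprintf("unexpected status %q", status))
	}
}

func main() {
	for _, s := range []string{"QUEUED", "SUCCEEDED", "FAILED", "PAUSED"} {
		fmt.Printf("%s -> %+v\n", s, toPhaseInfo(s, "out of memory"))
	}
}
```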

Example of a transition in the sleep plugin:

// From flyteplugins/go/tasks/plugins/core/sleep/plugin.go
if time.Since(startTime) >= sleepDuration {
	return core.DoTransition(core.PhaseInfoSuccess(nil)), nil
}

return core.DoTransition(core.PhaseInfoRunning(core.DefaultPhaseVersion, nil)), nil
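The same check can be written as a self-contained, testable sketch with an injectable clock. The map mirrors the taskStartTimes field initialized in LoadPlugin above; keying it by a task ID string, the sleeper type, the newFakeSleeper helper, and clearing the map in finalize are assumptions for illustration. Recording the start time only on the first call keeps repeated Handle calls idempotent. An in-memory map does not survive an engine restart, which is where the plugin state reader and writer described earlier are the more durable option.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const sleepFor = 10 * time.Second

// sleeper reproduces the sleep check with an injectable clock.
type sleeper struct {
	mu             sync.Mutex
	taskStartTimes map[string]time.Time
	now            func() time.Time
}

// handle records the start time only on the first call for a task, then
// reports whether the requested duration has elapsed.
func (s *sleeper) handle(taskID string, d time.Duration) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	start, ok := s.taskStartTimes[taskID]
	if !ok {
		start = s.now()
		s.taskStartTimes[taskID] = start
	}
	if s.now().Sub(start) >= d {
		return "Success"
	}
	return "Running"
}

// finalize drops the bookkeeping; calling it twice is harmless.
func (s *sleeper) finalize(taskID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.taskStartTimes, taskID)
}

// newFakeSleeper returns a sleeper on a manual clock plus a function that
// advances that clock by whole seconds.
func newFakeSleeper() (*sleeper, func(sec int)) {
	clock := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	s := &sleeper{
		taskStartTimes: map[string]time.Time{},
		now:            func() time.Time { return clock },
	}
	return s, func(sec int) { clock = clock.Add(time.Duration(sec) * time.Second) }
}

func main() {
	s, advance := newFakeSleeper()
	fmt.Println(s.handle("t1", sleepFor)) // Running
	advance(10)
	fmt.Println(s.handle("t1", sleepFor)) // Success
	s.finalize("t1")
}
```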

Plugin Properties

The GetProperties() method allows your plugin to signal specific requirements to the engine via the PluginProperties struct:

  • DisableNodeLevelCaching: Set this to true if the task's output should never be cached.
  • GeneratedNameMaxLength: If your plugin generates resource names (like Kubernetes Job names), you can constrain the length. Flyte enforces a minimum of 8 characters for this value; providing a smaller value will cause LoadPlugin to return an error.
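A minimal sketch of the documented length rule, plus a hypothetical helper a plugin might use to keep generated names within the limit. PluginProperties here is a simplified stand-in that models GeneratedNameMaxLength as an optional *int (nil meaning no constraint), and shortenName's prefix-plus-hash scheme is an illustration, not how Flyte itself shortens names. The helper assumes ASCII names and a limit of at least 8, which the validation guarantees.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// minGeneratedNameLength mirrors the documented lower bound of 8 characters.
const minGeneratedNameLength = 8

// PluginProperties is a simplified stand-in with the two fields discussed above.
type PluginProperties struct {
	DisableNodeLevelCaching bool
	GeneratedNameMaxLength  *int // nil means "no constraint"
}

// validateProperties rejects a maximum length below the documented minimum.
func validateProperties(p PluginProperties) error {
	if p.GeneratedNameMaxLength != nil && *p.GeneratedNameMaxLength < minGeneratedNameLength {
		return fmt.Errorf("GeneratedNameMaxLength must be at least %d, got %d",
			minGeneratedNameLength, *p.GeneratedNameMaxLength)
	}
	return nil
}

// shortenName (hypothetical) keeps a readable prefix and appends 8 hex chars
// of a SHA-256 digest so that distinct long names are unlikely to collide.
func shortenName(name string, maxLen int) string {
	if len(name) <= maxLen {
		return name
	}
	sum := sha256.Sum256([]byte(name))
	suffix := hex.EncodeToString(sum[:])[:8]
	return name[:maxLen-8] + suffix
}

func intPtr(v int) *int { return &v }

func main() {
	fmt.Println(validateProperties(PluginProperties{GeneratedNameMaxLength: intPtr(4)}))
	fmt.Println(validateProperties(PluginProperties{GeneratedNameMaxLength: intPtr(63)}))
	fmt.Println(shortenName("a-very-long-generated-resource-name", 16))
}
```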