Building Kubernetes Plugins
Flyte provides a specialized framework for building plugins that manage Kubernetes resources. By implementing the Plugin interface, you can integrate custom Kubernetes operators (CRDs) or standard resources like Pods into Flyte's execution engine.
In this guide, you will build a Kubernetes-based plugin by implementing the core machinery required to create resources and track their lifecycle.
Prerequisites
To build a Kubernetes plugin in Flyte, you need:
- A Go environment with the flyteplugins package.
- Familiarity with the controller-runtime library used by Kubernetes operators.
- A defined Kubernetes resource (e.g., a Pod or a Custom Resource like SparkApplication).
1. Define the Plugin Handler
The first step is to create a struct that implements the Plugin interface defined in flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go. This handler will contain the logic for resource creation and status mapping.
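import (
	"context"

	// v1 provides the Pod type used by the snippets in the following steps.
	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/k8s"
)

// myK8sHandler implements the k8s.Plugin interface for one task type.
type myK8sHandler struct {
}

// GetProperties returns the default plugin properties.
func (m myK8sHandler) GetProperties() k8s.PluginProperties {
	return k8s.PluginProperties{}
}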
The myK8sHandler struct will serve as the entry point for all Kubernetes interactions for your specific task type.
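A common Go idiom for catching a missing interface method early is a compile-time assertion. The sketch below is only an illustration: `plugin` and `pluginProperties` are hypothetical, reduced stand-ins so the example runs without the Flyte modules, and `implementsPlugin` is a helper invented for the demo. In a real plugin, the equivalent assertion would target k8s.Plugin, which requires all the methods covered in this guide.

```go
package main

import "fmt"

// pluginProperties is a stand-in for k8s.PluginProperties (assumption: the
// real struct carries fields that are omitted here).
type pluginProperties struct{}

// plugin is a hypothetical, reduced stand-in for the k8s.Plugin interface.
type plugin interface {
	GetProperties() pluginProperties
}

type myK8sHandler struct{}

func (m myK8sHandler) GetProperties() pluginProperties {
	return pluginProperties{}
}

// Compile-time check: the build fails if myK8sHandler stops satisfying plugin.
var _ plugin = myK8sHandler{}

// implementsPlugin reports at run time whether v satisfies plugin.
func implementsPlugin(v any) bool {
	_, ok := v.(plugin)
	return ok
}

func main() {
	fmt.Println(implementsPlugin(myK8sHandler{})) // true
	fmt.Println(implementsPlugin(struct{}{}))     // false
}
```

With the assertion in place, forgetting a method shows up as a build error at the `var _` line rather than as a failure when the plugin is registered.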
2. Implement Resource Construction
You must implement two methods to handle resource identification and creation: BuildIdentityResource and BuildResource.
BuildIdentityResource is used by Flyte to create a "blank" object of your resource type, which is used to query the Kubernetes API server.
func (m myK8sHandler) BuildIdentityResource(ctx context.Context, taskCtx core.TaskExecutionMetadata) (client.Object, error) {
	return &v1.Pod{}, nil
}
BuildResource is where you define the actual specification of the Kubernetes object that Flyte will submit. You should use flytek8s.ApplyFlytePodConfiguration to ensure platform-wide defaults (like labels and annotations) are applied.
import (
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/flytek8s"
)

func (m myK8sHandler) BuildResource(ctx context.Context, taskCtx core.TaskExecutionContext) (client.Object, error) {
	// 1. Build the base pod spec using Flyte helpers
	podSpec, objectMeta, primaryContainerName, err := flytek8s.BuildRawPod(ctx, taskCtx)
	if err != nil {
		return nil, err
	}

	// 2. Apply global configurations from K8sPluginConfig
	podSpec, objectMeta, err = flytek8s.ApplyFlytePodConfiguration(ctx, taskCtx, podSpec, objectMeta, primaryContainerName)
	if err != nil {
		return nil, err
	}

	return &v1.Pod{
		ObjectMeta: *objectMeta,
		Spec:       *podSpec,
	}, nil
}
By calling ApplyFlytePodConfiguration, your plugin automatically respects settings defined in K8sPluginConfig, such as DefaultAnnotations, DefaultLabels, and DefaultCPURequest.
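To make "applying platform-wide defaults" concrete, here is a minimal sketch of merging default labels with the labels a task sets itself. It is not Flyte's implementation: `mergeWithDefaults` is a hypothetical helper, and the rule that task-level values win on a key collision is an assumption of this sketch, so check the actual behavior of ApplyFlytePodConfiguration before relying on it.

```go
package main

import "fmt"

// mergeWithDefaults returns a new map holding every default plus every
// task-level entry. Assumption for this sketch: task-level values win when
// both maps define the same key.
func mergeWithDefaults(defaults, taskLevel map[string]string) map[string]string {
	merged := make(map[string]string, len(defaults)+len(taskLevel))
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range taskLevel {
		merged[k] = v
	}
	return merged
}

func main() {
	defaults := map[string]string{"team": "platform", "env": "prod"}
	taskLevel := map[string]string{"env": "staging"}

	merged := mergeWithDefaults(defaults, taskLevel)
	fmt.Println(merged["team"], merged["env"]) // platform staging
}
```

Returning a fresh map keeps the shared defaults untouched, which matters when one configuration object serves many tasks.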
3. Map Kubernetes Status to Flyte Phases
Flyte needs to know the state of your Kubernetes resource to progress the workflow. You implement this in GetTaskPhase, which receives the current state of the resource from the Kubernetes API.
func (m myK8sHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (core.PhaseInfo, error) {
	// The resource has the type returned by BuildIdentityResource.
	pod := resource.(*v1.Pod)

	switch pod.Status.Phase {
	case v1.PodSucceeded:
		return core.PhaseInfoSuccess(nil), nil
	case v1.PodFailed:
		return core.PhaseInfoRetryableFailure("PodFailed", pod.Status.Message, nil), nil
	case v1.PodPending:
		return core.PhaseInfoInitializing(pod.CreationTimestamp.Time, core.DefaultPhaseVersion, "Pod is pending", nil), nil
	case v1.PodRunning:
		return core.PhaseInfoRunning(core.DefaultPhaseVersion, nil), nil
	}

	// Any other phase, such as v1.PodUnknown, is reported as undefined.
	return core.PhaseInfoUndefined, nil
}
This method translates Kubernetes-specific statuses (like v1.PodSucceeded) into Flyte's internal core.Phase (like PhaseSuccess).
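The mapping is easy to exercise in isolation. The sketch below mirrors the switch above using hypothetical stand-in types (`podPhase`, `flytePhase`) and a hypothetical `mapPhase` function, so it runs without the Kubernetes or Flyte modules. The string values of the Flyte-side constants are placeholders chosen for this example only.

```go
package main

import "fmt"

// podPhase and flytePhase stand in for v1.PodPhase and core.Phase.
type podPhase string
type flytePhase string

const (
	podPending   podPhase = "Pending"
	podRunning   podPhase = "Running"
	podSucceeded podPhase = "Succeeded"
	podFailed    podPhase = "Failed"
	podUnknown   podPhase = "Unknown"
)

// Placeholder values; only the names echo Flyte's phases.
const (
	phaseUndefined        flytePhase = "Undefined"
	phaseInitializing     flytePhase = "Initializing"
	phaseRunning          flytePhase = "Running"
	phaseSuccess          flytePhase = "Success"
	phaseRetryableFailure flytePhase = "RetryableFailure"
)

// mapPhase mirrors the switch in GetTaskPhase: every pod phase without an
// explicit case falls through to phaseUndefined.
func mapPhase(p podPhase) flytePhase {
	switch p {
	case podSucceeded:
		return phaseSuccess
	case podFailed:
		return phaseRetryableFailure
	case podPending:
		return phaseInitializing
	case podRunning:
		return phaseRunning
	}
	return phaseUndefined
}

func main() {
	phases := []podPhase{podPending, podRunning, podSucceeded, podFailed, podUnknown}
	for _, p := range phases {
		fmt.Printf("%s -> %s\n", p, mapPhase(p))
	}
}
```

Keeping the translation in a small pure function like this makes it straightforward to cover every phase in a table-driven unit test before wiring it into the handler.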
4. Register the Plugin
To make Flyte aware of your plugin, you must register it using a PluginEntry in an init() function. This tells Flyte which task types your plugin handles and which Kubernetes resource it should watch.
import (
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery"
)

func init() {
	pluginmachinery.PluginRegistry().RegisterK8sPlugin(
		k8s.PluginEntry{
			ID:                  "my_custom_task",
			RegisteredTaskTypes: []core.TaskType{"my_custom_task"},
			ResourceToWatch:     &v1.Pod{},
			Plugin:              myK8sHandler{},
			IsDefault:           false,
		})
}
The ResourceToWatch field is critical: it tells FlytePropeller's informer factory which GVK (Group, Version, Kind) to cache and monitor.
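Conceptually, registration builds a lookup from task type to plugin entry. The sketch below illustrates that idea only and is not Flyte's registry: `pluginEntry`, `registry`, `register`, and `lookup` are hypothetical names, `ResourceKind` is a plain string standing in for ResourceToWatch, and rejecting incomplete entries with an error is a choice made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// pluginEntry is a reduced, hypothetical stand-in for k8s.PluginEntry.
type pluginEntry struct {
	ID                  string
	RegisteredTaskTypes []string
	ResourceKind        string // stands in for ResourceToWatch
}

// registry maps each task type to the entry that handles it.
type registry struct {
	byTaskType map[string]pluginEntry
}

func newRegistry() *registry {
	return &registry{byTaskType: make(map[string]pluginEntry)}
}

// register stores the entry under every task type it declares. Rejecting
// incomplete entries is a design choice of this sketch.
func (r *registry) register(e pluginEntry) error {
	if e.ID == "" || len(e.RegisteredTaskTypes) == 0 {
		return errors.New("entry needs an ID and at least one task type")
	}
	for _, t := range e.RegisteredTaskTypes {
		r.byTaskType[t] = e
	}
	return nil
}

// lookup returns the entry registered for taskType, if any.
func (r *registry) lookup(taskType string) (pluginEntry, bool) {
	e, ok := r.byTaskType[taskType]
	return e, ok
}

func main() {
	r := newRegistry()
	err := r.register(pluginEntry{
		ID:                  "my_custom_task",
		RegisteredTaskTypes: []string{"my_custom_task"},
		ResourceKind:        "Pod",
	})
	if err != nil {
		fmt.Println("register failed:", err)
		return
	}

	if e, ok := r.lookup("my_custom_task"); ok {
		fmt.Printf("task type my_custom_task -> plugin %s watching %s\n", e.ID, e.ResourceKind)
	}
}
```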
5. Configure Global Defaults
You can control the behavior of all Kubernetes plugins, including your new one, by modifying the K8sPluginConfig. This configuration is typically loaded from the platform's configuration files.
For example, to ensure all pods created by your plugin have a specific CPU request or a finalizer, you would configure the K8sPluginConfig fields:
- InjectFinalizer: Set to true to ensure resources are cleaned up only after Flyte acknowledges termination.
- DefaultCPURequest: Sets a baseline for containers that don't specify their own.
- DeleteResourceOnFinalize: If true, Flyte will explicitly delete the Kubernetes resource once the task reaches a terminal state.
These settings are defined in flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go and are automatically applied if you used ApplyFlytePodConfiguration in Step 2.
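The effect of a setting like DefaultCPURequest can be pictured as "fill in the value only where the task left it empty". The following sketch shows that rule with simplified stand-ins: `pluginConfig` and `container` are hypothetical types (CPU requests are plain strings rather than Kubernetes resource quantities), and `applyCPUDefault` is an illustrative helper, not a Flyte function.

```go
package main

import "fmt"

// pluginConfig is a reduced stand-in for K8sPluginConfig. Only the field
// used by this sketch is included, and its type is simplified to a string.
type pluginConfig struct {
	DefaultCPURequest string
}

// container is a stand-in for a Kubernetes container spec.
type container struct {
	Name       string
	CPURequest string // empty means the task did not set a request
}

// applyCPUDefault returns a copy of cs in which every container without a
// CPU request receives the configured default. Explicit requests are kept.
func applyCPUDefault(cfg pluginConfig, cs []container) []container {
	out := make([]container, len(cs))
	copy(out, cs)
	for i := range out {
		if out[i].CPURequest == "" {
			out[i].CPURequest = cfg.DefaultCPURequest
		}
	}
	return out
}

func main() {
	cfg := pluginConfig{DefaultCPURequest: "500m"}
	containers := []container{
		{Name: "main"},
		{Name: "sidecar", CPURequest: "2"},
	}

	for _, c := range applyCPUDefault(cfg, containers) {
		fmt.Printf("%s requests %s CPU\n", c.Name, c.CPURequest)
	}
}
```

Working on a copy leaves the caller's slice unchanged, so the same task definition can be evaluated against different configurations.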
Complete Result
You have now implemented a Kubernetes plugin that:
- Defines a custom handler for a specific task type.
- Constructs a Kubernetes resource while applying platform-wide defaults.
- Monitors the resource and reports status back to Flyte.
- Registers itself with the Flyte plugin registry.
When a task of type my_custom_task is executed, Flyte will now use your plugin to manage the lifecycle of the corresponding Kubernetes Pod.