
Task Plugins & Machinery

Flyte provides an extensible framework for integrating with external compute backends by mapping Flyte tasks to Kubernetes custom resources, whose types are defined by CustomResourceDefinitions (CRDs). This machinery lets Flyte orchestrate distributed workloads such as Spark, Ray, and Dask jobs while delegating their execution to the native Kubernetes operators of those ecosystems.

The K8s Plugin Interface

All Kubernetes-based task plugins in Flyte implement the k8s.Plugin interface defined in flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go. This interface defines how Flyte creates, monitors, and cleans up Kubernetes resources.

```go
type Plugin interface {
	// BuildIdentityResource creates a query object used to query k8s resources.
	BuildIdentityResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionMetadata) (client.Object, error)

	// BuildResource creates the full resource object that will be posted to k8s.
	BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error)

	// GetTaskPhase analyses the k8s resource and reports the status as TaskPhase.
	GetTaskPhase(ctx context.Context, pluginContext PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error)

	// GetProperties returns properties desired by the plugin.
	GetProperties() PluginProperties

	// GarbageCollectable enables cleanup of resources.
	GarbageCollectable
}
```

Plugins register themselves with the Flyte engine using the pluginmachinery.PluginRegistry().RegisterK8sPlugin() method, typically within an init() function.
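The `init()`-time registration pattern can be sketched with the standard library alone. `PluginEntry`, `Registry`, `PluginRegistry`, and `IDs` here are hypothetical simplifications loosely modelled on `pluginmachinery.PluginRegistry().RegisterK8sPlugin()`; the real entry type is richer, and panicking on a duplicate ID is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// PluginEntry is a hypothetical, trimmed-down registration record.
type PluginEntry struct {
	ID                  string
	RegisteredTaskTypes []string
}

// Registry collects plugin entries; the mutex keeps registration safe if it is
// ever called concurrently, although init() functions run sequentially.
type Registry struct {
	mu      sync.Mutex
	entries map[string]PluginEntry
}

var defaultRegistry = &Registry{entries: map[string]PluginEntry{}}

// PluginRegistry returns the process-wide registry.
func PluginRegistry() *Registry { return defaultRegistry }

// RegisterK8sPlugin records an entry and panics on a duplicate ID, surfacing
// misconfiguration at process start-up rather than at task execution time.
func (r *Registry) RegisterK8sPlugin(e PluginEntry) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.entries[e.ID]; exists {
		panic(fmt.Sprintf("plugin %q registered twice", e.ID))
	}
	r.entries[e.ID] = e
}

// IDs lists registered plugin IDs in sorted order.
func (r *Registry) IDs() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	ids := make([]string, 0, len(r.entries))
	for id := range r.entries {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

// init registers a plugin as a side effect of importing the package.
func init() {
	PluginRegistry().RegisterK8sPlugin(PluginEntry{ID: "spark", RegisteredTaskTypes: []string{"spark"}})
}
```

Because registration happens in `init()`, merely importing a plugin package (often with a blank import) is enough to make it available to the engine.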

Spark Plugin

The Spark plugin translates Flyte tasks into SparkApplication resources for the spark-on-k8s-operator. It is implemented by the sparkResourceHandler in flyteplugins/go/tasks/plugins/k8s/spark/spark.go.

Building the Spark Resource

The BuildResource method unmarshals the task's custom configuration into a plugins.SparkJob struct and constructs the SparkApplication CRD.

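```go
func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
	// Read the task template for this execution.
	taskTemplate, err := taskCtx.TaskReader().Read(ctx)
	if err != nil {
		return nil, err
	}
	// ...
	// Decode the task's custom field into the SparkJob struct.
	sparkJob := plugins.SparkJob{}
	if err = utils.UnmarshalStruct(taskTemplate.GetCustom(), &sparkJob); err != nil {
		return nil, err
	}

	sparkConfig := getSparkConfig(taskCtx, &sparkJob)
	driverSpec, err := createDriverSpec(ctx, taskCtx, sparkConfig, &sparkJob)
	if err != nil {
		return nil, err
	}
	executorSpec, err := createExecutorSpec(ctx, taskCtx, sparkConfig, &sparkJob)
	if err != nil {
		return nil, err
	}
	// Assemble the SparkApplication CRD from the driver and executor specs.
	return createSparkApplication(&sparkJob, sparkConfig, driverSpec, executorSpec), nil
}
```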
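The unmarshal-then-assemble flow can be illustrated without Flyte's protobuf types. In this sketch `encoding/json` stands in for `utils.UnmarshalStruct` (which decodes a protobuf Struct), and `SparkJob`, `PodSpec`, `SparkApplication`, `buildSparkApplication`, the JSON field names, and the default of one executor are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// SparkJob is a hypothetical subset of the task's custom configuration.
type SparkJob struct {
	MainApplicationFile string            `json:"mainApplicationFile"`
	SparkConf           map[string]string `json:"sparkConf"`
}

// PodSpec is a hypothetical, minimal per-role spec.
type PodSpec struct {
	Role      string
	Image     string
	Instances int
}

// SparkApplication is a hypothetical stand-in for the operator's CRD.
type SparkApplication struct {
	MainApplicationFile string
	SparkConf           map[string]string
	Driver              PodSpec
	Executor            PodSpec
}

// buildSparkApplication decodes the custom config and wires driver and
// executor specs into one application object, returning early on any error.
func buildSparkApplication(custom []byte, image string) (*SparkApplication, error) {
	var job SparkJob
	if err := json.Unmarshal(custom, &job); err != nil {
		return nil, fmt.Errorf("invalid spark custom config: %w", err)
	}
	if job.MainApplicationFile == "" {
		return nil, fmt.Errorf("mainApplicationFile is required")
	}
	executors := 1 // hypothetical default when spark.executor.instances is unset
	if v, ok := job.SparkConf["spark.executor.instances"]; ok {
		n, err := strconv.Atoi(v)
		if err != nil {
			return nil, fmt.Errorf("bad spark.executor.instances %q: %w", v, err)
		}
		executors = n
	}
	return &SparkApplication{
		MainApplicationFile: job.MainApplicationFile,
		SparkConf:           job.SparkConf,
		Driver:              PodSpec{Role: "driver", Image: image, Instances: 1},
		Executor:            PodSpec{Role: "executor", Image: image, Instances: executors},
	}, nil
}
```

Validating the decoded config before building any pod spec keeps malformed tasks from ever reaching the Kubernetes API.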

Key Behaviors

  • Interruptibility: Spark driver pods are forced to run as non-interruptible to ensure the stability of the Spark session, even if the task itself is marked as interruptible. This is handled in createDriverSpec using flytek8s.WithInterruptible(false).
  • UI Integration: The plugin surfaces the Spark Driver UI (when running) and the Spark History UI (after completion) by appending core.TaskLog entries in getEventInfoForSpark.
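Both behaviors are small policy decisions that can be sketched in isolation. The functional-option override mirrors the idea behind `flytek8s.WithInterruptible(false)`, but `TaskContext`, `Option`, `WithInterruptible`, `NewDriverContext`, `TaskLog`, `sparkUILinks`, and the URLs passed in are hypothetical.

```go
package main

// TaskContext is a hypothetical view of the settings a pod builder reads.
type TaskContext struct {
	TaskName      string
	Interruptible bool
}

// Option mutates a copy of a TaskContext.
type Option func(*TaskContext)

// WithInterruptible overrides the interruptible flag.
func WithInterruptible(v bool) Option {
	return func(c *TaskContext) { c.Interruptible = v }
}

// NewDriverContext returns a modified copy; the caller's context (used for
// executors) keeps its original setting.
func NewDriverContext(base TaskContext, opts ...Option) TaskContext {
	c := base // struct copy
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

// TaskLog is a hypothetical stand-in for core.TaskLog.
type TaskLog struct {
	Name string
	URI  string
}

// sparkUILinks chooses which UI to surface: the live driver UI while the
// application runs, the history server once it has finished.
func sparkUILinks(running bool, driverUI, historyUI string) []TaskLog {
	if running && driverUI != "" {
		return []TaskLog{{Name: "Spark Driver UI", URI: driverUI}}
	}
	if !running && historyUI != "" {
		return []TaskLog{{Name: "Spark History UI", URI: historyUI}}
	}
	return nil
}
```

Overriding a copy rather than the shared context means executors can still be scheduled on interruptible nodes while the driver is not.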

Ray Plugin

The Ray plugin manages RayJob resources for the KubeRay operator. It is implemented in flyteplugins/go/tasks/plugins/k8s/ray/ray.go.

Cluster Configuration

The Ray plugin describes the cluster topology with a HeadGroupSpec for the head node and a list of WorkerGroupSpecs for the worker groups. It uses flytek8s.ToK8sPodSpec to generate the base pod specifications for both head and worker nodes.

```go
func constructRayJob(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, rayJob *plugins.RayJob, ...) (*rayv1.RayJob, error) {
	// ...
	headPodTemplate, err := buildHeadPodTemplate(...)
	if err != nil {
		return nil, err
	}
	// The head group gets its pod template and `ray start` parameters;
	// worker groups are appended to WorkerGroupSpecs afterwards.
	rayClusterSpec := rayv1.RayClusterSpec{
		HeadGroupSpec: rayv1.HeadGroupSpec{
			Template:       headPodTemplate,
			RayStartParams: headNodeRayStartParams,
		},
		WorkerGroupSpecs: []rayv1.WorkerGroupSpec{},
	}
	// ...
}
```
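The head/worker split can be sketched as deriving every group from one base pod spec. `PodSpec`, `HeadGroup`, `WorkerGroup`, `ClusterSpec`, `WorkerConfig`, `clonePod`, and `buildCluster` are hypothetical simplifications of the KubeRay types, and the head start parameter shown is illustrative only; the point is that each group needs its own copy of the base spec.

```go
package main

import "fmt"

// PodSpec is a hypothetical, minimal pod spec.
type PodSpec struct {
	Image string
	Args  []string
}

// HeadGroup and WorkerGroup loosely mirror KubeRay's group specs.
type HeadGroup struct {
	Template       PodSpec
	RayStartParams map[string]string
}

type WorkerGroup struct {
	GroupName      string
	Replicas       int
	Template       PodSpec
	RayStartParams map[string]string
}

type ClusterSpec struct {
	Head    HeadGroup
	Workers []WorkerGroup
}

// WorkerConfig is hypothetical user input for one worker group.
type WorkerConfig struct {
	Name     string
	Replicas int
}

// clonePod copies the Args slice so groups never share a backing array.
func clonePod(p PodSpec) PodSpec {
	p.Args = append([]string(nil), p.Args...)
	return p
}

// buildCluster derives the head and every worker group from a single base spec.
func buildCluster(base PodSpec, workers []WorkerConfig) (ClusterSpec, error) {
	spec := ClusterSpec{
		Head: HeadGroup{
			Template:       clonePod(base),
			RayStartParams: map[string]string{"dashboard-host": "0.0.0.0"}, // illustrative
		},
		Workers: []WorkerGroup{},
	}
	for _, w := range workers {
		if w.Replicas < 0 {
			return ClusterSpec{}, fmt.Errorf("worker group %q: negative replicas", w.Name)
		}
		spec.Workers = append(spec.Workers, WorkerGroup{
			GroupName:      w.Name,
			Replicas:       w.Replicas,
			Template:       clonePod(base),
			RayStartParams: map[string]string{},
		})
	}
	return spec, nil
}
```

Without `clonePod`, a later per-group tweak (for example, editing one group's arguments) would silently leak into every other group built from the same base.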

Key Behaviors

  • Submitter Pod: To prevent version mismatches, the RayJob submitter pod is configured to use the same image as the Ray head node in buildSubmitterPodTemplate.
  • Logs Sidecar: The plugin can inject a sidecar container into the head pod to capture and expose Ray job logs, as seen in injectLogsSidecar.
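Both tweaks amount to pod-spec edits. In this sketch `Container`, `PodSpec`, `submitterFromHead`, `addLogsSidecar`, the container names, and the idea that the sidecar shares a named log volume with the primary Ray container are hypothetical; it only shows the shape of the two operations, not the plugin's actual `buildSubmitterPodTemplate` or `injectLogsSidecar`.

```go
package main

import "fmt"

// Container and PodSpec are hypothetical, minimal pod types.
type Container struct {
	Name         string
	Image        string
	VolumeMounts []string
}

type PodSpec struct {
	Containers []Container
	Volumes    []string
}

// submitterFromHead returns a submitter container that reuses the head
// container's image, so the job client's Ray version matches the cluster's.
func submitterFromHead(head PodSpec) (Container, error) {
	if len(head.Containers) == 0 {
		return Container{}, fmt.Errorf("head pod has no containers")
	}
	return Container{Name: "ray-job-submitter", Image: head.Containers[0].Image}, nil
}

// addLogsSidecar appends a sidecar that mounts the same log volume as the
// primary (first) container; it is a no-op if the sidecar already exists.
func addLogsSidecar(pod *PodSpec, sidecarImage, logVolume string) {
	for _, c := range pod.Containers {
		if c.Name == "logs" {
			return
		}
	}
	if !contains(pod.Volumes, logVolume) {
		pod.Volumes = append(pod.Volumes, logVolume)
	}
	if len(pod.Containers) > 0 && !contains(pod.Containers[0].VolumeMounts, logVolume) {
		pod.Containers[0].VolumeMounts = append(pod.Containers[0].VolumeMounts, logVolume)
	}
	pod.Containers = append(pod.Containers, Container{Name: "logs", Image: sidecarImage, VolumeMounts: []string{logVolume}})
}

// contains reports whether s is in xs.
func contains(xs []string, s string) bool {
	for _, x := range xs {
		if x == s {
			return true
		}
	}
	return false
}
```

Making the injection idempotent is a defensive choice: building the same pod template twice does not produce duplicate sidecars or mounts.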

Dask Plugin

The Dask plugin integrates with the dask-kubernetes operator to manage DaskJob resources. It is implemented in flyteplugins/go/tasks/plugins/k8s/dask/dask.go.

Mapping Task Phases

The GetTaskPhase method maps the DaskJob status to Flyte's internal task phases.

```go
func (p daskResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, r client.Object) (pluginsCore.PhaseInfo, error) {
	job := r.(*daskAPI.DaskJob)
	status := job.Status.JobStatus
	// ...
	switch status {
	case daskAPI.DaskJobFailed:
		// A failed DaskJob is reported as a retryable failure.
		return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, "Dask Job failed", &info), nil
	case daskAPI.DaskJobSuccessful:
		return pluginsCore.PhaseInfoSuccess(&info), nil
	default:
		// Any other status is treated as still running.
		return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info), nil
	}
}
```
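The mapping is essentially a status-to-phase table. Here is a stdlib-only sketch; the status strings, `PhaseKind`, `PhaseInfo`, and `mapDaskStatus` are hypothetical stand-ins for the dask-kubernetes status constants and `pluginsCore.PhaseInfo`. As in the excerpt above, anything not explicitly failed or successful is treated as still running.

```go
package main

// PhaseKind is a hypothetical simplification of Flyte's task phases.
type PhaseKind int

const (
	Running PhaseKind = iota
	Success
	RetryableFailure
)

// PhaseInfo pairs a phase with an optional human-readable reason.
type PhaseInfo struct {
	Kind   PhaseKind
	Reason string
}

// Hypothetical job status strings.
const (
	jobFailed     = "Failed"
	jobSuccessful = "Successful"
)

// mapDaskStatus converts a job status into a phase; unknown or in-progress
// statuses fall through to Running so the engine keeps polling.
func mapDaskStatus(status string) PhaseInfo {
	switch status {
	case jobFailed:
		return PhaseInfo{Kind: RetryableFailure, Reason: "Dask Job failed"}
	case jobSuccessful:
		return PhaseInfo{Kind: Success}
	default:
		return PhaseInfo{Kind: Running}
	}
}
```

Reporting failure as retryable rather than permanent leaves the decision to Flyte's retry policy, assuming the task has retries configured.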

Dashboard Integration

Dask dashboard links require a specific --dashboard-prefix to work correctly through reverse proxies. The plugin calculates this prefix from the configured log templates in dashboardPrefixFromLogConfig and injects it into the scheduler's arguments.
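The logic inside `dashboardPrefixFromLogConfig` is not shown here, so the following is only a hypothetical illustration of the idea: render a dashboard link template for one job, take the URL path as the prefix, and append it to the scheduler arguments. The `{{.name}}` placeholder, the example host, the `dask-scheduler` argument, and the functions `dashboardPrefix` and `withDashboardPrefix` are assumptions.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
	"text/template"
)

// dashboardPrefix renders a hypothetical dashboard link template for one job
// and returns its URL path (without trailing slash) as the prefix.
func dashboardPrefix(linkTemplate, jobName string) (string, error) {
	tmpl, err := template.New("link").Parse(linkTemplate)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, map[string]string{"name": jobName}); err != nil {
		return "", err
	}
	u, err := url.Parse(sb.String())
	if err != nil {
		return "", err
	}
	prefix := strings.TrimSuffix(u.Path, "/")
	if prefix == "" {
		return "", fmt.Errorf("template %q yields no path prefix", linkTemplate)
	}
	return prefix, nil
}

// withDashboardPrefix appends the flag to a copy of the scheduler arguments.
func withDashboardPrefix(args []string, prefix string) []string {
	out := append([]string(nil), args...)
	return append(out, "--dashboard-prefix", prefix)
}
```

Deriving the prefix from the same template that produces the UI link keeps the two in sync: the dashboard serves its assets under exactly the path the proxy routes to it.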

Core Machinery: ToK8sPodSpec

A utility shared by the K8s plugins, including the Ray and Dask plugins described above, is flytek8s.ToK8sPodSpec (found in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_spec.go). This function converts Flyte's internal task representation into a standard Kubernetes PodSpec, handling:

  • Container images and arguments
  • Resource requests and limits
  • Environment variables
  • Volume mounts and secrets
  • Affinity and tolerations
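The conversion can be pictured as a pure function from a task description to a pod spec. The sketch below covers only a subset of the listed concerns (image, arguments, CPU requests and limits, environment) with hypothetical `TaskSpec`, `Container`, and `PodSpec` types; it omits volumes, secrets, affinity, and tolerations, and `toPodSpec` is not the signature of `flytek8s.ToK8sPodSpec`.

```go
package main

import (
	"fmt"
	"sort"
)

// TaskSpec is a hypothetical, simplified task description.
type TaskSpec struct {
	Image      string
	Args       []string
	CPURequest string
	CPULimit   string
	Env        map[string]string
}

type EnvVar struct{ Name, Value string }

type Container struct {
	Name     string
	Image    string
	Args     []string
	Requests map[string]string
	Limits   map[string]string
	Env      []EnvVar
}

type PodSpec struct {
	Containers []Container
}

// toPodSpec builds a single-container pod; env vars are sorted by name so the
// output is deterministic (Go map iteration order is randomized).
func toPodSpec(t TaskSpec, primaryName string) (PodSpec, error) {
	if t.Image == "" {
		return PodSpec{}, fmt.Errorf("task has no container image")
	}
	names := make([]string, 0, len(t.Env))
	for k := range t.Env {
		names = append(names, k)
	}
	sort.Strings(names)
	env := make([]EnvVar, 0, len(names))
	for _, k := range names {
		env = append(env, EnvVar{Name: k, Value: t.Env[k]})
	}
	c := Container{
		Name:     primaryName,
		Image:    t.Image,
		Args:     append([]string(nil), t.Args...),
		Requests: map[string]string{},
		Limits:   map[string]string{},
		Env:      env,
	}
	if t.CPURequest != "" {
		c.Requests["cpu"] = t.CPURequest
	}
	if t.CPULimit != "" {
		c.Limits["cpu"] = t.CPULimit
	}
	return PodSpec{Containers: []Container{c}}, nil
}
```

Deterministic output matters for a builder like this: if the same task produced differently ordered specs on each call, comparisons between desired and observed resources would report spurious changes.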

Plugins typically call this first and then wrap the resulting PodSpec into their specific CRD structures. For example, the Dask plugin uses it to build both the worker and scheduler specs:

```go
podSpec, objectMeta, primaryContainerName, err := flytek8s.ToK8sPodSpec(ctx, taskCtx)
```
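Once the base pod spec exists, wrapping it into role-specific specs is mostly copying. This hypothetical sketch (not the dask-kubernetes types) wraps one base spec into scheduler and worker specs by replacing the primary container's arguments per role; `DaskJobSpec`, `deepCopy`, `withRoleArgs`, `wrapDaskJob`, and the `dask-scheduler`/`dask-worker` arguments are illustrative assumptions.

```go
package main

import "fmt"

type Container struct {
	Name  string
	Image string
	Args  []string
}

type PodSpec struct {
	Containers []Container
}

// DaskJobSpec is a hypothetical wrapper holding one spec per role.
type DaskJobSpec struct {
	Scheduler PodSpec
	Worker    PodSpec
	Replicas  int
}

// deepCopy clones the container slice and each container's args.
func deepCopy(p PodSpec) PodSpec {
	out := PodSpec{Containers: make([]Container, len(p.Containers))}
	for i, c := range p.Containers {
		c.Args = append([]string(nil), c.Args...)
		out.Containers[i] = c
	}
	return out
}

// withRoleArgs returns a copy of base whose primary container runs roleArgs.
func withRoleArgs(base PodSpec, primary string, roleArgs []string) (PodSpec, error) {
	p := deepCopy(base)
	for i := range p.Containers {
		if p.Containers[i].Name == primary {
			p.Containers[i].Args = append([]string(nil), roleArgs...)
			return p, nil
		}
	}
	return PodSpec{}, fmt.Errorf("primary container %q not found", primary)
}

// wrapDaskJob derives both role specs from the same base spec.
func wrapDaskJob(base PodSpec, primary string, replicas int) (DaskJobSpec, error) {
	sched, err := withRoleArgs(base, primary, []string{"dask-scheduler"})
	if err != nil {
		return DaskJobSpec{}, err
	}
	worker, err := withRoleArgs(base, primary, []string{"dask-worker"})
	if err != nil {
		return DaskJobSpec{}, err
	}
	return DaskJobSpec{Scheduler: sched, Worker: worker, Replicas: replicas}, nil
}
```

The deep copy is the important design choice: a plain struct assignment would copy the `Containers` slice header only, so editing the scheduler's containers would also overwrite the worker's and the base spec's.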