Developing Web API Plugins
Flyte provides a specialized framework for building plugins that interact with external REST or gRPC services. By using the Web API plugin machinery, you can offload long-running computations to external systems while Flyte handles the state management, polling, and rate limiting.
This tutorial walks you through building an asynchronous Web API plugin using the AsyncPlugin interface.
Prerequisites
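The examples in this tutorial use the following packages. They also reference connectorPb, the generated Go package for the connector service protobufs, whose import path depends on your Flyte version.
import (
	"context"
	"encoding/gob"
	"time"
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery"
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/webapi"
	// Aliased to avoid clashing with the pluginmachinery core package above.
	flyteIdl "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)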
Step 1: Define the Plugin State
An AsyncPlugin relies on two primary data structures to manage state: ResourceMeta and Resource. These are used to store information about the external task and its current status.
- ResourceMeta: Contains the minimum information needed to identify and query the resource in the remote service (e.g., a job ID).
- Resource: Contains the latest status retrieved from the remote service.
In the connector plugin (flyteplugins/go/tasks/plugins/webapi/connector/plugin.go), these are implemented as wrappers:
type ResourceMetaWrapper struct {
	OutputPrefix          string
	ConnectorResourceMeta []byte
	TaskCategory          *connectorPb.TaskCategory
}
type ResourceWrapper struct {
	Phase    flyteIdl.TaskExecution_Phase
	Message  string
	LogLinks []*flyteIdl.TaskLog
}
Step 2: Implement the AsyncPlugin Interface
The AsyncPlugin interface requires implementing methods for the full lifecycle of an external task.
Create: Idempotent Resource Initiation
The Create method is called by Flyte to start the task in the remote service. It must be idempotent because Flyte may call it multiple times for the same task execution.
func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextReader) (
	webapi.ResourceMeta, webapi.Resource, error) {
	// 1. Prepare the request using taskCtx (inputs, template, etc.)
	taskTemplate, err := taskCtx.TaskReader().Read(ctx)
	if err != nil {
		// Do not call the remote service without a valid task template.
		return nil, nil, err
	}
	outputPrefix := taskCtx.OutputWriter().GetOutputPrefixPath().String()
	// 2. Call the remote service
	res, err := p.client.CreateTask(ctx, &connectorPb.CreateTaskRequest{
		Template:     taskTemplate,
		OutputPrefix: outputPrefix,
	})
	if err != nil {
		return nil, nil, err
	}
	// 3. Return the metadata needed to track this task
	return ResourceMetaWrapper{
		OutputPrefix:          outputPrefix,
		ConnectorResourceMeta: res.GetResourceMeta(),
	}, nil, nil
}
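One common way to satisfy the idempotency requirement is to send the remote service a key that stays the same across retries, so a repeated Create maps back to the same job. The sketch below illustrates the idea with standard-library code only. The names fakeService, Submit, and idempotencyKey are hypothetical and not part of Flyte or the connector API, and it assumes the remote service can deduplicate by a caller-supplied key.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeService is a hypothetical in-memory stand-in for a remote job service
// that deduplicates submissions by a caller-supplied idempotency key.
type fakeService struct {
	mu     sync.Mutex
	jobs   map[string]string // idempotency key -> job ID
	nextID int
}

func newFakeService() *fakeService {
	return &fakeService{jobs: make(map[string]string)}
}

// Submit returns the existing job ID when the key was seen before;
// otherwise it records a new job and returns its ID.
func (s *fakeService) Submit(key string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if id, ok := s.jobs[key]; ok {
		return id
	}
	s.nextID++
	id := fmt.Sprintf("job-%d", s.nextID)
	s.jobs[key] = id
	return id
}

// JobCount reports how many distinct jobs were started.
func (s *fakeService) JobCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.jobs)
}

// idempotencyKey derives a stable key from values that do not change when
// the same task attempt is retried (both inputs are assumptions of this sketch).
func idempotencyKey(executionID string, attempt uint32) string {
	return fmt.Sprintf("%s-%d", executionID, attempt)
}

func main() {
	svc := newFakeService()
	key := idempotencyKey("exec-abc-n0", 0)
	first := svc.Submit(key)
	second := svc.Submit(key) // simulated repeat of Create for the same attempt
	fmt.Println(first, second, svc.JobCount())
}
```

In this sketch the second Submit returns the first job's ID and no extra job is started. A real plugin would need an equivalent guarantee from the remote service or from its own bookkeeping.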
Get: Background Polling
The Get method is called periodically by Flyte's background workers to refresh the state of the resource.
func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (webapi.Resource, error) {
	metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper)
	// Query the remote service using the metadata
	res, err := p.client.GetTask(ctx, &connectorPb.GetTaskRequest{
		ResourceMeta: metadata.ConnectorResourceMeta,
	})
	if err != nil {
		return nil, err
	}
	return ResourceWrapper{
		Phase:   res.GetResource().GetPhase(),
		Message: res.GetResource().GetMessage(),
	}, nil
}
Status: Mapping to Flyte Phases
The Status method translates the cached Resource into a Flyte PhaseInfo. It should make no network calls and do no other blocking work; all remote queries belong in Get.
func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (core.PhaseInfo, error) {
	resource := taskCtx.Resource().(ResourceWrapper)
	taskInfo := &core.TaskInfo{Logs: resource.LogLinks}
	switch resource.Phase {
	case flyteIdl.TaskExecution_RUNNING:
		return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil
	case flyteIdl.TaskExecution_SUCCEEDED:
		return core.PhaseInfoSuccess(taskInfo), nil
	case flyteIdl.TaskExecution_FAILED:
		return core.PhaseInfoFailure("ExternalError", resource.Message, taskInfo), nil
	default:
		return core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, "waiting"), nil
	}
}
Delete: Cleanup
The Delete method is called when a task is aborted or needs to be cleaned up.
func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error {
	// Nothing to clean up if the task was never created remotely.
	if taskCtx.ResourceMeta() == nil {
		return nil
	}
	metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper)
	_, err := p.client.DeleteTask(ctx, &connectorPb.DeleteTaskRequest{
		ResourceMeta: metadata.ConnectorResourceMeta,
	})
	return err
}
Step 3: Configure the Plugin
Use PluginConfig to define how Flyte should interact with your Web API, including rate limits and cache settings.
func (p *Plugin) GetConfig() webapi.PluginConfig {
	return webapi.PluginConfig{
		ReadRateLimiter: webapi.RateLimiterConfig{
			QPS:   30,
			Burst: 50,
		},
		WriteRateLimiter: webapi.RateLimiterConfig{
			QPS:   20,
			Burst: 30,
		},
		Caching: webapi.CachingConfig{
			Size: 1000,
			// config.Duration comes from Flyte's flytestdlib/config package,
			// which must be imported in addition to the packages listed earlier.
			ResyncInterval: config.Duration{Duration: 30 * time.Second},
			Workers:        10,
		},
		// Provide an empty instance for state unmarshaling
		ResourceMeta: ResourceMetaWrapper{},
	}
}
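To build intuition for the QPS and Burst settings, it can help to think of them as token-bucket parameters: Burst is the bucket's capacity and QPS is the refill rate per second. The sketch below is a simplified, single-goroutine bucket written only for illustration; it is not the limiter Flyte uses internally, and tokenBucket, countAllowed, and burstThenRefill are hypothetical names. Time is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket is a simplified rate limiter used only to illustrate how
// QPS and Burst interact. It is not safe for concurrent use.
type tokenBucket struct {
	qps    float64   // tokens added per second
	burst  float64   // maximum number of tokens the bucket can hold
	tokens float64   // tokens currently available
	last   time.Time // time of the last refill
}

// newTokenBucket starts with a full bucket.
func newTokenBucket(qps, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{qps: float64(qps), burst: float64(burst), tokens: float64(burst), last: now}
}

// Allow refills the bucket for the time elapsed since the last refill and
// reports whether one request may proceed at time now.
func (b *tokenBucket) Allow(now time.Time) bool {
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.qps
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}

// countAllowed issues n requests at the same instant and counts how many pass.
func countAllowed(b *tokenBucket, n int, now time.Time) int {
	allowed := 0
	for i := 0; i < n; i++ {
		if b.Allow(now) {
			allowed++
		}
	}
	return allowed
}

// burstThenRefill sends 100 requests at once, waits one simulated second,
// then sends 100 more, using QPS 30 and Burst 50.
func burstThenRefill() (atStart, afterOneSecond int) {
	start := time.Unix(0, 0)
	b := newTokenBucket(30, 50, start)
	atStart = countAllowed(b, 100, start)
	afterOneSecond = countAllowed(b, 100, start.Add(time.Second))
	return atStart, afterOneSecond
}

func main() {
	first, second := burstThenRefill()
	fmt.Println(first, second)
}
```

With these values the first batch lets 50 requests through (the burst capacity) and the batch one second later lets 30 through (one second of refill). Under this model, Burst bounds short spikes against the remote service while QPS bounds the sustained rate.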
Step 4: Register the Plugin
To make your plugin available in Flyte, you must define a PluginEntry and register it with the global PluginRegistry.
func RegisterMyPlugin() {
	// Register custom types for serialization
	gob.Register(ResourceMetaWrapper{})
	gob.Register(ResourceWrapper{})
	entry := webapi.PluginEntry{
		ID:                 "my-web-api-plugin",
		SupportedTaskTypes: []core.TaskType{"my_task_type"},
		PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
			// Initialize your client and plugin here
			return &Plugin{client: NewClient()}, nil
		},
	}
	pluginmachinery.PluginRegistry().RegisterRemotePlugin(entry)
}
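The gob.Register calls matter because gob can only encode a value stored behind an interface if its concrete type has been registered. The standalone sketch below shows that behavior with the standard library alone. It assumes the plugin state is gob-encoded with the metadata held in an interface-typed field; JobMeta, pluginState, registerTypes, and roundTrip are hypothetical names, not Flyte types.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// ResourceMeta mirrors the idea of an opaque metadata value held behind an interface.
type ResourceMeta interface{}

// JobMeta is a hypothetical concrete metadata type for this sketch.
type JobMeta struct {
	JobID string
}

// pluginState is a stand-in for state that is persisted between evaluations.
type pluginState struct {
	Meta ResourceMeta
}

// registerTypes tells gob which concrete types may appear behind interfaces.
func registerTypes() {
	gob.Register(JobMeta{})
}

// roundTrip encodes the state with gob and decodes it into a fresh value.
func roundTrip(in pluginState) (pluginState, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return pluginState{}, err
	}
	var out pluginState
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	state := pluginState{Meta: JobMeta{JobID: "job-1"}}

	// Before registration, encoding fails because gob does not know JobMeta.
	_, err := roundTrip(state)
	fmt.Println("error before Register:", err != nil)

	registerTypes()
	out, err := roundTrip(state)
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	// The decoded interface value holds the registered concrete type again.
	fmt.Println("job ID after Register:", out.Meta.(JobMeta).JobID)
}
```

Registering the types once at plugin registration time, as RegisterMyPlugin does, keeps the encoder and decoder in agreement about which concrete types can appear.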
Summary of the Lifecycle
- Registration: Flyte loads the plugin via PluginLoader during startup.
- Execution: When a task of a supported type is encountered, Flyte calls Create.
- Caching: The returned ResourceMeta is stored in an in-memory auto-refresh cache.
- Polling: Background workers periodically call Get for all active resources in the cache.
- Progression: Flyte calls Status to determine if the task has moved to a new phase (e.g., from RUNNING to SUCCEEDED).
- Finalization: Once terminal, the resource is removed from the cache. If the workflow is aborted, Delete is called.
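The lifecycle steps can be traced end to end with a small single-threaded simulation. This is only a sketch of the control flow under simplifying assumptions: remote, cache, resync, and runLifecycle are hypothetical stand-ins, the fake remote job advances one phase per poll, and none of the code touches Flyte's real cache or workers.

```go
package main

import "fmt"

type phase string

const (
	queued    phase = "QUEUED"
	running   phase = "RUNNING"
	succeeded phase = "SUCCEEDED"
)

// remote is a hypothetical external service whose jobs advance one phase per poll.
type remote struct {
	polls map[string]int // job ID -> number of times it has been polled
}

// create starts a job and returns its ID (the "resource meta" of this sketch).
func (r *remote) create(name string) string {
	id := "job-" + name
	r.polls[id] = 0
	return id
}

// get reports the job's current phase: QUEUED, then RUNNING, then SUCCEEDED.
func (r *remote) get(id string) phase {
	r.polls[id]++
	switch n := r.polls[id]; {
	case n >= 3:
		return succeeded
	case n == 2:
		return running
	default:
		return queued
	}
}

// cache stands in for the auto-refresh cache: job ID -> last observed phase.
type cache map[string]phase

// resync plays the role of the background workers: one get per cached item.
func resync(c cache, r *remote) {
	for id := range c {
		c[id] = r.get(id)
	}
}

// runLifecycle walks one task through create, caching, polling, status checks,
// and finalization, returning every phase observed along the way.
func runLifecycle(name string) []phase {
	r := &remote{polls: map[string]int{}}
	c := cache{}

	id := r.create(name) // Execution: start the remote job
	c[id] = queued       // Caching: track it for background refresh

	var seen []phase
	for {
		resync(c, r) // Polling: refresh every cached resource
		p := c[id]   // Progression: status reads only the cached value
		seen = append(seen, p)
		if p == succeeded {
			delete(c, id) // Finalization: terminal resources leave the cache
			return seen
		}
	}
}

func main() {
	fmt.Println(runLifecycle("demo"))
}
```

Note that the status check in this sketch never calls the remote service directly; it reads what the last resync stored, which mirrors the split between Get and Status described above.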