Handling Task Inputs and Outputs
Flyte abstracts data movement between the engine and task plugins through a set of IO interfaces. This ensures that plugins do not need to manage storage-specific logic (like S3 or GCS bucket paths) and can instead interact with high-level InputReader and OutputWriter components.
Reading Task Inputs
To access inputs within a plugin, use the InputReader provided in the task context. The Get method retrieves the inputs as a core.LiteralMap.
// From flyteplugins/go/tasks/pluginmachinery/catalog/async_client_impl.go
// Here pluginsCore aliases the pluginmachinery core package and core aliases the flyteidl core package.
func (c *asyncClient) Get(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*core.LiteralMap, error) {
	// Retrieve the inputs for this task
	inputs, err := tCtx.InputReader().Get(ctx)
	if err != nil {
		return nil, err
	}
	// Process inputs...
	return inputs, nil
}
The InputReader also provides access to the underlying storage paths via the InputFilePaths interface:
- GetInputPath(): Returns the URN for the inputs.pb file.
- GetInputPrefixPath(): Returns the directory containing the input files.
Writing Task Outputs
Writing outputs involves two components: an OutputReader (which holds the data to be written) and an OutputWriter (which handles the persistence to the storage backend).
Writing Literals
For small outputs or metadata generated in-memory, use ioutils.NewInMemoryOutputReader.
// Example of writing a LiteralMap to the output storage
literals := &core.LiteralMap{
	Literals: map[string]*core.Literal{
		"result": {Value: &core.Literal_Scalar{Scalar: &core.Scalar{Value: &core.Scalar_Primitive{Primitive: &core.Primitive{Value: &core.Primitive_Integer{Integer: 42}}}}}},
	},
}
// Create a reader for the in-memory data
outputReader := ioutils.NewInMemoryOutputReader(literals, nil, nil)
// Commit the outputs via the writer
err := tCtx.OutputWriter().Put(ctx, outputReader)
Writing Errors
If a task fails with a user-defined error, you can write an ExecutionError instead of literals.
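// io.ExecutionError is declared in flyteplugins/go/tasks/pluginmachinery/io/iface.go.
// It wraps the flyteidl core.ExecutionError, which carries the code, message, and kind.
execError := &io.ExecutionError{
	ExecutionError: &core.ExecutionError{
		Code:    "UserError",
		Message: "The task failed due to invalid input data",
		Kind:    core.ExecutionError_USER,
	},
}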
outputReader := ioutils.NewInMemoryOutputReader(nil, nil, execError)
err := tCtx.OutputWriter().Put(ctx, outputReader)
Managing Output Paths and Metadata
The OutputWriter implements OutputFilePaths, which provides standardized locations for various task artifacts. These paths are typically URNs in the configured storage backend (e.g., s3://my-bucket/path/to/task/).
| Method | Description | Default Filename |
|---|---|---|
| GetOutputPath() | Path for the core.LiteralMap results. | outputs.pb |
| GetErrorPath() | Path for the core.ErrorDocument if the task fails. | error.pb |
| GetDeckPath() | Path for the Flyte Deck HTML file. | deck.html |
| GetOutputPrefixPath() | The base directory for all execution metadata. | N/A |
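The layout in the table can be pictured with a minimal sketch. The outputPaths type below is a hypothetical stand-in, not the Flyte implementation; it only assumes that each artifact path is the output prefix joined with the default filename from the table.

```go
package main

import "strings"

// outputPaths is a hypothetical stand-in for an OutputFilePaths implementation.
// It is not the Flyte type; it only illustrates the layout described in the table.
type outputPaths struct {
	prefix string // for example "s3://my-bucket/path/to/task/"
}

// join appends a file name to the prefix with exactly one separator.
func (p outputPaths) join(name string) string {
	return strings.TrimRight(p.prefix, "/") + "/" + name
}

// GetOutputPrefixPath returns the base directory unchanged.
func (p outputPaths) GetOutputPrefixPath() string { return p.prefix }

// GetOutputPath returns the location of the serialized LiteralMap.
func (p outputPaths) GetOutputPath() string { return p.join("outputs.pb") }

// GetErrorPath returns the location of the error document.
func (p outputPaths) GetErrorPath() string { return p.join("error.pb") }

// GetDeckPath returns the location of the Flyte Deck HTML file.
func (p outputPaths) GetDeckPath() string { return p.join("deck.html") }
```

With a prefix of s3://my-bucket/path/to/task/, this sketch resolves the outputs file to s3://my-bucket/path/to/task/outputs.pb. In a real plugin these values come from tCtx.OutputWriter() and should not be assembled by hand.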
Handling Checkpoints
Flyte supports task checkpointing to allow long-running tasks to resume after a retry. Checkpoint paths are managed through the CheckpointPaths interface, which is embedded in OutputFilePaths.
// Accessing checkpoint paths from the OutputWriter
outputPaths := tCtx.OutputWriter()
// Path to write the current checkpoint
currentCheckpoint := outputPaths.GetCheckpointPrefix()
// Path to read the checkpoint from the previous attempt (if any)
previousCheckpoint := outputPaths.GetPreviousCheckpointsPrefix()
The framework ensures that GetPreviousCheckpointsPrefix() points to the checkpoint directory of the immediately preceding attempt, while GetCheckpointPrefix() provides a fresh location for the current attempt.
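The relationship between the two prefixes across retries can be sketched as follows. Everything here is illustrative: checkpointPaths is a hypothetical stand-in for the CheckpointPaths interface, and the "<base>/<attempt>/checkpoints" layout is an assumption made for the example, not the layout Flyte uses.

```go
package main

import "fmt"

// checkpointPaths is a hypothetical stand-in for the CheckpointPaths interface.
type checkpointPaths struct {
	current  string
	previous string // empty on the first attempt
}

func (c checkpointPaths) GetCheckpointPrefix() string          { return c.current }
func (c checkpointPaths) GetPreviousCheckpointsPrefix() string { return c.previous }

// pathsForAttempt builds checkpoint paths for a 0-based attempt number.
// The "<base>/<attempt>/checkpoints" layout is assumed for illustration only.
func pathsForAttempt(base string, attempt int) checkpointPaths {
	p := checkpointPaths{current: fmt.Sprintf("%s/%d/checkpoints", base, attempt)}
	if attempt > 0 {
		p.previous = fmt.Sprintf("%s/%d/checkpoints", base, attempt-1)
	}
	return p
}

// resumeFrom reports where to restore state from. The boolean is false when
// there is no previous attempt and the task should start from scratch.
func resumeFrom(c checkpointPaths) (string, bool) {
	prev := c.GetPreviousCheckpointsPrefix()
	return prev, prev != ""
}
```

A task would call something like resumeFrom at startup, restore state when the boolean is true, and then write new checkpoints only under GetCheckpointPrefix(), so the previous attempt's data is never modified.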
Initializing IO Components
When implementing custom executors or testing plugins, you can use the ioutils package to initialize the IO machinery.
// From executor/pkg/plugin/task_exec_context.go
// 1. Initialize InputReader
inputPaths := ioutils.NewInputFilePaths(ctx, dataStore, inputPathPrefix)
inputReader := ioutils.NewRemoteFileInputReader(ctx, dataStore, inputPaths)
// 2. Initialize OutputWriter with Checkpoints
rawOutputPaths := ioutils.NewRawOutputPaths(ctx, outputPrefix)
outputFilePaths := ioutils.NewCheckpointRemoteFilePaths(
ctx,
dataStore,
outputPrefix,
rawOutputPaths,
prevCheckpointPath,
)
outputWriter := ioutils.NewRemoteFileOutputWriter(ctx, dataStore, outputFilePaths)
Troubleshooting and Gotchas
Missing Input Files
InputReader.Get(ctx) will return an error if the inputs.pb file does not exist in the expected location. If your plugin needs to handle optional inputs or check for existence manually, use the storage package to verify the path returned by GetInputPath().
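One way to treat inputs as optional is to check for the file before reading it. The sketch below uses hypothetical stand-ins throughout: literalMap replaces core.LiteralMap, inputReader is a trimmed-down version of the plugin InputReader, and existsFunc represents whatever existence check the storage package offers (for instance a metadata lookup on the data store). None of these names come from Flyte.

```go
package main

import (
	"context"
	"errors"
)

// literalMap is a hypothetical stand-in for core.LiteralMap.
type literalMap map[string]string

// inputReader is a hypothetical, trimmed-down view of the plugin InputReader.
type inputReader interface {
	GetInputPath() string
	Get(ctx context.Context) (literalMap, error)
}

// existsFunc stands in for an existence check provided by the storage layer.
type existsFunc func(ctx context.Context, path string) (bool, error)

// readOptionalInputs returns an empty map when the inputs file is absent,
// instead of surfacing the error that Get would return.
func readOptionalInputs(ctx context.Context, r inputReader, exists existsFunc) (literalMap, error) {
	ok, err := exists(ctx, r.GetInputPath())
	if err != nil {
		return nil, err
	}
	if !ok {
		return literalMap{}, nil
	}
	return r.Get(ctx)
}

// fakeReader is a test double backed by an in-memory set of files.
type fakeReader struct {
	path  string
	files map[string]literalMap
}

func (f fakeReader) GetInputPath() string { return f.path }

func (f fakeReader) Get(_ context.Context) (literalMap, error) {
	m, ok := f.files[f.path]
	if !ok {
		return nil, errors.New("inputs.pb not found")
	}
	return m, nil
}

// loadFrom wires the fake reader and an in-memory existence check together.
func loadFrom(path string, files map[string]literalMap) (literalMap, error) {
	r := fakeReader{path: path, files: files}
	exists := func(_ context.Context, p string) (bool, error) {
		_, ok := files[p]
		return ok, nil
	}
	return readOptionalInputs(context.Background(), r, exists)
}
```

Checking first costs one extra storage round trip per task, so it is only worth doing for plugins that genuinely expect the inputs file to be missing in normal operation.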
Exactly-Once Semantics
The raw output prefix exposed through OutputFilePaths (via the embedded RawOutputPaths interface and its GetRawOutputPrefix() method) is unique per execution attempt. This prevents retries of the same task from overwriting each other's data in the "raw" storage area, where large blobs or dataframes are typically stored.
OutputWriter.Put Behavior
The OutputWriter.Put method in RemoteFileOutputWriter (found in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go) performs the following logic:
- If the OutputReader reports an error via IsError(), it writes the error document to GetErrorPath().
- If no error is reported, it calls Read() and writes the resulting LiteralMap to GetOutputPath().
- It returns an error if both the literals and the execution error are nil.
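The three cases above can be sketched as a small decision function. This is not the RemoteFileOutputWriter source: the types are hypothetical stand-ins, the store is an in-memory map, and the error message is invented for the example.

```go
package main

import "errors"

// literalMap and executionError are hypothetical stand-ins for
// core.LiteralMap and io.ExecutionError.
type literalMap map[string]int64

type executionError struct {
	Code    string
	Message string
}

// outputReader holds whatever the task produced: literals, an error, or neither.
type outputReader struct {
	literals literalMap
	execErr  *executionError
}

// memStore records what was written at which path.
type memStore map[string]interface{}

// writer is a hypothetical stand-in for an OutputWriter with fixed paths.
type writer struct {
	store      memStore
	outputPath string
	errorPath  string
}

// Put mirrors the three cases described above: an execution error goes to the
// error path, literals go to the output path, and having neither is an error.
func (w writer) Put(r outputReader) error {
	switch {
	case r.execErr != nil:
		w.store[w.errorPath] = *r.execErr
		return nil
	case r.literals != nil:
		w.store[w.outputPath] = r.literals
		return nil
	default:
		return errors.New("no data found to write")
	}
}
```

The practical consequence for plugin authors is that an OutputReader built with all-nil arguments causes Put to fail, so a task with no outputs should still pass an empty LiteralMap.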