Data Movement with Copilot
Flyte Copilot automates the movement of data between remote storage (like S3 or GCS) and the local filesystem of a task container. This allows task code to interact with local files instead of implementing storage-specific logic.
Downloading Inputs
To download task inputs before execution, use the Downloader type. It fetches a remote LiteralMap and writes each variable to a local directory.
import (
	"context"

	"github.com/flyteorg/flyte/v2/flytecopilot/data"
	"github.com/flyteorg/flyte/v2/flytestdlib/storage"
	"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)

// ctx is a context.Context and dataStore is an already configured
// storage.DataStore; both are assumed to exist in the calling code.

// Initialize the downloader
dl := data.NewDownloader(
	ctx,
	dataStore,
	core.DataLoadingConfig_JSON,
	core.IOStrategy_DOWNLOAD_EAGER,
)

// Download inputs from remote storage to a local directory
err := dl.DownloadInputs(
	ctx,
	storage.DataReference("s3://my-bucket/inputs.pb"),
	"/var/flyte/inputs",
)
The DownloadInputs method performs the following:
- Reads the remote LiteralMap from the provided inputRef.
- Creates the local outputDir.
- Concurrently downloads each variable using RecursiveDownload.
- Generates an inputs.pb file in the local directory.
- Optionally generates inputs.json or inputs.yaml based on the configured format.
Supported Data Types
The Downloader handles various Flyte types:
- Primitives: Written as strings to files named after the variable.
- Blobs: Single files or multipart directories.
- Schemas: Handled as multipart blobs.
- Collections/Maps: Downloaded recursively into subdirectories.
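Because primitives end up as small text files named after their variables, task code can consume them with ordinary file reads. The following is a minimal sketch of that task-side read, using only the Go standard library. The helper names (readPrimitive, readIntInput, writeSample), the epochs variable, and the assumption that an integer is written in plain decimal form are illustrative and not part of Copilot.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// readPrimitive returns the trimmed contents of the file named after the
// input variable inside the local inputs directory.
func readPrimitive(inputsDir, name string) (string, error) {
	b, err := os.ReadFile(filepath.Join(inputsDir, name))
	if err != nil {
		return "", fmt.Errorf("reading input %q: %w", name, err)
	}
	return strings.TrimSpace(string(b)), nil
}

// readIntInput parses a primitive input as a base-10 integer.
// Assumption: the value was written in plain decimal form.
func readIntInput(inputsDir, name string) (int64, error) {
	s, err := readPrimitive(inputsDir, name)
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(s, 10, 64)
}

// writeSample creates a temporary directory with one file per variable,
// standing in for a directory that the Downloader has already populated.
func writeSample(vars map[string]string) (string, error) {
	dir, err := os.MkdirTemp("", "inputs")
	if err != nil {
		return "", err
	}
	for name, value := range vars {
		if err := os.WriteFile(filepath.Join(dir, name), []byte(value), 0o644); err != nil {
			return "", err
		}
	}
	return dir, nil
}

func main() {
	dir, err := writeSample(map[string]string{"epochs": "10\n"})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	defer os.RemoveAll(dir)

	epochs, err := readIntInput(dir, "epochs")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fmt.Println("epochs:", epochs)
}
```

In a real container, the directory passed to readIntInput would be the one given to DownloadInputs (for example /var/flyte/inputs) rather than a temporary directory.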
Uploading Outputs
After a task completes, use the Uploader to push local files back to remote storage. It uses a VariableMap to identify which local files correspond to expected output variables.
import (
	"github.com/flyteorg/flyte/v2/flytecopilot/data"
	"github.com/flyteorg/flyte/v2/flytestdlib/storage"
	"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)

// Initialize the uploader
ul := data.NewUploader(
	ctx,
	dataStore,
	core.DataLoadingConfig_JSON,
	core.IOStrategy_UPLOAD_ON_EXIT,
	"_ERROR", // name of the error file to look for
)

// Upload local files to remote storage
err := ul.RecursiveUpload(
	ctx,
	outputInterface, // *core.VariableMap defining expected outputs
	"/var/flyte/outputs",
	storage.DataReference("s3://my-bucket/outputs.pb"),
	storage.DataReference("s3://my-bucket/raw_data/"),
)
The RecursiveUpload method:
- Checks for an error file (e.g., _ERROR) in the local directory. If found, it stops and reports a user error.
- Iterates through the VariableMap.
- For Simple types, it reads the local file (max 1024 bytes) and creates a Flyte literal.
- For Blob types, it uploads the local file or directory to the dataRawPath.
- Writes the final outputs.pb metadata file to the metaOutputPath.
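The first three steps can be approximated with the standard library alone. The sketch below is not the Copilot implementation: collectPrimitives, readLimited, and writeFiles are hypothetical helpers, the list of names stands in for the VariableMap, and rejecting oversized files outright is an assumption about how the 1024-byte limit is enforced.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// maxPrimitiveSize mirrors the 1024-byte limit described above.
const maxPrimitiveSize = 1024

// collectPrimitives checks for the error file first, then reads one small
// file per expected output name. Hypothetical helper, not part of Copilot.
func collectPrimitives(dir, errorFileName string, names []string) (map[string]string, error) {
	b, err := os.ReadFile(filepath.Join(dir, errorFileName))
	switch {
	case err == nil:
		// The task reported a failure: stop before reading any output.
		return nil, fmt.Errorf("user error: %s", b)
	case !errors.Is(err, os.ErrNotExist):
		return nil, err
	}

	out := make(map[string]string, len(names))
	for _, name := range names {
		value, err := readLimited(filepath.Join(dir, name))
		if err != nil {
			return nil, fmt.Errorf("output %q: %w", name, err)
		}
		out[name] = value
	}
	return out, nil
}

// readLimited reads a file but refuses anything above maxPrimitiveSize.
func readLimited(p string) (string, error) {
	f, err := os.Open(p)
	if err != nil {
		return "", err
	}
	defer f.Close()
	// Read one byte past the limit so oversized files can be detected.
	b, err := io.ReadAll(io.LimitReader(f, maxPrimitiveSize+1))
	if err != nil {
		return "", err
	}
	if len(b) > maxPrimitiveSize {
		return "", fmt.Errorf("exceeds %d bytes", maxPrimitiveSize)
	}
	return string(b), nil
}

// writeFiles creates a temporary outputs directory for the demonstration.
func writeFiles(files map[string]string) (string, error) {
	dir, err := os.MkdirTemp("", "outputs")
	if err != nil {
		return "", err
	}
	for name, body := range files {
		if err := os.WriteFile(filepath.Join(dir, name), []byte(body), 0o644); err != nil {
			return "", err
		}
	}
	return dir, nil
}

func main() {
	dir, err := writeFiles(map[string]string{"accuracy": "0.93"})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	defer os.RemoveAll(dir)
	fmt.Println(collectPrimitives(dir, "_ERROR", []string{"accuracy"}))
}
```

Checking the error file before touching any output is what keeps a failed task from publishing partial results.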
Handling Large Data and Concurrency
Flyte Copilot uses FutureMap (a map of flytestdlib/futures) to perform I/O operations concurrently. This is particularly important for multipart blobs (directories).
When Downloader encounters a multipart blob, it:
- Lists all parts of the blob using the storage.DataStore.
- Spawns goroutines to download each part in parallel.
- Uses a sync.WaitGroup and sync.Mutex to coordinate the downloads and directory creation.
Similarly, the Uploader uses filepath.Walk to identify all files in a local directory and uploads them concurrently using futures.NewAsyncFuture.
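The coordination pattern on the download side can be illustrated without any Flyte dependency. In this sketch an in-memory map stands in for the remote part listing, and downloadParts, countFiles, and demo are hypothetical names. It shows only the sync.WaitGroup and sync.Mutex mechanics, not the FutureMap machinery that Copilot itself uses.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
)

// downloadParts writes every part below destDir, one goroutine per part.
// parts maps a slash-separated relative path to the part's contents.
func downloadParts(destDir string, parts map[string][]byte) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards made and firstErr
		made     = map[string]bool{}
		firstErr error
	)
	fail := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}
	for rel, body := range parts {
		wg.Add(1)
		go func(rel string, body []byte) {
			defer wg.Done()
			target := filepath.Join(destDir, filepath.FromSlash(rel))
			dir := filepath.Dir(target)

			// Serialize directory creation so each directory is made once.
			mu.Lock()
			var mkErr error
			if !made[dir] {
				if mkErr = os.MkdirAll(dir, 0o755); mkErr == nil {
					made[dir] = true
				}
			}
			mu.Unlock()
			if mkErr != nil {
				fail(mkErr)
				return
			}
			if err := os.WriteFile(target, body, 0o644); err != nil {
				fail(err)
			}
		}(rel, body)
	}
	wg.Wait() // all goroutines are done, so firstErr is safe to read
	return firstErr
}

// countFiles returns the number of regular entries below root.
func countFiles(root string) (int, error) {
	n := 0
	err := filepath.WalkDir(root, func(_ string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !d.IsDir() {
			n++
		}
		return nil
	})
	return n, err
}

// demo downloads three sample parts into a temporary directory.
func demo() (int, error) {
	dir, err := os.MkdirTemp("", "blob")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	parts := map[string][]byte{
		"part-0.csv":             []byte("a\n"),
		"nested/part-1.csv":      []byte("b\n"),
		"nested/deep/part-2.csv": []byte("c\n"),
	}
	if err := downloadParts(dir, parts); err != nil {
		return 0, err
	}
	return countFiles(dir)
}

func main() {
	fmt.Println(demo())
}
```

The mutex protects only the shared bookkeeping (the set of created directories and the first error), so the file writes themselves still run in parallel.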
Reporting Task Failures
The Uploader is designed to detect task failures by looking for a specific error file before attempting to upload data.
// Inside flytecopilot/data/upload.go
errFile := path.Join(fromPath, u.errorFileName)
if info, err := os.Stat(errFile); err == nil {
	b, _ := os.ReadFile(errFile)
	return errors.Errorf("User Error: %s", string(b))
}
If your task fails, write the error message to the file specified by errorFileName (default is _ERROR). The Copilot will read this file and return it as a "User Error," preventing the upload of potentially partial or corrupt output data.
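On the task side, reporting a failure amounts to writing one file into the outputs directory. A minimal sketch follows, assuming the error file is named _ERROR and using a temporary directory in place of the real outputs directory. reportFailure, runTask, and demo are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// reportFailure writes the failure message to the error file so that the
// Uploader can find it. Hypothetical helper, not part of Copilot.
func reportFailure(outputsDir, errorFileName string, cause error) error {
	if err := os.MkdirAll(outputsDir, 0o755); err != nil {
		return err
	}
	target := filepath.Join(outputsDir, errorFileName)
	return os.WriteFile(target, []byte(cause.Error()), 0o644)
}

// runTask is a placeholder for the real task logic; it always fails here.
func runTask() error {
	return errors.New("input file was empty")
}

// demo runs the failing task against a temporary outputs directory and
// returns what ended up in the error file.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "outputs")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	if taskErr := runTask(); taskErr != nil {
		if err := reportFailure(dir, "_ERROR", taskErr); err != nil {
			return "", err
		}
	}
	b, err := os.ReadFile(filepath.Join(dir, "_ERROR"))
	return string(b), err
}

func main() {
	msg, err := demo()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fmt.Println("error file contents:", msg)
}
```

In a real container, the directory would be the one later passed to RecursiveUpload (for example /var/flyte/outputs), and the file name would match the errorFileName given to NewUploader.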
Important Constraints
- Primitive Size: When uploading Simple types, the local file must not exceed 1024 bytes (defined by maxPrimitiveSize in flytecopilot/data/upload.go). Larger data must be handled as a Blob.
- Directory Structure: For multipart blobs, the Downloader preserves the relative path structure of the remote objects when recreating them locally.
- Metadata Formats: While inputs.pb is always created, you can configure the Downloader to also produce inputs.json or inputs.yaml for easier consumption by scripts.
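To make the directory-structure constraint concrete, the sketch below shows one way a remote object key could be mapped to a local path while keeping its position relative to the blob prefix. It is an illustration under assumptions, not the Downloader's actual code: relativePart and localPath are hypothetical helpers, and the bucket and paths are made up.

```go
package main

import (
	"fmt"
	"path"
	"path/filepath"
	"strings"
)

// relativePart returns the portion of key that lies below prefix, as a
// cleaned slash-separated path. Keys outside the prefix are rejected.
func relativePart(prefix, key string) (string, error) {
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	if !strings.HasPrefix(key, prefix) {
		return "", fmt.Errorf("key %q is not under prefix %q", key, prefix)
	}
	rel := path.Clean(strings.TrimPrefix(key, prefix))
	// Refuse empty results and anything that would escape the target directory.
	if rel == "." || rel == ".." || strings.HasPrefix(rel, "../") {
		return "", fmt.Errorf("key %q has no usable relative path", key)
	}
	return rel, nil
}

// localPath joins the relative part onto destDir using the OS separator.
func localPath(destDir, prefix, key string) (string, error) {
	rel, err := relativePart(prefix, key)
	if err != nil {
		return "", err
	}
	return filepath.Join(destDir, filepath.FromSlash(rel)), nil
}

func main() {
	p, err := localPath(
		"/var/flyte/inputs/dataset",
		"s3://my-bucket/dataset",
		"s3://my-bucket/dataset/2024/01/part-0.csv",
	)
	fmt.Println(p, err)
}
```

Rejecting keys that clean to a path starting with ../ is a defensive choice in this sketch, so that a crafted object name cannot write outside the destination directory.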