
Data Movement with Copilot

Flyte Copilot automates the movement of data between remote storage (such as S3 or GCS) and the local filesystem of a task container, so task code can read and write ordinary local files instead of implementing storage-specific logic.

Downloading Inputs

To download task inputs before execution, use the Downloader type (created with data.NewDownloader). It fetches a remote LiteralMap and writes each variable to a local directory.

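```go
import (
	"context"

	"github.com/flyteorg/flyte/v2/flytecopilot/data"
	"github.com/flyteorg/flyte/v2/flytestdlib/storage"
	"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)

// downloadInputs fetches the task inputs into /var/flyte/inputs.
// dataStore is assumed to be an already-initialized *storage.DataStore.
func downloadInputs(ctx context.Context, dataStore *storage.DataStore) error {
	// Initialize the downloader: also emit inputs.json, download eagerly
	dl := data.NewDownloader(
		ctx,
		dataStore,
		core.DataLoadingConfig_JSON,
		core.IOStrategy_DOWNLOAD_EAGER,
	)

	// Download inputs from remote storage to a local directory
	return dl.DownloadInputs(
		ctx,
		storage.DataReference("s3://my-bucket/inputs.pb"),
		"/var/flyte/inputs",
	)
}
```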

The DownloadInputs method performs the following:

  1. Reads the remote LiteralMap from the provided inputRef.
  2. Creates the local outputDir.
  3. Concurrently downloads each variable using RecursiveDownload.
  4. Generates an inputs.pb file in the local directory.
  5. Optionally generates inputs.json or inputs.yaml based on the configured format.

Supported Data Types

The Downloader handles various Flyte types:

  • Primitives: Written as strings to files named after the variable.
  • Blobs: Single files or multipart directories.
  • Schemas: Handled as multipart blobs.
  • Collections/Maps: Downloaded recursively into subdirectories.

Uploading Outputs

After a task completes, use the Uploader to push local files back to remote storage. It uses a VariableMap to identify which local files correspond to expected output variables.

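```go
import (
	"context"

	"github.com/flyteorg/flyte/v2/flytecopilot/data"
	"github.com/flyteorg/flyte/v2/flytestdlib/storage"
	"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)

// uploadOutputs pushes /var/flyte/outputs back to remote storage.
// dataStore is assumed to be an already-initialized *storage.DataStore.
func uploadOutputs(ctx context.Context, dataStore *storage.DataStore, outputInterface *core.VariableMap) error {
	// Initialize the uploader; "_ERROR" names the task's error file
	ul := data.NewUploader(
		ctx,
		dataStore,
		core.DataLoadingConfig_JSON,
		core.IOStrategy_UPLOAD_ON_EXIT,
		"_ERROR",
	)

	// Upload local files to remote storage
	return ul.RecursiveUpload(
		ctx,
		outputInterface, // *core.VariableMap defining expected outputs
		"/var/flyte/outputs",
		storage.DataReference("s3://my-bucket/outputs.pb"), // metaOutputPath
		storage.DataReference("s3://my-bucket/raw_data/"),  // dataRawPath
	)
}
```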

The RecursiveUpload method:

  1. Checks for an error file (e.g., _ERROR) in the local directory. If found, it stops and reports a user error.
  2. Iterates through the VariableMap.
  3. For Simple types, it reads the local file (max 1024 bytes) and creates a Flyte literal.
  4. For Blob types, it uploads the local file or directory to the dataRawPath.
  5. Writes the final outputs.pb metadata file to the metaOutputPath.

Handling Large Data and Concurrency

Flyte Copilot uses a FutureMap (a map of futures from the flytestdlib/futures package) to perform I/O operations concurrently. This matters most for multipart blobs (directories), which can contain many objects.

When Downloader encounters a multipart blob, it:

  1. Lists all parts of the blob using the storage.DataStore.
  2. Spawns goroutines to download each part in parallel.
  3. Uses a sync.WaitGroup and sync.Mutex to coordinate the downloads and directory creation.

Similarly, the Uploader uses filepath.Walk to identify all files in a local directory and uploads them concurrently using futures.NewAsyncFuture.

Reporting Task Failures

The Uploader is designed to detect task failures by looking for a specific error file before attempting to upload data.

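```go
// Simplified from flytecopilot/data/upload.go: abort before uploading
// anything if the task left an error file in the output directory.
errFile := path.Join(fromPath, u.errorFileName)
if _, err := os.Stat(errFile); err == nil {
	b, err := os.ReadFile(errFile)
	if err != nil {
		return err
	}
	return errors.Errorf("User Error: %s", string(b))
}
```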

If your task fails, write the error message to the file named by errorFileName (default is _ERROR). Copilot reads this file and returns its contents as a "User Error", which prevents potentially partial or corrupt output data from being uploaded.

Important Constraints

  • Primitive Size: When uploading Simple types, the local file must not exceed 1024 bytes (defined by maxPrimitiveSize in flytecopilot/data/upload.go). Larger data must be handled as a Blob.
  • Directory Structure: For multipart blobs, the Downloader preserves the relative path structure of the remote objects when recreating them locally.
  • Metadata Formats: While inputs.pb is always created, you can configure the Downloader to also produce inputs.json or inputs.yaml for easier consumption by scripts.