
Asynchronous Catalog Operations and Futures

When Flyte plugins perform catalog lookups or uploads, these network operations can introduce significant latency. Blocking the main execution thread while waiting for a catalog response would limit the throughput of the entire system. To solve this, Flyte provides the AsyncClient and Future interfaces, which offload catalog interactions to background workqueues and allow plugins to check for completion in a non-blocking manner.

The Async Client Interface

The AsyncClient interface, defined in flyteplugins/go/tasks/pluginmachinery/catalog/async_client.go, provides two primary methods for interacting with the Flyte Catalog. Both are variadic, so a single call can submit a batch of requests:

```go
type AsyncClient interface {
	// Returns if an entry exists for the given task and input.
	Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error)

	// Adds a new entry to catalog for the given task execution context and the generated output
	Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error)
}
```

In a Flyte plugin, you typically access the AsyncClient through the TaskExecutionContext:

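```go
func (p myPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
	catalogClient := tCtx.Catalog() // a catalog.AsyncClient
	// ... use catalogClient.Download or catalogClient.Upload, then
	// return a core.Transition describing the plugin's next phase.
}
```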

Performing Asynchronous Downloads

To check if a task's outputs are already cached, you construct a DownloadRequest and call Download. This method returns immediately with a DownloadFuture.

Creating a Download Request

A DownloadRequest requires a Key (to identify the catalog entry) and a Target (an io.OutputWriter where the data should be written if found).

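```go
request := catalog.DownloadRequest{
	Key: catalog.Key{
		Identifier:   taskID, // the task's core.Identifier
		CacheVersion: "1.0",
		InputReader:  tCtx.InputReader(),
		// Other Key fields, such as TypedInterface, are omitted here.
	},
	// On a cache hit, the cached outputs are written here.
	Target: tCtx.OutputWriter(),
}

future, err := catalogClient.Download(ctx, request)
if err != nil {
	return core.UnknownTransition, err
}
```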

Handling the Download Future

Because the download happens in the background, you must check the status of the returned DownloadFuture. If the status is ResponseStatusNotReady, the plugin should return a waiting phase so the Flyte engine can re-invoke it later.

```go
switch future.GetResponseStatus() {
case catalog.ResponseStatusNotReady:
	// The background worker is still processing the request.
	// version and info are the plugin's own state version and *core.TaskInfo.
	return core.DoTransition(core.PhaseInfoWaitingForCache(version, info)), nil

case catalog.ResponseStatusReady:
	// The operation is complete. Check for errors or retrieve the response.
	if err := future.GetResponseError(); err != nil {
		return core.UnknownTransition, err
	}

	response, err := future.GetResponse()
	if err != nil {
		return core.UnknownTransition, err
	}

	if response.GetCachedCount() > 0 {
		// Cache hit! The data has been written to the Target OutputWriter.
		return core.DoTransition(core.PhaseInfoSuccess(info)), nil
	}
}

// Cache miss: continue and launch the task as usual.
```

Performing Asynchronous Uploads

Uploading results to the catalog follows a similar pattern using UploadRequest and UploadFuture.

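```go
// ArtifactData takes an io.OutputReader over the outputs the task produced
// (the writer returned by tCtx.OutputWriter() is not a reader).
// outputReader is a placeholder for such a reader.
taskExecID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID()

uploadReq := catalog.UploadRequest{
	Key:          key, // the catalog.Key for this task and its inputs
	ArtifactData: outputReader,
	ArtifactMetadata: catalog.Metadata{
		TaskExecutionIdentifier: &taskExecID,
	},
}

future, err := catalogClient.Upload(ctx, uploadReq)
if err != nil {
	return core.UnknownTransition, err
}
```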

UploadFuture embeds Future and adds no methods of its own, so you check its progress with GetResponseStatus() and GetResponseError(), just as for a download.

Internal Mechanism and Deduplication

The standard implementation, AsyncClientImpl (found in flyteplugins/go/tasks/pluginmachinery/catalog/async_client_impl.go), manages two separate workqueue.IndexedWorkQueue instances: one for readers (downloads) and one for writers (uploads).

Deduplication via Consistent Hashing

To prevent redundant network calls for the same data, AsyncClientImpl derives a deterministic workItemID from each request, so the same request always maps to the same ID.

  • Downloads: The ID is generated by hashing the output prefix path using consistentHash.
  • Uploads: The ID is generated by hashing the input literals using hashInputs.

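Because the IndexedWorkQueue indexes items by these IDs, queuing an ID that is already present does not enqueue new work. Repeated calls for the same request, such as the engine re-invoking Handle on each evaluation round, reuse the existing work item instead of triggering another network call.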
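The page names consistentHash but not its algorithm. The sketch below assumes a 32-bit FNV-1a hash encoded as unpadded base32, and an ID laid out as key-index-suffix, where the index is the request's position in the batch; both functions are illustrative reimplementations, not the library's code. The property that matters is determinism: the same output prefix always yields the same ID.

```go
package main

import (
	"encoding/base32"
	"fmt"
	"hash/fnv"
)

// consistentHashSketch hashes s with 32-bit FNV-1a and encodes the digest
// as unpadded base32. The hash and encoding choices are assumptions.
func consistentHashSketch(s string) (string, error) {
	h := fnv.New32a()
	if _, err := h.Write([]byte(s)); err != nil {
		return "", err
	}
	return base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(h.Sum(nil)), nil
}

// workItemIDSketch joins a key, the request's index in the batch, and an
// optional suffix (for example an input hash). The layout is an assumption.
func workItemIDSketch(key string, idx int, suffix string) string {
	return fmt.Sprintf("%v-%v-%v", key, idx, suffix)
}

func main() {
	a, _ := consistentHashSketch("s3://bucket/outputs/abc/0")
	b, _ := consistentHashSketch("s3://bucket/outputs/abc/0")
	c, _ := consistentHashSketch("s3://bucket/outputs/abc/1")
	fmt.Println(a == b, a == c) // true false
	fmt.Println(workItemIDSketch(a, 0, ""))
}
```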

Background Workers

When Download or Upload is called, the client:

  1. Formats the workItemID.
  2. Queues a new ReaderWorkItem or WriterWorkItem in the respective queue.
  3. Immediately calls Get(workItemID) on the queue to see if a worker has already finished it.
  4. Wraps the result in a future struct (defined in flyteplugins/go/tasks/pluginmachinery/catalog/response.go).

Configuration

The behavior of the background workqueues is controlled via the Flyte configuration under the catalogCache section. These settings allow you to tune the number of concurrent workers and the size of the deduplication cache.

| Parameter | Description | Default |
|---|---|---|
| reader.workers | Number of concurrent workers performing catalog lookups. | 10 |
| reader.maxRetries | How many times to retry a failed lookup. | 3 |
| reader.indexCacheMaxItems | Maximum number of items in the reader deduplication index. Should be large enough for your biggest array tasks. | 10000 |
| writer.workers | Number of concurrent workers performing catalog uploads. | 10 |
| writer.maxRetries | How many times to retry a failed upload. | 3 |
| writer.indexCacheMaxItems | Maximum number of items in the writer deduplication index. | 10000 |

If indexCacheMaxItems is too small, entries can be evicted from the index before their results are read. An evicted ID no longer deduplicates, so queuing it again repeats the network work, and a status lookup for it can fail with an "item not found" error.
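The eviction failure mode is easiest to see with a bounded index. The sketch assumes a least-recently-used policy, which this page does not specify, and uses a hypothetical lruIndex built on container/list: with room for three IDs, adding five pushes the two oldest out, so a later lookup for them misses.

```go
package main

import (
	"container/list"
	"fmt"
)

// lruIndex is a hypothetical bounded index: when full, adding a new ID
// evicts the least recently used one.
type lruIndex struct {
	max   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // id -> element holding the id
}

func newLRUIndex(max int) *lruIndex {
	return &lruIndex{max: max, order: list.New(), items: map[string]*list.Element{}}
}

// Add inserts id (or refreshes it) and evicts the oldest entry if over capacity.
func (c *lruIndex) Add(id string) {
	if el, ok := c.items[id]; ok {
		c.order.MoveToFront(el)
		return
	}
	c.items[id] = c.order.PushFront(id)
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(string))
	}
}

// Contains reports whether id is still indexed, without changing recency.
func (c *lruIndex) Contains(id string) bool {
	_, ok := c.items[id]
	return ok
}

func main() {
	// An array task with 5 subtasks queues 5 IDs, but the index holds only 3.
	idx := newLRUIndex(3)
	for i := 0; i < 5; i++ {
		idx.Add(fmt.Sprintf("out-%d", i))
	}
	// out-0 and out-1 print false; out-2, out-3 and out-4 print true.
	for i := 0; i < 5; i++ {
		id := fmt.Sprintf("out-%d", i)
		fmt.Println(id, idx.Contains(id))
	}
}
```

In this model, sizing the index at least as large as the number of work items in flight (for example, the subtasks of the largest array task) keeps every ID resident until its status has been read.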