Asynchronous Catalog Operations and Futures
When Flyte plugins perform catalog lookups or uploads, these network operations can introduce significant latency. Blocking the main execution thread while waiting for a catalog response would limit the throughput of the entire system. To solve this, Flyte provides the AsyncClient and Future interfaces, which offload catalog interactions to background workqueues and allow plugins to check for completion in a non-blocking manner.
The Async Client Interface
The AsyncClient interface, defined in flyteplugins/go/tasks/pluginmachinery/catalog/async_client.go, provides two primary methods for interacting with the Flyte Catalog:
type AsyncClient interface {
	// Returns if an entry exists for the given task and input.
	Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error)
	// Adds a new entry to catalog for the given task execution context and the generated output
	Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error)
}
In a Flyte plugin, you typically access the AsyncClient through the TaskExecutionContext:
func (p myPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
	catalogClient := tCtx.Catalog()
	// ... use catalogClient.Download or catalogClient.Upload
}
Performing Asynchronous Downloads
To check if a task's outputs are already cached, you construct a DownloadRequest and call Download. This method returns immediately with a DownloadFuture.
Creating a Download Request
A DownloadRequest requires a Key (to identify the catalog entry) and a Target (an io.OutputWriter where the data should be written if found).
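request := catalog.DownloadRequest{
	Key: catalog.Key{
		Identifier:   taskID,
		CacheVersion: "1.0",
		InputReader:  tCtx.InputReader(),
	},
	Target: tCtx.OutputWriter(),
}
// Download only submits the lookup; a non-nil error means the request could not be submitted.
future, err := catalogClient.Download(ctx, request)
if err != nil {
	return core.UnknownTransition, err
}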
Handling the Download Future
Because the download happens in the background, you must check the status of the returned DownloadFuture. If the status is ResponseStatusNotReady, the plugin should return a waiting phase so the Flyte engine can re-invoke it later.
switch future.GetResponseStatus() {
case catalog.ResponseStatusNotReady:
	// The background worker is still processing the request.
	return core.DoTransition(core.PhaseInfoWaitingForCache(version, info)), nil
case catalog.ResponseStatusReady:
	// The operation is complete. Check for errors or retrieve the response.
	if err := future.GetResponseError(); err != nil {
		return core.UnknownTransition, err
	}
	response, err := future.GetResponse()
	if err != nil {
		return core.UnknownTransition, err
	}
	if response.GetCachedCount() > 0 {
		// Cache hit! The data has been written to the Target OutputWriter.
		return core.DoTransition(core.PhaseInfoSuccess(info)), nil
	}
	// Cache miss: nothing was written to the Target, so continue with normal task execution.
}
Performing Asynchronous Uploads
Uploading results to the catalog follows a similar pattern using UploadRequest and UploadFuture.
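uploadReq := catalog.UploadRequest{
	Key:          key,
	ArtifactData: tCtx.OutputWriter(),
	ArtifactMetadata: catalog.Metadata{
		TaskExecutionIdentifier: tCtx.TaskExecutionMetadata().GetTaskExecutionID(),
	},
}
// As with Download, a non-nil error means the request could not be submitted.
future, err := catalogClient.Upload(ctx, uploadReq)
if err != nil {
	return core.UnknownTransition, err
}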
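UploadFuture embeds Future without adding methods of its own, so there is no upload-specific response to retrieve. You check its status with GetResponseStatus() and its error with GetResponseError(), just as you would for a download.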
Internal Mechanism and Deduplication
The standard implementation, AsyncClientImpl (found in flyteplugins/go/tasks/pluginmachinery/catalog/async_client_impl.go), manages two separate workqueue.IndexedWorkQueue instances: one for readers (downloads) and one for writers (uploads).
Deduplication via Consistent Hashing
To prevent redundant network calls for the same data, AsyncClientImpl derives a deterministic workItemID for every request, so that identical requests always map to the same ID.
- Downloads: The ID is generated by hashing the output prefix path using consistentHash.
- Uploads: The ID is generated by hashing the input literals using hashInputs.
If multiple concurrent tasks request the same catalog entry, the IndexedWorkQueue ensures they are deduplicated based on these IDs.
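To make the idea of a deterministic ID concrete, here is a minimal sketch of the download case. It is not the Flyte implementation: stableHash and workItemID are hypothetical names, and the choice of 32-bit FNV-1a with base32 encoding is an assumption made for illustration. The only property that matters is that the same key and output prefix always produce the same ID.

```go
package main

import (
	"encoding/base32"
	"fmt"
	"hash/fnv"
)

// stableHash is a hypothetical stand-in for consistentHash. It assumes
// 32-bit FNV-1a plus unpadded base32; the real function may differ.
func stableHash(s string) string {
	h := fnv.New32a()
	h.Write([]byte(s)) // Write on an FNV hash never returns an error.
	enc := base32.StdEncoding.WithPadding(base32.NoPadding)
	return enc.EncodeToString(h.Sum(nil))
}

// workItemID combines a printable catalog key with the hashed output prefix.
// The "key-hash" layout is an assumption for illustration.
func workItemID(key, outputPrefix string) string {
	return fmt.Sprintf("%s-%s", key, stableHash(outputPrefix))
}

func main() {
	first := workItemID("my-task:1.0", "s3://bucket/outputs/run-a")
	again := workItemID("my-task:1.0", "s3://bucket/outputs/run-a")
	other := workItemID("my-task:1.0", "s3://bucket/outputs/run-b")

	fmt.Println(first == again) // true: same request, same ID, one unit of work
	fmt.Println(first == other) // false: different output prefix, separate work item
}
```

Because the ID is a pure function of the request, a queue that indexes items by ID can recognize a repeated request without comparing payloads.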
Background Workers
When Download or Upload is called, the client:
- Formats the workItemID.
- Queues a new ReaderWorkItem or WriterWorkItem in the respective queue.
- Immediately calls Get(workItemID) on the queue to see if a worker has already finished it.
- Wraps the result in a future struct (defined in flyteplugins/go/tasks/pluginmachinery/catalog/response.go).
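The queue-then-check sequence above can be sketched with a toy, single-threaded queue. Everything here is hypothetical: toyQueue, submit, and processOne are illustrative names, and the real IndexedWorkQueue runs workers concurrently and has a richer API. The sketch only shows why re-submitting the same ID is cheap and how a later call observes the finished result.

```go
package main

import "fmt"

type status int

const (
	notReady status = iota
	ready
)

// toyQueue is a hypothetical stand-in for an indexed work queue:
// an index keyed by work item ID plus a list of pending IDs.
type toyQueue struct {
	index   map[string]status
	pending []string
}

func newToyQueue() *toyQueue {
	return &toyQueue{index: map[string]status{}}
}

// Queue registers id unless it is already indexed, which is the deduplication step.
func (q *toyQueue) Queue(id string) {
	if _, seen := q.index[id]; seen {
		return
	}
	q.index[id] = notReady
	q.pending = append(q.pending, id)
}

// Get reports the current status of id and whether it is indexed at all.
func (q *toyQueue) Get(id string) (status, bool) {
	s, found := q.index[id]
	return s, found
}

// processOne simulates a background worker finishing the oldest pending item.
func (q *toyQueue) processOne() {
	if len(q.pending) == 0 {
		return
	}
	id := q.pending[0]
	q.pending = q.pending[1:]
	q.index[id] = ready
}

// submit mirrors the client steps: queue the item, then immediately read its status.
func submit(q *toyQueue, id string) status {
	q.Queue(id)
	s, _ := q.Get(id)
	return s
}

func main() {
	q := newToyQueue()
	fmt.Println(submit(q, "task-a") == notReady) // true: just queued
	submit(q, "task-a")                          // same ID again
	fmt.Println(len(q.pending))                  // 1: the repeat was deduplicated
	q.processOne()                               // a worker completes the item
	fmt.Println(submit(q, "task-a") == ready)    // true: a later call sees the result
}
```

This is also why a plugin can call Download on every invocation of Handle: repeated calls with the same request resolve to the same work item rather than piling up new work.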
Configuration
The behavior of the background workqueues is controlled via the Flyte configuration under the catalogCache section. These settings allow you to tune the number of concurrent workers and the size of the deduplication cache.
| Parameter | Description | Default |
|---|---|---|
| reader.workers | Number of concurrent threads performing catalog lookups. | 10 |
| reader.maxRetries | How many times to retry a failed lookup. | 3 |
| reader.indexCacheMaxItems | Max items in the deduplication index. Should be large enough for your biggest array tasks. | 10000 |
| writer.workers | Number of concurrent threads performing catalog uploads. | 10 |
| writer.maxRetries | How many times to retry a failed upload. | 3 |
| writer.indexCacheMaxItems | Max items in the writer deduplication index. | 10000 |
If indexCacheMaxItems is too small, items may be evicted from the index before they are processed, leading to redundant work or "item not found" errors during status lookups.
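The eviction problem can be illustrated with a toy bounded index. This is a sketch under stated assumptions, not the Flyte data structure: boundedIndex is a hypothetical type, and oldest-first eviction is assumed purely for simplicity, since this section does not state the real eviction policy. It shows what happens when an array task submits more items than the index can hold.

```go
package main

import "fmt"

// boundedIndex is a hypothetical index that holds at most max IDs and
// evicts the oldest insertion first (an assumed policy for illustration).
type boundedIndex struct {
	max   int
	order []string
	items map[string]bool
}

func newBoundedIndex(max int) *boundedIndex {
	if max < 1 {
		max = 1 // keep the toy well-defined for non-positive sizes
	}
	return &boundedIndex{max: max, items: map[string]bool{}}
}

// Add indexes id, evicting the oldest entry when the index is full.
func (b *boundedIndex) Add(id string) {
	if b.items[id] {
		return
	}
	if len(b.order) == b.max {
		oldest := b.order[0]
		b.order = b.order[1:]
		delete(b.items, oldest)
	}
	b.order = append(b.order, id)
	b.items[id] = true
}

// Has reports whether id is still present in the index.
func (b *boundedIndex) Has(id string) bool { return b.items[id] }

func main() {
	// An "array task" with 5 subtasks against an index sized for only 3.
	idx := newBoundedIndex(3)
	for i := 0; i < 5; i++ {
		idx.Add(fmt.Sprintf("subtask-%d", i))
	}
	fmt.Println(idx.Has("subtask-0")) // false: evicted before anyone could look it up
	fmt.Println(idx.Has("subtask-4")) // true: still indexed
}
```

In this toy, a later status lookup for subtask-0 finds nothing, which corresponds to the redundant work or lookup failures described above. Sizing the index to at least the largest number of items in flight at once avoids the situation.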