Catalog and Memoization
Flyte uses the Catalog to implement task memoization, allowing it to skip execution if a task with the same inputs and version has already been successfully run. While the underlying Catalog service provides a synchronous gRPC interface, Flyte plugins primarily interact with it through an asynchronous client to avoid blocking the main execution loop.
Defining the Cache Key
The Key struct in flyteplugins/go/tasks/pluginmachinery/catalog/client.go uniquely identifies a cached artifact. It combines the task's identity with its input values and versioning information.
When you want to check the cache or store a result, you must first construct a Key. In practice, this is often done by extracting information from the TaskExecutionContext:
```go
import (
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog"
	"google.golang.org/protobuf/proto"
)

// Example from executor/pkg/controller/taskaction_cache.go.
// taskTemplate, taskMetadata and tCtx come from the surrounding handler;
// core is the flyteidl core protobuf package.
key := catalog.Key{
	// Clone the identifier so the key does not alias the template's message.
	Identifier:           proto.Clone(taskTemplate.GetId()).(*core.Identifier),
	CacheVersion:         taskMetadata.GetCacheVersion(),
	CacheIgnoreInputVars: taskMetadata.GetCacheIgnoreInputVars(),
	TypedInterface:       taskTemplate.GetInterface(),
	InputReader:          tCtx.InputReader(),
}
```
The InputReader is critical because the Catalog client uses it to read the actual input values and hash them. Variables listed in CacheIgnoreInputVars are excluded from that hash. As a result, two executions that differ only in those inputs map to the same cache entry.
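The following minimal sketch makes the role of CacheIgnoreInputVars concrete by hashing inputs while skipping ignored variables. It is not Flyte's implementation. Inputs are modeled as pre-serialized strings rather than protobuf literals, and `hashInputs` and `cacheKey` are hypothetical helpers. Sorting the variable names keeps the hash independent of Go's randomized map iteration order.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// hashInputs is a hypothetical helper: it hashes every input except the ones
// listed in ignore. Values are pre-serialized strings for simplicity.
func hashInputs(inputs map[string]string, ignore []string) string {
	skip := make(map[string]bool, len(ignore))
	for _, name := range ignore {
		skip[name] = true
	}

	// Collect and sort the remaining names so the hash is deterministic.
	names := make([]string, 0, len(inputs))
	for name := range inputs {
		if !skip[name] {
			names = append(names, name)
		}
	}
	sort.Strings(names)

	h := sha256.New()
	for _, name := range names {
		// Length-prefix each field so different splits cannot collide.
		fmt.Fprintf(h, "%d:%s=%d:%s;", len(name), name, len(inputs[name]), inputs[name])
	}
	return hex.EncodeToString(h.Sum(nil))
}

// cacheKey is a hypothetical flattening of the key: task name, cache
// version, and the filtered input hash.
func cacheKey(taskName, cacheVersion string, inputs map[string]string, ignore []string) string {
	return taskName + "/" + cacheVersion + "/" + hashInputs(inputs, ignore)
}

func main() {
	inputs := map[string]string{"dataset": "s3://bucket/a.csv", "seed": "42"}
	fmt.Println(cacheKey("my_task", "1.0", inputs, []string{"seed"}))
}
```

In this sketch, changing `seed` leaves the key unchanged. Changing any other input, or the cache version, produces a new key.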
Asynchronous Catalog Interaction
Plugins access the Catalog via the AsyncClient interface, which is available through tCtx.Catalog(). This client uses internal workqueues to process requests in the background.
Checking for Cached Results (Download)
To check if a result exists, use the Download method. It accepts one or more DownloadRequest objects, making it efficient for batch operations like array tasks.
```go
request := catalog.DownloadRequest{
	Key:    key,
	Target: tCtx.OutputWriter(), // Where to write the outputs if found
}

future, err := tCtx.Catalog().Download(ctx, request)
if err != nil {
	// System error (e.g., the request could not be queued); return it
}

// Check whether the background lookup has finished
if future.GetResponseStatus() == catalog.ResponseStatusReady {
	resp, err := future.GetResponse()
	if err != nil {
		// Handle execution error (the Catalog lookup itself failed)
	}
	if resp.GetCachedCount() > 0 {
		// Cache hit! The outputs have been written to the Target (OutputWriter)
	}
} else {
	// ResponseStatusNotReady: return and check again in the next evaluation round
}
```
The DownloadFuture follows a non-blocking pattern. If GetResponseStatus() returns ResponseStatusNotReady, the plugin should return rather than block and wait for the next evaluation round. When Flyte's engine re-invokes the plugin, calling Download again with the same request maps to the same work item. The returned future therefore reflects that item's current status, which may then be ready.
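The non-blocking pattern can be simulated in a few lines. In this hypothetical sketch, `fakeCatalog` stands in for the AsyncClient and finishes a lookup after a fixed number of polls. `handle` plays the role of one plugin evaluation round. It never waits; it only reports whether to keep waiting, reuse cached outputs, or run the task.

```go
package main

import "fmt"

// fakeCatalog is a hypothetical stand-in for the AsyncClient: the lookup for
// a key completes after pollsUntilReady calls, with a fixed hit/miss result.
type fakeCatalog struct {
	pollsUntilReady int
	hit             bool
	polls           map[string]int
}

type lookup struct {
	ready bool
	hit   bool
}

// Download never blocks; repeated calls for the same key share progress,
// like repeated requests mapping to one work item.
func (c *fakeCatalog) Download(key string) lookup {
	c.polls[key]++
	if c.polls[key] < c.pollsUntilReady {
		return lookup{}
	}
	return lookup{ready: true, hit: c.hit}
}

type action string

const (
	keepWaiting action = "keep waiting"
	useCached   action = "use cached outputs"
	runTask     action = "run task"
)

// handle is one plugin evaluation round.
func handle(c *fakeCatalog, key string) action {
	res := c.Download(key)
	switch {
	case !res.ready:
		return keepWaiting // exit now; the engine will call handle again
	case res.hit:
		return useCached
	default:
		return runTask
	}
}

func main() {
	c := &fakeCatalog{pollsUntilReady: 3, hit: true, polls: map[string]int{}}
	for round := 1; ; round++ {
		a := handle(c, "my_task/1.0/abc123")
		fmt.Printf("round %d: %s\n", round, a)
		if a != keepWaiting {
			break
		}
	}
}
```

The loop in `main` stands in for the engine's repeated evaluation rounds. The plugin itself holds no state between rounds; all progress lives in the client.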
Storing Results (Upload)
Once a task completes successfully, you store its results in the Catalog using the Upload method.
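```go
uploadRequest := catalog.UploadRequest{
	Key: key,
	// ArtifactData is read by Client.Put, so it takes an io.OutputReader over
	// the task's outputs (outputReader is assumed to be built by the caller).
	ArtifactData: outputReader,
	ArtifactMetadata: catalog.Metadata{
		TaskExecutionIdentifier: tCtx.TaskExecutionMetadata().GetTaskExecutionID(),
	},
}

putFuture, err := tCtx.Catalog().Upload(ctx, uploadRequest)
if err != nil {
	// The request could not be queued
}
if putFuture.GetResponseStatus() == catalog.ResponseStatusReady && putFuture.GetResponseError() != nil {
	// The upload ran but failed to persist the memoization data
}
```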
Similar to downloading, Upload returns an UploadFuture. While you don't always need to wait for the upload to finish before completing the task, checking the future ensures that the memoization data was successfully persisted.
Internal Mechanism: Workqueues and Processors
The AsyncClientImpl (found in flyteplugins/go/tasks/pluginmachinery/catalog/async_client_impl.go) manages concurrency and retries using two distinct IndexedWorkQueue instances:
- Reader Queue: Handles Download requests by calling the synchronous Client.Get.
- Writer Queue: Handles Upload requests by calling Client.Put or Client.Update.
When you call Download or Upload, the client:
- Generates a unique workItemID based on the Key and input hashes.
- Queues the request in the respective workqueue.
- Immediately checks the status of that item in the queue.
If the item was already processed (e.g., in a previous evaluation round), its status is WorkStatusSucceeded and the future is marked ResponseStatusReady. If the item is still waiting for a worker, the future reports ResponseStatusNotReady.
The Synchronous Client
The AsyncClient is a wrapper around the Client interface (flyteplugins/go/tasks/pluginmachinery/catalog/client.go). This lower-level interface performs the actual gRPC calls to the Flyte Catalog service:
- Get(ctx, key): Retrieves an entry.
- Put(ctx, key, reader, metadata): Stores a new entry.
- GetOrExtendReservation(...): Used for "serializable" caching to prevent multiple workers from executing the same task simultaneously when a cache miss occurs.
Configuration and Performance
The performance of the Catalog integration is governed by the workqueue settings. These are typically configured in the FlytePropeller configuration under catalogCache:
- Workers: catalogCache.reader.workers and catalogCache.writer.workers (default: 10) control how many concurrent gRPC calls are made to the Catalog service.
- Max Retries: catalogCache.reader.maxRetries (default: 3) defines how many times a failed Catalog request is retried before the future reports an error.
- Index Cache Size: catalogCache.reader.indexCacheMaxItems (default: 10000) determines how many work item statuses are kept in memory. This should be large enough to cover the active set of tasks in the cluster.
> [!IMPORTANT]
> The AsyncClientImpl must be started via its Start(ctx) method (usually handled by the Flyte initialization logic). If it is not started, requests are queued but never processed, and futures remain in ResponseStatusNotReady indefinitely.