Result Caching Concepts
Flyte uses a memoization system to optimize workflow execution by reusing results from previously completed tasks. This system is centered around the Catalog, which acts as a persistent store for task outputs indexed by a unique key derived from the task's identity and its inputs.
The Cache Key
The foundation of Flyte's memoization is the catalog.Key. A cache hit only occurs if the generated key matches an existing entry in the Catalog.
The catalog.Key (defined in flyteplugins/go/tasks/pluginmachinery/catalog/client.go) consists of:
- Identifier: The task's unique identifier (Project, Domain, Name).
- Cache Version: A user-defined string that allows manual cache invalidation.
- Input Values: A hash of the task's input literals.
- Ignore List: A list of input variables to exclude from the hash calculation.
Key Generation Logic
Flyte generates the cache key by hashing the task identifier and the input map. The HashIdentifierExceptVersion function in flyteplugins/go/tasks/pluginmachinery/catalog/hashing.go hashes the task identifier while leaving out its version, so the key is not tied to a specific internal task version. Invalidation is instead controlled explicitly through the CacheVersion field.
The inputs are hashed using HashLiteralMap:
func HashLiteralMap(ctx context.Context, literalMap *core.LiteralMap, cacheIgnoreInputVars []string) (string, error) {
	// ... logic to filter out ignored variables ...
	hashifiedInputs := &core.LiteralMap{
		Literals: hashifiedLiteralMap,
	}
	inputsHash, err := pbhash.ComputeHash(ctx, hashifiedInputs)
	if err != nil {
		return "", err
	}
	return base64.RawURLEncoding.EncodeToString(inputsHash), nil
}
This ensures that if you run the same task with the same inputs (excluding those in cacheIgnoreInputVars), Flyte produces the same key.
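The effect of the ignore list on the key can be illustrated with a small standalone sketch. It is not Flyte's implementation: hashInputs is a hypothetical helper that uses plain strings in place of core.Literal values and SHA-256 in place of pbhash.ComputeHash. It only shows the property described above, namely that ignored inputs never influence the resulting hash.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"sort"
)

// hashInputs is a simplified, hypothetical stand-in for HashLiteralMap.
// Assumption: inputs are plain strings and the digest is SHA-256.
func hashInputs(inputs map[string]string, ignore []string) string {
	ignored := make(map[string]struct{}, len(ignore))
	for _, name := range ignore {
		ignored[name] = struct{}{}
	}

	// Keep only the names that participate in the hash, then sort them so
	// that Go's random map iteration order cannot change the result.
	names := make([]string, 0, len(inputs))
	for name := range inputs {
		if _, skip := ignored[name]; !skip {
			names = append(names, name)
		}
	}
	sort.Strings(names)

	h := sha256.New()
	for _, name := range names {
		// Length-prefix each field so that adjacent fields cannot be confused.
		fmt.Fprintf(h, "%d:%s=%d:%s;", len(name), name, len(inputs[name]), inputs[name])
	}
	return base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}

func main() {
	ignore := []string{"run_id"}
	a := hashInputs(map[string]string{"x": "1", "run_id": "abc"}, ignore)
	b := hashInputs(map[string]string{"x": "1", "run_id": "xyz"}, ignore)
	c := hashInputs(map[string]string{"x": "2", "run_id": "abc"}, ignore)

	fmt.Println("only ignored input differs, same key:", a == b)
	fmt.Println("hashed input differs, same key:", a == c)
}
```

Sorting the input names before hashing is the design choice that makes the key deterministic. Without it, two identical maps could be serialized in different orders and produce different keys.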
Cache Entries and Status
When you query the Catalog, it returns a catalog.Entry, which encapsulates both the data and the result of the lookup.
Entry
The Entry struct (in flyteplugins/go/tasks/pluginmachinery/catalog/client.go) provides access to the cached outputs:
type Entry struct {
	outputs io.OutputReader
	status  Status
}
If a cache hit occurs, outputs contains an io.OutputReader that the Flyte engine uses to populate the task's outputs without actually running the task code.
Status
The Status struct tracks the outcome of Catalog operations (Get, Put, or Update):
type Status struct {
	cacheStatus core.CatalogCacheStatus
	metadata    *core.CatalogMetadata
}
Common cacheStatus values include:
- CACHE_HIT: The result was found and retrieved.
- CACHE_MISS: No result exists for the given key.
- CACHE_POPULATED: The result was successfully stored after a task execution.
- CACHE_PUT_FAILURE: An error occurred while trying to write to the Catalog.
The Catalog Client
The catalog.Client interface defines how the Flyte engine interacts with the memoization service.
Checking the Cache (Get)
Before executing a task, the Flyte propeller (via the TaskActionReconciler in executor/pkg/controller/taskaction_cache.go) checks for a cached result:
entry, err := r.Catalog.Get(ctx, cacheCfg.key)
if err == nil {
	// Cache Hit: Write outputs and transition to Success
	if err := tCtx.OutputWriter().Put(ctx, entry.GetOutputs()); err != nil {
		return pluginsCore.UnknownTransition, false, fmt.Errorf("persisting cached outputs: %w", err)
	}
	info := cacheTaskInfo(corepb.CatalogCacheStatus_CACHE_HIT, "cache hit")
	return pluginsCore.DoTransition(pluginsCore.PhaseInfoSuccess(info)), true, nil
}
Populating the Cache (Put)
After a task completes successfully, the engine stores the results using Put:
func (r *TaskActionReconciler) writeTaskOutputsToCache(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, key catalog.Key) error {
	outputPaths := ioutils.NewReadOnlyOutputFilePaths(ctx, r.DataStore, tCtx.OutputWriter().GetOutputPrefixPath())
	outputReader := ioutils.NewRemoteFileOutputReader(ctx, r.DataStore, outputPaths, 0)
	_, err := r.Catalog.Put(ctx, key, outputReader, cacheMetadataForUpload(tCtx, key.Identifier))
	return err
}
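The Get-then-Put sequence can be sketched end to end with a toy in-memory catalog. Everything here is hypothetical and exists only for illustration: memCatalog, errNotFound, and runCached are not Flyte types, keys and outputs are plain strings, and treating any lookup error other than "not found" as fatal is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel that signals a cache miss.
var errNotFound = errors.New("cache entry not found")

// memCatalog is a toy in-memory stand-in for a Catalog client.
type memCatalog struct {
	entries map[string]string // cache key -> serialized outputs
}

func newMemCatalog() *memCatalog {
	return &memCatalog{entries: make(map[string]string)}
}

// Get returns the stored outputs, or errNotFound when the key is absent.
func (c *memCatalog) Get(key string) (string, error) {
	out, ok := c.entries[key]
	if !ok {
		return "", errNotFound
	}
	return out, nil
}

// Put stores the outputs of a successful run under key.
func (c *memCatalog) Put(key, outputs string) {
	c.entries[key] = outputs
}

// runCached returns the outputs for key and reports whether they came from
// the cache. The task runs only on a miss, and only successful runs are stored.
func runCached(c *memCatalog, key string, task func() (string, error)) (string, bool, error) {
	out, err := c.Get(key)
	if err == nil {
		return out, true, nil // cache hit: skip execution
	}
	if !errors.Is(err, errNotFound) {
		return "", false, fmt.Errorf("cache lookup: %w", err)
	}
	out, err = task()
	if err != nil {
		return "", false, err // failed runs are not cached
	}
	c.Put(key, out)
	return out, false, nil
}

func main() {
	catalog := newMemCatalog()
	runs := 0
	task := func() (string, error) {
		runs++
		return "result", nil
	}
	for i := 0; i < 2; i++ {
		out, hit, err := runCached(catalog, "task-v1-inputs", task)
		fmt.Println(out, hit, err)
	}
	fmt.Println("task executions:", runs)
}
```

The second call is served from the catalog, so the task body runs only once across both calls.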
Serializable Caching
For tasks where redundant execution must be strictly avoided (e.g., expensive API calls or non-idempotent operations), Flyte supports Serializable Caching. This mechanism uses reservations to ensure that only one execution "owns" the right to populate the cache for a specific key.
Reservation Flow
When serializable caching is enabled, the engine calls GetOrExtendReservation if a cache miss occurs:
- Acquire Reservation: The first execution to reach the Catalog for a specific key receives a reservation.
- Wait for Owner: Subsequent executions for the same key see that a reservation is held by another owner and transition to a WaitingForCache state.
- Heartbeat: The owner must periodically extend the reservation.
- Release/Populate: Once the owner finishes, it calls Put (which releases the reservation) or explicitly calls ReleaseReservation.
In executor/pkg/controller/taskaction_cache.go, this is implemented as:
if cacheCfg.serializable {
	reservation, err := r.Catalog.GetOrExtendReservation(ctx, cacheCfg.key, cacheCfg.ownerID, cacheReservationHeartbeatInterval)
	if err != nil {
		return pluginsCore.UnknownTransition, false, fmt.Errorf("acquiring cache reservation: %w", err)
	}
	// If we are not the owner, wait
	if reservation.GetOwnerId() != cacheCfg.ownerID {
		info := cacheTaskInfo(corepb.CatalogCacheStatus_CACHE_MISS, "waiting for serialized cache owner")
		phaseInfo := pluginsCore.PhaseInfoWaitingForCache(taskAction.Status.PluginPhaseVersion, info)
		return pluginsCore.DoTransition(phaseInfo), true, nil
	}
}
This system prevents "thundering herd" problems where multiple identical task instances compute the same result simultaneously.