Result Caching & Memoization
Flyte implements result caching and memoization to avoid redundant computations by reusing outputs from previous executions with identical inputs. This system is composed of a standalone Cache Service that manages metadata and a Reservation mechanism to coordinate concurrent executions of the same task.
The Cache Service Architecture
The Cache Service is a standalone component that provides a gRPC/Connect interface for managing cached results. It does not store the actual data blobs; instead, it persists metadata and URIs pointing to the data in object storage (e.g., S3 or GCS).
The core logic resides in the Manager class within cache_service/manager/manager.go. It interacts with two primary repositories:
- CachedOutputRepo: Stores the mapping between a cache key and the URI of the task's outputs.
- ReservationRepo: Manages temporary ownership of a cache key during task execution.
Cache Key Generation
Cache keys are deterministic hashes derived from the task's identity and its inputs. In flyteplugins/go/tasks/pluginmachinery/catalog/cache_service/client.go, the buildCacheKey function constructs this key by combining:
- A hash of the task identifier (Project, Domain, Name).
- A hash of the task's typed interface (Inputs and Outputs).
- A hash of the actual input literals.
- A user-defined CacheVersion.
// From flyteplugins/go/tasks/pluginmachinery/catalog/cache_service/client.go
func buildCacheKey(ctx context.Context, key catalog.Key) (string, error) {
	// ... hashes identifier, signature, and inputs ...
	return fmt.Sprintf("%s-%s-%s-%s", identifierHash, signatureHash, inputsHash, key.CacheVersion), nil
}
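To make the key composition concrete, here is a minimal, self-contained sketch. It is not the Flyte implementation: the excerpt above elides how each component is serialized and hashed, so the `keyParts` type, the `hashPart` helper, and the choice of hex-encoded SHA-256 are all assumptions made for illustration. Only the four-part `%s-%s-%s-%s` layout comes from the excerpt.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// keyParts is a hypothetical stand-in for catalog.Key. Each byte slice is
// assumed to be an already-serialized, deterministic encoding.
type keyParts struct {
	Identifier   []byte // task project, domain, and name
	Signature    []byte // typed interface (inputs and outputs)
	Inputs       []byte // actual input literals
	CacheVersion string // user-defined version
}

// hashPart returns a hex-encoded SHA-256 digest. The real hash function is
// not shown in the excerpt; SHA-256 is an assumption.
func hashPart(serialized []byte) string {
	sum := sha256.Sum256(serialized)
	return hex.EncodeToString(sum[:])
}

// buildKey joins the three hashes and the cache version in the same order
// as the excerpt's format string.
func buildKey(p keyParts) string {
	return fmt.Sprintf("%s-%s-%s-%s",
		hashPart(p.Identifier), hashPart(p.Signature), hashPart(p.Inputs), p.CacheVersion)
}

func main() {
	base := keyParts{
		Identifier:   []byte("project/domain/task"),
		Signature:    []byte("in:int->out:int"),
		Inputs:       []byte("x=1"),
		CacheVersion: "v1",
	}
	bumped := base
	bumped.CacheVersion = "v2"

	fmt.Println(buildKey(base) == buildKey(base))   // true
	fmt.Println(buildKey(base) == buildKey(bumped)) // false
}
```

Because the cache version is part of the key, bumping it invalidates earlier entries without touching the stored metadata.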
Serialized Caching and Reservations
When multiple workers attempt to execute the same task with the same inputs simultaneously, Flyte uses a Reservation mechanism to prevent a "thundering herd" problem. This is enabled via the cache_serializable setting in the task metadata.
The Manager.GetOrExtendReservation method in cache_service/manager/manager.go coordinates this:
- If no reservation exists, the caller creates one and becomes the "owner."
- If a reservation exists and is held by the same owner, the caller extends it; if it has expired, the caller takes it over.
- If a different owner holds a valid reservation, the caller is denied ownership and must wait.
Reservations are kept alive by heartbeats. If a worker fails to extend its reservation within the grace period, which is the heartbeat interval multiplied by heartbeatGracePeriodMultiplier (default 3), the reservation expires and becomes available to other workers.
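The ownership rules can be sketched with a small in-memory store. This is an illustration under stated assumptions, not the code in manager.go: the `store` and `reservation` types, the `getOrExtend` signature, and the `epoch` and `heartbeat` fixtures are hypothetical, and the expiry is assumed to be the heartbeat interval times the multiplier. A real repository would also need an atomic update, which this single-goroutine sketch omits.

```go
package main

import (
	"fmt"
	"time"
)

// Demo fixtures (hypothetical): a fixed start time and heartbeat interval.
var epoch = time.Unix(0, 0)

const heartbeat = 10 * time.Second

// reservation is a hypothetical record of who owns a cache key and until when.
type reservation struct {
	OwnerID   string
	ExpiresAt time.Time
}

// store is a hypothetical in-memory stand-in for ReservationRepo.
type store struct {
	byKey      map[string]reservation
	multiplier int // plays the role of heartbeatGracePeriodMultiplier
}

// getOrExtend grants or extends the reservation when there is none, when the
// caller already owns it, or when it has expired. Otherwise it returns the
// current holder's reservation unchanged.
func (s *store) getOrExtend(key, ownerID string, hb time.Duration, now time.Time) reservation {
	next := reservation{
		OwnerID:   ownerID,
		ExpiresAt: now.Add(hb * time.Duration(s.multiplier)),
	}
	cur, ok := s.byKey[key]
	if !ok || cur.OwnerID == ownerID || !now.Before(cur.ExpiresAt) {
		s.byKey[key] = next
		return next
	}
	return cur
}

func main() {
	s := &store{byKey: map[string]reservation{}, multiplier: 3}

	// worker-a acquires the reservation; it expires 30s after epoch.
	fmt.Println(s.getOrExtend("k", "worker-a", heartbeat, epoch).OwnerID) // worker-a
	// worker-b asks at 20s: worker-a still holds a valid reservation.
	fmt.Println(s.getOrExtend("k", "worker-b", heartbeat, epoch.Add(20*time.Second)).OwnerID) // worker-a
	// worker-b asks at 30s: the reservation has expired, so worker-b takes over.
	fmt.Println(s.getOrExtend("k", "worker-b", heartbeat, epoch.Add(30*time.Second)).OwnerID) // worker-b
}
```

A caller learns whether it owns the key by comparing the returned owner ID with its own, which is the same check the executor performs before deciding to wait.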
Execution Lifecycle Integration
The TaskActionReconciler in executor/pkg/controller/taskaction_cache.go integrates caching into the task lifecycle through two primary phases: pre-execution evaluation and post-execution finalization.
Pre-Execution: evaluateCacheBeforeExecution
Before starting a task, the executor checks the catalog for a hit. If a hit is found, it downloads the outputs and transitions the task directly to success. If it's a miss and serializable caching is enabled, it attempts to acquire a reservation.
// From executor/pkg/controller/taskaction_cache.go
func (r *TaskActionReconciler) evaluateCacheBeforeExecution(...) {
	// 1. Check for cache hit
	entry, err := r.Catalog.Get(ctx, cacheCfg.key)
	if err == nil {
		tCtx.OutputWriter().Put(ctx, entry.GetOutputs())
		return pluginsCore.DoTransition(pluginsCore.PhaseInfoSuccess(info)), true, nil
	}
	// 2. If miss and serializable, try to get reservation
	if cacheCfg.serializable {
		reservation, err := r.Catalog.GetOrExtendReservation(ctx, cacheCfg.key, cacheCfg.ownerID, ...)
		if reservation.GetOwnerId() != cacheCfg.ownerID {
			// Wait for the current owner to populate the cache
			return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForCache(...)), true, nil
		}
	}
}
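The excerpt is abridged, so the following sketch restates the pre-execution decision as a small, runnable function. Everything here is hypothetical: the `catalogClient` interface, the `decision` values, the `errNotFound` sentinel, and the `fakeCatalog` test double are not Flyte types. The sketch also assumes that only a not-found error counts as a miss, which the excerpt does not show.

```go
package main

import (
	"errors"
	"fmt"
)

type decision string

const (
	useCached    decision = "use-cached-outputs"
	waitForOwner decision = "wait-for-cache"
	execute      decision = "execute"
)

// errNotFound is a hypothetical sentinel for a cache miss.
var errNotFound = errors.New("cache miss")

// catalogClient is a hypothetical, narrowed view of the catalog client.
type catalogClient interface {
	Get(key string) (outputURI string, err error)
	GetOrExtendReservation(key, ownerID string) (holder string, err error)
}

// decide mirrors the two steps in the excerpt: check for a hit, then, on a
// miss with serializable caching, try to become the reservation owner.
func decide(c catalogClient, key, ownerID string, serializable bool) (decision, error) {
	_, err := c.Get(key)
	if err == nil {
		return useCached, nil
	}
	if !errors.Is(err, errNotFound) {
		return "", err
	}
	if !serializable {
		return execute, nil
	}
	holder, err := c.GetOrExtendReservation(key, ownerID)
	if err != nil {
		return "", err
	}
	if holder != ownerID {
		return waitForOwner, nil // another worker will populate the cache
	}
	return execute, nil
}

// fakeCatalog is an in-memory test double.
type fakeCatalog struct {
	outputs map[string]string // key -> output URI
	holders map[string]string // key -> reservation owner
}

func (f *fakeCatalog) Get(key string) (string, error) {
	if uri, ok := f.outputs[key]; ok {
		return uri, nil
	}
	return "", errNotFound
}

func (f *fakeCatalog) GetOrExtendReservation(key, ownerID string) (string, error) {
	if holder, ok := f.holders[key]; ok {
		return holder, nil
	}
	f.holders[key] = ownerID
	return ownerID, nil
}

func main() {
	c := &fakeCatalog{
		outputs: map[string]string{"hit": "s3://bucket/out"},
		holders: map[string]string{"busy": "worker-a"},
	}
	fmt.Println(decide(c, "hit", "worker-b", true))  // use-cached-outputs <nil>
	fmt.Println(decide(c, "busy", "worker-b", true)) // wait-for-cache <nil>
	fmt.Println(decide(c, "free", "worker-b", true)) // execute <nil>
}
```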
Post-Execution: finalizeCacheAfterExecution
After a task completes successfully, the executor uploads the output URI to the cache service and releases any held reservations.
// From executor/pkg/controller/taskaction_cache.go
func (r *TaskActionReconciler) finalizeCacheAfterExecution(...) {
	if phase.IsSuccess() {
		// Populate the cache with the new output URI
		r.writeTaskOutputsToCache(ctx, tCtx, cacheCfg.key)
		if cacheCfg.serializable {
			r.releaseCacheReservation(ctx, cacheCfg)
		}
	}
}
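A runnable sketch of the same ordering, write first and then release, is shown below. The `cacheWriter` interface, the `finalize` function, and the `recorder` test double are hypothetical names introduced for illustration; the excerpt does not show how errors from either step are handled, so the error wrapping here is an assumption.

```go
package main

import "fmt"

// cacheWriter is a hypothetical, narrowed view of the catalog client.
type cacheWriter interface {
	Put(key, outputURI string) error
	ReleaseReservation(key, ownerID string) error
}

// finalize records the output URI for a successful task and then releases
// the reservation, so waiting workers find a cache hit on their next check.
func finalize(c cacheWriter, succeeded, serializable bool, key, ownerID, outputURI string) error {
	if !succeeded {
		return nil // as in the excerpt, only successful tasks are cached
	}
	if err := c.Put(key, outputURI); err != nil {
		return fmt.Errorf("write cache entry: %w", err)
	}
	if serializable {
		if err := c.ReleaseReservation(key, ownerID); err != nil {
			return fmt.Errorf("release reservation: %w", err)
		}
	}
	return nil
}

// recorder is a test double that logs calls in order.
type recorder struct{ calls []string }

func (r *recorder) Put(key, outputURI string) error {
	r.calls = append(r.calls, "put "+key+" -> "+outputURI)
	return nil
}

func (r *recorder) ReleaseReservation(key, ownerID string) error {
	r.calls = append(r.calls, "release "+key+" by "+ownerID)
	return nil
}

func main() {
	r := &recorder{}
	if err := finalize(r, true, true, "k", "worker-a", "s3://bucket/out"); err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, call := range r.calls {
		fmt.Println(call)
	}
	// put k -> s3://bucket/out
	// release k by worker-a
}
```

Writing before releasing matters: if the reservation were released first, a waiting worker could acquire it and re-run the task before the entry became visible.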
Configuration
The behavior of the cache service and its reservation system is controlled via the cache_service.Config struct. Key parameters include:
| Parameter | Default | Description |
|---|---|---|
| HeartbeatGracePeriodMultiplier | 3 | Number of missed heartbeats before a reservation is considered expired. |
| MaxReservationHeartbeat | 10s | The maximum interval allowed for heartbeats from workers. |
| Server.Port | 8094 | The port on which the Cache Service listens for Connect/gRPC requests. |
These settings ensure that a worker that crashes while holding a reservation does not block the task indefinitely: the ReservationRepo defined in cache_service/repository/interfaces/reservation.go provides UpdateIfExpiredOrOwned, which lets another worker reclaim a stale reservation.
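As a worked example of how the two reservation parameters might combine, the sketch below computes how long a crashed worker can block a key. The `reservationConfig` struct is a hypothetical mirror of the table, not the real cache_service.Config, and the sketch assumes that a requested heartbeat interval above MaxReservationHeartbeat is capped to it.

```go
package main

import (
	"fmt"
	"time"
)

// reservationConfig is a hypothetical mirror of the two reservation-related
// parameters in the table above.
type reservationConfig struct {
	HeartbeatGracePeriodMultiplier int
	MaxReservationHeartbeat        time.Duration
}

// defaults returns the default values listed in the table.
func defaults() reservationConfig {
	return reservationConfig{
		HeartbeatGracePeriodMultiplier: 3,
		MaxReservationHeartbeat:        10 * time.Second,
	}
}

// effectiveHeartbeat caps a worker's requested interval at the configured
// maximum (assumed behavior).
func (c reservationConfig) effectiveHeartbeat(requested time.Duration) time.Duration {
	if requested > c.MaxReservationHeartbeat {
		return c.MaxReservationHeartbeat
	}
	return requested
}

// gracePeriod is how long a reservation stays valid without a heartbeat.
func (c reservationConfig) gracePeriod(requested time.Duration) time.Duration {
	return c.effectiveHeartbeat(requested) * time.Duration(c.HeartbeatGracePeriodMultiplier)
}

func main() {
	cfg := defaults()
	fmt.Println(cfg.gracePeriod(10 * time.Second)) // 30s
	fmt.Println(cfg.gracePeriod(time.Minute))      // 30s (capped at the maximum)
	fmt.Println(cfg.gracePeriod(2 * time.Second))  // 6s
}
```

Under these assumptions and the default values, a key held by a crashed worker becomes reclaimable after at most 30 seconds.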