Result Caching & Memoization

Flyte implements result caching and memoization to avoid redundant computations by reusing outputs from previous executions with identical inputs. This system is composed of a standalone Cache Service that manages metadata and a Reservation mechanism to coordinate concurrent executions of the same task.

The Cache Service Architecture

The Cache Service is a standalone component that provides a gRPC/Connect interface for managing cached results. It does not store the actual data blobs; instead, it persists metadata and URIs pointing to the data in object storage (e.g., S3 or GCS).

The core logic resides in the Manager class within cache_service/manager/manager.go. It interacts with two primary repositories:

  • CachedOutputRepo: Stores the mapping between a cache key and the URI of the task's outputs.
  • ReservationRepo: Manages temporary ownership of a cache key during task execution.
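
The metadata-only design can be illustrated with a minimal in-memory sketch. The names below (cachedOutput, memoryOutputRepo, errNotFound) are hypothetical and are not the service's real types; the point is only that the store maps a cache key to a URI and never holds the output blob itself.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNotFound signals a cache miss. Hypothetical; not the service's real error.
var errNotFound = errors.New("cached output not found")

// cachedOutput holds metadata only: a URI into object storage, never the blob.
type cachedOutput struct {
	Key       string
	OutputURI string
}

// memoryOutputRepo is an in-memory stand-in for a CachedOutputRepo-like store.
type memoryOutputRepo struct {
	mu      sync.RWMutex
	entries map[string]cachedOutput
}

func newMemoryOutputRepo() *memoryOutputRepo {
	return &memoryOutputRepo{entries: make(map[string]cachedOutput)}
}

// Put records (or overwrites) the output URI for a cache key.
func (r *memoryOutputRepo) Put(key, uri string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.entries[key] = cachedOutput{Key: key, OutputURI: uri}
}

// Get returns the stored metadata, or errNotFound on a miss.
func (r *memoryOutputRepo) Get(key string) (cachedOutput, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	entry, ok := r.entries[key]
	if !ok {
		return cachedOutput{}, errNotFound
	}
	return entry, nil
}

func main() {
	repo := newMemoryOutputRepo()
	repo.Put("key-1", "s3://example-bucket/outputs/key-1")

	entry, _ := repo.Get("key-1")
	fmt.Println(entry.OutputURI) // s3://example-bucket/outputs/key-1

	_, err := repo.Get("missing")
	fmt.Println(errors.Is(err, errNotFound)) // true
}
```

A caller that gets a hit still has to fetch the outputs from object storage using the returned URI.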

Cache Key Generation

Cache keys are deterministic hashes derived from the task's identity and its inputs. In flyteplugins/go/tasks/pluginmachinery/catalog/cache_service/client.go, the buildCacheKey function constructs this key by combining:

  1. A hash of the task identifier (Project, Domain, Name).
  2. A hash of the task's typed interface (Inputs and Outputs).
  3. A hash of the actual input literals.
  4. A user-defined CacheVersion.

// From flyteplugins/go/tasks/pluginmachinery/catalog/cache_service/client.go
func buildCacheKey(ctx context.Context, key catalog.Key) (string, error) {
	// ... hashes identifier, signature, and inputs ...
	return fmt.Sprintf("%s-%s-%s-%s", identifierHash, signatureHash, inputsHash, key.CacheVersion), nil
}
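
To make the key layout concrete, here is a self-contained sketch of the same four-part format. It is not the real buildCacheKey: the keyParts struct, the use of truncated SHA-256, and the string stand-ins for the typed interface and input literals are all assumptions made for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// keyParts is a hypothetical, flattened stand-in for catalog.Key.
type keyParts struct {
	Project, Domain, Name string
	Signature             string // stand-in for the serialized typed interface
	Inputs                string // stand-in for the serialized input literals
	CacheVersion          string
}

// shortHash joins the parts with a NUL separator and returns the first
// 8 bytes of the SHA-256 digest, hex encoded. The hash choice is an assumption.
func shortHash(parts ...string) string {
	sum := sha256.Sum256([]byte(strings.Join(parts, "\x00")))
	return hex.EncodeToString(sum[:8])
}

// sketchCacheKey mirrors the "%s-%s-%s-%s" layout of the excerpt above.
func sketchCacheKey(k keyParts) string {
	identifierHash := shortHash(k.Project, k.Domain, k.Name)
	signatureHash := shortHash(k.Signature)
	inputsHash := shortHash(k.Inputs)
	return fmt.Sprintf("%s-%s-%s-%s", identifierHash, signatureHash, inputsHash, k.CacheVersion)
}

func main() {
	k := keyParts{
		Project: "demo", Domain: "development", Name: "train",
		Signature: "(x:int)->(y:int)", Inputs: "x=1", CacheVersion: "1.0",
	}
	fmt.Println(sketchCacheKey(k))

	// Bumping the version yields a different key, so old entries are not reused.
	k.CacheVersion = "2.0"
	fmt.Println(sketchCacheKey(k))
}
```

Because every component is deterministic, two executions with the same task, interface, inputs, and CacheVersion produce the same key, while changing any one of them produces a different key.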

Serialized Caching and Reservations

When multiple workers attempt to execute the same task with the same inputs simultaneously, Flyte uses a Reservation mechanism to prevent a "thundering herd" problem. This is enabled via the cache_serializable setting in the task metadata.

The Manager.GetOrExtendReservation method in cache_service/manager/manager.go coordinates this:

  • If no reservation exists, the caller creates one and becomes the "owner."
  • If a reservation exists and is held by the same owner, the caller extends it; if it has expired, the caller takes it over.
  • If a different owner holds a valid reservation, the caller is denied ownership and must wait.

Reservations are kept alive by heartbeats. If a worker misses heartbeatGracePeriodMultiplier consecutive heartbeats (3 by default), the reservation expires and becomes available to other workers.
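
The three cases and the expiry rule can be sketched with an in-memory store. This is not the Manager implementation: reservationStore and getOrExtend are hypothetical names, and the expiry formula (heartbeat interval times the multiplier) is an assumption based on the description above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// reservation records who currently owns a cache key and until when.
type reservation struct {
	OwnerID   string
	ExpiresAt time.Time
}

// reservationStore is a hypothetical in-memory stand-in for a reservation repo.
type reservationStore struct {
	mu         sync.Mutex
	heartbeat  time.Duration
	multiplier int
	byKey      map[string]reservation
}

func newReservationStore(heartbeat time.Duration, multiplier int) *reservationStore {
	return &reservationStore{heartbeat: heartbeat, multiplier: multiplier, byKey: make(map[string]reservation)}
}

// getOrExtend returns the reservation in effect after the call. The caller
// owns the key only if the returned OwnerID equals its own ownerID.
func (s *reservationStore) getOrExtend(key, ownerID string, now time.Time) reservation {
	s.mu.Lock()
	defer s.mu.Unlock()

	current, exists := s.byKey[key]
	if exists && current.OwnerID != ownerID && now.Before(current.ExpiresAt) {
		// A different owner holds a valid reservation: deny ownership.
		return current
	}
	// No reservation, same owner, or expired: create, extend, or take over.
	// Assumed expiry: heartbeat interval multiplied by the grace multiplier.
	next := reservation{OwnerID: ownerID, ExpiresAt: now.Add(s.heartbeat * time.Duration(s.multiplier))}
	s.byKey[key] = next
	return next
}

func main() {
	s := newReservationStore(10*time.Second, 3)
	t0 := time.Now()

	fmt.Println(s.getOrExtend("k", "worker-a", t0).OwnerID)                     // worker-a
	fmt.Println(s.getOrExtend("k", "worker-b", t0.Add(10*time.Second)).OwnerID) // worker-a
	fmt.Println(s.getOrExtend("k", "worker-b", t0.Add(31*time.Second)).OwnerID) // worker-b
}
```

In the usage above, worker-b is denied while worker-a's reservation is still valid, then takes over once 30 seconds pass without a heartbeat from worker-a.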

Execution Lifecycle Integration

The TaskActionReconciler in executor/pkg/controller/taskaction_cache.go integrates caching into the task lifecycle through two primary phases: pre-execution evaluation and post-execution finalization.

Pre-Execution: evaluateCacheBeforeExecution

Before starting a task, the executor checks the catalog for a hit. If a hit is found, it downloads the outputs and transitions the task directly to success. If it's a miss and serializable caching is enabled, it attempts to acquire a reservation.

// From executor/pkg/controller/taskaction_cache.go
func (r *TaskActionReconciler) evaluateCacheBeforeExecution(...) {
	// 1. Check for cache hit
	entry, err := r.Catalog.Get(ctx, cacheCfg.key)
	if err == nil {
		tCtx.OutputWriter().Put(ctx, entry.GetOutputs())
		return pluginsCore.DoTransition(pluginsCore.PhaseInfoSuccess(info)), true, nil
	}

	// 2. If miss and serializable, try to get reservation
	if cacheCfg.serializable {
		reservation, err := r.Catalog.GetOrExtendReservation(ctx, cacheCfg.key, cacheCfg.ownerID, ...)
		if reservation.GetOwnerId() != cacheCfg.ownerID {
			// Wait for the current owner to populate the cache
			return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForCache(...)), true, nil
		}
	}
}

Post-Execution: finalizeCacheAfterExecution

After a task completes successfully, the executor uploads the output URI to the cache service and releases any held reservations.

// From executor/pkg/controller/taskaction_cache.go
func (r *TaskActionReconciler) finalizeCacheAfterExecution(...) {
	if phase.IsSuccess() {
		// Populate the cache with the new output URI
		r.writeTaskOutputsToCache(ctx, tCtx, cacheCfg.key)

		if cacheCfg.serializable {
			r.releaseCacheReservation(ctx, cacheCfg)
		}
	}
}
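
The two phases can be combined into one simplified flow: check, reserve, execute, write, release. The sketch below uses a hypothetical in-memory catalog and a runWithCache helper that do not exist in the executor; reservations here never expire, and, following the excerpt above, the reservation is released only on the success path.

```go
package main

import "fmt"

// catalog is a hypothetical in-memory stand-in for the cache service client.
type catalog struct {
	outputs map[string]string // cache key -> output URI
	owners  map[string]string // cache key -> reservation owner
}

func newCatalog() *catalog {
	return &catalog{outputs: map[string]string{}, owners: map[string]string{}}
}

// outcome describes what happened to the task for this attempt.
type outcome string

const (
	cacheHit outcome = "hit"
	waiting  outcome = "waiting"
	executed outcome = "executed"
)

// runWithCache mirrors the pre- and post-execution steps in one function.
func runWithCache(c *catalog, key, ownerID string, serializable bool, run func() (string, error)) (string, outcome, error) {
	// Pre-execution: a hit short-circuits the task.
	if uri, ok := c.outputs[key]; ok {
		return uri, cacheHit, nil
	}
	// Pre-execution: on a miss, serializable tasks must own the reservation.
	if serializable {
		if owner, held := c.owners[key]; held && owner != ownerID {
			return "", waiting, nil
		}
		c.owners[key] = ownerID
	}
	uri, err := run()
	if err != nil {
		return "", executed, err
	}
	// Post-execution: record the output URI, then release the reservation.
	c.outputs[key] = uri
	if serializable {
		delete(c.owners, key)
	}
	return uri, executed, nil
}

func main() {
	c := newCatalog()
	runs := 0
	run := func() (string, error) {
		runs++
		return "s3://example-bucket/outputs/abc", nil
	}
	_, first, _ := runWithCache(c, "key", "worker-a", true, run)
	_, second, _ := runWithCache(c, "key", "worker-b", true, run)
	fmt.Println(first, second, runs) // executed hit 1
}
```

The second call never invokes run, which is the saving that caching is meant to provide.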

Configuration

The behavior of the cache service and its reservation system is controlled via the cache_service.Config struct. Key parameters include:

Parameter                        Default   Description
HeartbeatGracePeriodMultiplier   3         Number of missed heartbeats before a reservation is considered expired.
MaxReservationHeartbeat          10s       The maximum interval allowed for heartbeats from workers.
Server.Port                      8094      The port on which the Cache Service listens for Connect/gRPC requests.

These settings ensure that a worker that crashes while holding a reservation does not block the task indefinitely. The ReservationRepo defined in cache_service/repository/interfaces/reservation.go provides UpdateIfExpiredOrOwned, which lets another worker reclaim a stale reservation.
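
As a worked example of how these parameters might interact, the sketch below models them in a small struct populated with the defaults from the table. The struct layout, the clamping of a requested heartbeat to MaxReservationHeartbeat, and the expiry formula are assumptions for illustration, not the actual cache_service.Config behavior.

```go
package main

import (
	"fmt"
	"time"
)

// serverConfig and config are hypothetical mirrors of the documented parameters.
type serverConfig struct {
	Port int
}

type config struct {
	HeartbeatGracePeriodMultiplier int
	MaxReservationHeartbeat        time.Duration
	Server                         serverConfig
}

// defaultConfig returns the defaults listed in the table above.
func defaultConfig() config {
	return config{
		HeartbeatGracePeriodMultiplier: 3,
		MaxReservationHeartbeat:        10 * time.Second,
		Server:                         serverConfig{Port: 8094},
	}
}

// effectiveHeartbeat caps a worker's requested interval at the configured
// maximum (assumed behavior). Non-positive requests fall back to the maximum.
func (c config) effectiveHeartbeat(requested time.Duration) time.Duration {
	if requested <= 0 || requested > c.MaxReservationHeartbeat {
		return c.MaxReservationHeartbeat
	}
	return requested
}

// expiryWindow is how long a reservation survives without a heartbeat,
// assuming expiry = effective heartbeat interval * grace multiplier.
func (c config) expiryWindow(requested time.Duration) time.Duration {
	return c.effectiveHeartbeat(requested) * time.Duration(c.HeartbeatGracePeriodMultiplier)
}

func main() {
	cfg := defaultConfig()
	fmt.Println(cfg.expiryWindow(5 * time.Second))  // 15s
	fmt.Println(cfg.expiryWindow(60 * time.Second)) // 30s
}
```

Under these assumptions, the defaults bound the time a crashed worker can block a key at 3 × 10s = 30s.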