
Coordinating Concurrent Tasks with Reservations

When multiple Flyte workers attempt to execute the same task with the same inputs simultaneously, they may all experience a cache miss and start computing the same result. This "thundering herd" effect wastes computational resources. Flyte solves this using a Reservation mechanism, which ensures that only one worker populates the cache for a specific key while others wait for the result.

Enabling Serializable Caching

To use reservations, you must enable serializable caching in your task configuration. When serializable is set to true, the Flyte executor coordinates with the Cache Service to manage task execution ownership.

In the Flyte executor's TaskActionReconciler (executor/pkg/controller/taskaction_cache.go), the reconciler checks the reservation before it executes the task:

```go
// executor/pkg/controller/taskaction_cache.go

if cacheCfg.serializable {
	reservation, err := r.Catalog.GetOrExtendReservation(ctx, cacheCfg.key, cacheCfg.ownerID, cacheReservationHeartbeatInterval)
	if err != nil {
		return pluginsCore.UnknownTransition, false, fmt.Errorf("acquiring cache reservation: %w", err)
	}
	if reservation.GetOwnerId() == cacheCfg.ownerID {
		// We own the reservation, proceed to execute
		return pluginsCore.UnknownTransition, false, nil
	}

	// Someone else owns it, wait
	info := cacheTaskInfo(corepb.CatalogCacheStatus_CACHE_MISS, "waiting for serialized cache owner")
	phaseInfo := pluginsCore.PhaseInfoWaitingForCache(taskAction.Status.PluginPhaseVersion, info)
	return pluginsCore.DoTransition(phaseInfo), true, nil
}
```

The Reservation Lifecycle

The coordination logic lives in the Manager type in cache_service/manager/manager.go. It enforces a protocol that keeps at most one live owner per cache key, even when many workers race for the same key.

1. Acquisition and Extension

When a worker calls GetOrExtendReservation, the Manager calculates an expiration time based on a heartbeat interval and a grace period multiplier.

```go
// cache_service/manager/manager.go

reservation := &models.Reservation{
	Key:              reservationKey, // Prefixed with "reservation:"
	OwnerID:          request.GetOwnerId(),
	HeartbeatSeconds: int64(heartbeat.Seconds()),
	ExpiresAt:        now.Add(heartbeat * time.Duration(m.heartbeatGracePeriodMultiplier)),
}
```

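The Manager then attempts to persist this reservation through the ReservationRepo. If a reservation for the key already exists, the update is allowed only if the caller is the current owner or the existing reservation has expired. Otherwise the existing reservation stays in place, and its owner ID tells the caller that another worker holds it.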

2. Atomic Ownership Enforcement

To prevent race conditions where two workers might think they acquired the reservation at the same time, the ReservationRepo (specifically the implementation in cache_service/repository/impl/reservation.go) uses an atomic SQL UPDATE statement:

```sql
UPDATE cache_service_reservations
SET owner_id = $1, heartbeat_seconds = $2, expires_at = $3, updated_at = $4
WHERE key = $5 AND (expires_at <= $6 OR owner_id = $7)
```

This query ensures that a reservation is claimable only if either of the following holds:

  • The expires_at timestamp is at or before the supplied time ($6), meaning the reservation has expired.
  • The owner_id matches the requester ($7), meaning this is a heartbeat extension.

3. Heartbeats and Expiration

Reservations are not permanent. The worker must periodically "heartbeat" to extend the reservation. If a worker crashes, the reservation will eventually expire, allowing another worker to take over.

The expiration is controlled by two Manager settings:

  • MaxReservationHeartbeatInterval: The maximum allowed time between heartbeats (defaulting to 10 seconds).
  • HeartbeatGracePeriodMultiplier: A multiplier applied to the heartbeat interval to determine the actual ExpiresAt time (defaulting to 3). For a 10s heartbeat, the reservation expires after 30s of inactivity.

4. Releasing the Reservation

Once a worker successfully computes the result and populates the cache using Put, it must call ReleaseReservation. This removes the reservation record from the database, signaling to other waiting workers that the result is now available in the cache.

The Manager.ReleaseReservation method ensures that workers can only delete reservations they actually own:

```go
// cache_service/manager/manager.go

func (m *Manager) ReleaseReservation(ctx context.Context, request *cacheservicepb.ReleaseReservationRequest) error {
	reservationKey := fmt.Sprintf("%s:%s", reservationPrefix, request.GetKey())
	return m.reservations.DeleteByKeyAndOwner(ctx, reservationKey, request.GetOwnerId())
}
```
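The owner check in DeleteByKeyAndOwner matters because a slow worker may call ReleaseReservation after its reservation has expired and been claimed by another worker. Deleting by key alone would then remove the new owner's reservation. Below is an in-memory sketch of that conditional delete. `memReservations`, `deleteByKeyAndOwner`, and `releaseReservation` are hypothetical. The `"reservation"` prefix value follows the "reservation:" comment in the Manager snippet. The sketch returns a bool for easy checking, whereas the real method returns an error.

```go
package main

import (
	"fmt"
	"sync"
)

// reservationPrefix reproduces the "reservation:" key prefix used by the Manager.
const reservationPrefix = "reservation"

// memReservations is a hypothetical in-memory reservation table.
type memReservations struct {
	mu     sync.Mutex
	owners map[string]string // prefixed key -> owner ID
}

// deleteByKeyAndOwner removes the row only if ownerID still owns it and
// reports whether a row was removed.
func (m *memReservations) deleteByKeyAndOwner(key, ownerID string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if owner, ok := m.owners[key]; !ok || owner != ownerID {
		return false
	}
	delete(m.owners, key)
	return true
}

// releaseReservation mirrors Manager.ReleaseReservation: build the prefixed
// key, then delete only on an ownership match.
func releaseReservation(m *memReservations, cacheKey, ownerID string) bool {
	return m.deleteByKeyAndOwner(fmt.Sprintf("%s:%s", reservationPrefix, cacheKey), ownerID)
}
```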

Client-Side Tracking

On the plugin side, Flyte uses the Client interface (flyteplugins/go/tasks/pluginmachinery/catalog/client.go) to interact with these services. The client can maintain a ReservationCache that tracks reservation status locally, so a task in a waiting state does not need a network call on every check.

```go
// flyteplugins/go/tasks/pluginmachinery/catalog/client.go

type ReservationCache struct {
	Timestamp         time.Time
	ReservationStatus core.CatalogReservation_Status
}
```

This local cache lets the executor quickly decide whether to keep waiting or to re-poll the Cache Service for the final output.