
Implementing Custom Storage Backends

To persist cache data in a custom database or storage system within Flyte, you must implement the Repository interface and its two sub-interfaces: CachedOutputRepo and ReservationRepo. These interfaces define how Flyte stores task/workflow outputs and manages distributed locks (reservations) to coordinate cache population.

Implementing the Repository Interface

The Repository interface acts as a container for the specific storage backends. You must provide implementations for both the output storage and the reservation system.

```go
import (
	"github.com/flyteorg/flyte/v2/cache_service/repository/interfaces"
)

type MyCustomRepository struct {
	outputRepo      interfaces.CachedOutputRepo
	reservationRepo interfaces.ReservationRepo
}

func (r *MyCustomRepository) CachedOutputRepo() interfaces.CachedOutputRepo {
	return r.outputRepo
}

func (r *MyCustomRepository) ReservationRepo() interfaces.ReservationRepo {
	return r.reservationRepo
}
```
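
A standard Go idiom for catching signature drift early is a compile-time assertion, which makes the build fail if the type stops satisfying the interface. The sketch below is self-contained, so it declares simplified stand-ins for CachedOutputRepo, ReservationRepo, and Repository; their shapes are assumptions and their method sets are omitted. In your own code you would assert against the real container interface in the interfaces package instead (for example var _ interfaces.Repository = (*MyCustomRepository)(nil), assuming it is exported under that name).

```go
package main

// Hypothetical stand-ins for the interfaces in Flyte's
// cache_service/repository/interfaces package; method sets omitted.
type CachedOutputRepo interface{}
type ReservationRepo interface{}

// Repository mirrors the accessor pair described above (assumed shape).
type Repository interface {
	CachedOutputRepo() CachedOutputRepo
	ReservationRepo() ReservationRepo
}

type MyCustomRepository struct {
	outputRepo      CachedOutputRepo
	reservationRepo ReservationRepo
}

func (r *MyCustomRepository) CachedOutputRepo() CachedOutputRepo {
	return r.outputRepo
}

func (r *MyCustomRepository) ReservationRepo() ReservationRepo {
	return r.reservationRepo
}

// Compile-time check: the build fails if *MyCustomRepository no longer
// satisfies Repository (e.g., a method is renamed or its signature changes).
var _ Repository = (*MyCustomRepository)(nil)
```

The assertion costs nothing at runtime; the nil pointer is never dereferenced.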

Implementing CachedOutputRepo

The CachedOutputRepo handles the persistence of models.CachedOutput objects. When implementing Put, you must ensure that it supports overwriting existing entries (upsert behavior).

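```go
import (
	"context"

	"github.com/flyteorg/flyte/v2/cache_service/repository/models"
)

// outputStore is a hypothetical client abstraction (e.g., over DynamoDB
// or Redis); substitute your real storage client.
type outputStore interface {
	Upsert(ctx context.Context, key string, output *models.CachedOutput) error
	Get(ctx context.Context, key string) (*models.CachedOutput, error)
	Delete(ctx context.Context, key string) error
}

type MyOutputRepo struct {
	client outputStore
}

func (r *MyOutputRepo) Put(ctx context.Context, output *models.CachedOutput) error {
	// Implementation must handle conflicts by overwriting (upsert).
	// In SQL, this is typically INSERT ... ON CONFLICT (key) DO UPDATE.
	return r.client.Upsert(ctx, output.Key, output)
}

func (r *MyOutputRepo) Get(ctx context.Context, key string) (*models.CachedOutput, error) {
	// On a miss, return sql.ErrNoRows (or wrap it) so errors.IsNotFound matches.
	return r.client.Get(ctx, key)
}

func (r *MyOutputRepo) Delete(ctx context.Context, key string) error {
	return r.client.Delete(ctx, key)
}
```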

Implementing ReservationRepo with Atomic Updates

The ReservationRepo is used for distributed coordination. The most critical method is UpdateIfExpiredOrOwned, which must be implemented atomically to prevent race conditions where multiple owners claim the same reservation.

The logic must ensure that a reservation is updated only if at least one of the following holds:

  1. The current expires_at is at or before now (the reservation has expired).
  2. The owner_id matches the requester (the owner is extending its own lock).
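
```go
import (
	"context"
	"time"

	"github.com/flyteorg/flyte/v2/cache_service/repository/errors"
	"github.com/flyteorg/flyte/v2/cache_service/repository/models"
)

// reservationStore is a hypothetical client whose UpdateIf performs one
// atomic conditional update and reports whether a row was modified.
type reservationStore interface {
	UpdateIf(ctx context.Context, key string, fields map[string]any, cond string, args ...any) (bool, error)
}

type MyReservationRepo struct {
	client reservationStore
}

func (r *MyReservationRepo) UpdateIfExpiredOrOwned(ctx context.Context, res *models.Reservation, now time.Time) error {
	// The condition check and the write must execute as a single atomic
	// operation (e.g., one UPDATE ... WHERE statement), never read-then-write.
	success, err := r.client.UpdateIf(ctx, res.Key,
		map[string]any{
			"owner_id":   res.OwnerID,
			"expires_at": res.ExpiresAt,
			"updated_at": now,
		},
		// Condition: (expires_at <= now OR owner_id == res.OwnerID)
		"expires_at <= ? OR owner_id = ?", now, res.OwnerID,
	)
	if err != nil {
		return err
	}

	// If no rows were affected because the condition failed,
	// you MUST return ErrReservationNotClaimable.
	if !success {
		return errors.ErrReservationNotClaimable
	}

	return nil
}
```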

Handling Required Error Types

The Flyte Manager relies on specific error types to control its retry and fallback logic. Your implementation must return or wrap these errors from cache_service/repository/errors/errors.go:

| Method | Scenario | Required Error |
|---|---|---|
| ReservationRepo.Create | Key already exists | errors.ErrAlreadyExists |
| ReservationRepo.UpdateIfExpiredOrOwned | Condition not met | errors.ErrReservationNotClaimable |
| CachedOutputRepo.Get | Key not found | sql.ErrNoRows (checked via errors.IsNotFound) |

Wiring the Custom Backend

Once implemented, pass your custom repositories to the manager.New function. This is typically done during the initialization of the CacheService in cache_service/service/service.go.

```go
import (
	"github.com/flyteorg/flyte/v2/cache_service/config"
	"github.com/flyteorg/flyte/v2/cache_service/manager"
)

func NewCustomCacheService(cfg *config.Config) *CacheService {
	// Initialize each repository with its storage client here
	// (e.g., connection handles); empty literals are shown for brevity.
	myOutputRepo := &MyOutputRepo{}
	myResRepo := &MyReservationRepo{}

	return &CacheService{
		manager: manager.New(
			cfg,
			myOutputRepo,
			myResRepo,
		),
	}
}
```

Troubleshooting Gotchas

  • Metadata Serialization: The Metadata field in models.CachedOutput is a []byte containing marshaled protobuf data. Ensure your storage backend preserves this as a binary blob.
  • Time Precision: UpdateIfExpiredOrOwned compares the stored expires_at against the now timestamp passed in by the manager. Store and compare timestamps at a consistent precision; if your database truncates to seconds while the manager supplies sub-second times (or vice versa), reservations can appear to expire early or claims can fail unexpectedly.
  • Unique Constraints: ReservationRepo.Create must fail with ErrAlreadyExists if the key is present, even if the existing reservation is expired. The Manager handles expiration via the UpdateIfExpiredOrOwned method, not during Create.