Implementing Custom Storage Backends
To persist cache data in a custom database or storage system within Flyte, you must implement the Repository interface and its two sub-interfaces: CachedOutputRepo and ReservationRepo. These interfaces define how Flyte stores task/workflow outputs and manages distributed locks (reservations) to coordinate cache population.
Implementing the Repository Interface
The Repository interface acts as a container for the specific storage backends. You must provide implementations for both the output storage and the reservation system.
import (
	"github.com/flyteorg/flyte/v2/cache_service/repository/interfaces"
)

type MyCustomRepository struct {
	outputRepo      interfaces.CachedOutputRepo
	reservationRepo interfaces.ReservationRepo
}

func (r *MyCustomRepository) CachedOutputRepo() interfaces.CachedOutputRepo {
	return r.outputRepo
}

func (r *MyCustomRepository) ReservationRepo() interfaces.ReservationRepo {
	return r.reservationRepo
}
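A constructor and a compile-time assertion can catch a missing method or an unset backend early. The following is a minimal sketch, not Flyte's own code: the interfaces are local stand-ins (the real ones live in the interfaces package and carry methods omitted here), and NewMyCustomRepository is a hypothetical helper name.

```go
package main

import "errors"

// Stand-ins for the Flyte interfaces. They are left empty here only so
// the sketch compiles without the Flyte module.
type CachedOutputRepo interface{}
type ReservationRepo interface{}

// Repository mirrors the container shape described above.
type Repository interface {
	CachedOutputRepo() CachedOutputRepo
	ReservationRepo() ReservationRepo
}

type MyCustomRepository struct {
	outputRepo      CachedOutputRepo
	reservationRepo ReservationRepo
}

// Compile-time check: the build fails if *MyCustomRepository ever stops
// satisfying Repository.
var _ Repository = (*MyCustomRepository)(nil)

// NewMyCustomRepository (hypothetical helper) rejects a half-configured
// repository instead of letting a nil backend surface later at call time.
func NewMyCustomRepository(out CachedOutputRepo, res ReservationRepo) (*MyCustomRepository, error) {
	if out == nil || res == nil {
		return nil, errors.New("both an output repo and a reservation repo are required")
	}
	return &MyCustomRepository{outputRepo: out, reservationRepo: res}, nil
}

func (r *MyCustomRepository) CachedOutputRepo() CachedOutputRepo { return r.outputRepo }
func (r *MyCustomRepository) ReservationRepo() ReservationRepo   { return r.reservationRepo }
```

In real code the assertion would target the Repository interface from the Flyte interfaces package rather than the stand-in.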
Implementing CachedOutputRepo
The CachedOutputRepo handles the persistence of models.CachedOutput objects. When implementing Put, you must ensure that it supports overwriting existing entries (upsert behavior).
import (
	"context"

	"github.com/flyteorg/flyte/v2/cache_service/repository/models"
)

type MyOutputRepo struct {
	// client wraps your storage system (e.g., DynamoDB, Redis, etc.).
	// StorageClient and its Upsert/Get/Delete methods are hypothetical placeholders.
	client StorageClient
}

func (r *MyOutputRepo) Put(ctx context.Context, output *models.CachedOutput) error {
	// Implementation must handle conflicts by overwriting the existing entry.
	// In SQL, this is typically an 'ON CONFLICT (key) DO UPDATE'.
	return r.client.Upsert(ctx, output.Key, output)
}

func (r *MyOutputRepo) Get(ctx context.Context, key string) (*models.CachedOutput, error) {
	return r.client.Get(ctx, key)
}

func (r *MyOutputRepo) Delete(ctx context.Context, key string) error {
	return r.client.Delete(ctx, key)
}
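To make the upsert requirement concrete, here is a minimal in-memory sketch, useful mainly as a test double. CachedOutput is a local stand-in for models.CachedOutput that carries only the Key and Metadata fields mentioned in this guide; MemoryOutputRepo and demoUpsert are hypothetical names. Returning nil when deleting a missing key is an assumption, since the required behavior is not stated here.

```go
package main

import (
	"context"
	"database/sql"
	"sync"
)

// CachedOutput is a stand-in for models.CachedOutput (other fields omitted).
type CachedOutput struct {
	Key      string
	Metadata []byte
}

// MemoryOutputRepo keeps outputs in a map guarded by a mutex.
type MemoryOutputRepo struct {
	mu    sync.RWMutex
	items map[string]CachedOutput
}

func NewMemoryOutputRepo() *MemoryOutputRepo {
	return &MemoryOutputRepo{items: make(map[string]CachedOutput)}
}

// Put inserts the entry or overwrites an existing one (upsert).
func (r *MemoryOutputRepo) Put(ctx context.Context, output *CachedOutput) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	stored := *output
	// Copy the bytes so later mutation by the caller cannot alter the stored value.
	stored.Metadata = append([]byte(nil), output.Metadata...)
	r.items[output.Key] = stored
	return nil
}

// Get returns sql.ErrNoRows for a missing key, matching the error table in this guide.
func (r *MemoryOutputRepo) Get(ctx context.Context, key string) (*CachedOutput, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	stored, ok := r.items[key]
	if !ok {
		return nil, sql.ErrNoRows
	}
	out := stored
	out.Metadata = append([]byte(nil), stored.Metadata...)
	return &out, nil
}

// Delete removes the key; deleting a missing key is treated as success (assumption).
func (r *MemoryOutputRepo) Delete(ctx context.Context, key string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.items, key)
	return nil
}

// demoUpsert writes the same key twice and returns the metadata read back.
func demoUpsert() (string, error) {
	repo := NewMemoryOutputRepo()
	ctx := context.Background()
	if err := repo.Put(ctx, &CachedOutput{Key: "k", Metadata: []byte("v1")}); err != nil {
		return "", err
	}
	if err := repo.Put(ctx, &CachedOutput{Key: "k", Metadata: []byte("v2")}); err != nil {
		return "", err
	}
	got, err := repo.Get(ctx, "k")
	if err != nil {
		return "", err
	}
	return string(got.Metadata), nil
}
```

The second Put replaces the first, so demoUpsert reads back the later value rather than failing on the duplicate key.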
Implementing ReservationRepo with Atomic Updates
The ReservationRepo is used for distributed coordination. The most critical method is UpdateIfExpiredOrOwned, which must be implemented atomically to prevent race conditions where multiple owners claim the same reservation.
The logic must ensure that a reservation is only updated if:
- The current expires_at time is in the past (expired).
- OR the owner_id matches the requester (extending an existing lock).
import (
	"context"
	"time"

	"github.com/flyteorg/flyte/v2/cache_service/repository/errors"
	"github.com/flyteorg/flyte/v2/cache_service/repository/models"
)

type MyReservationRepo struct {
	// client is a hypothetical placeholder for your storage client.
	client StorageClient
}

func (r *MyReservationRepo) UpdateIfExpiredOrOwned(ctx context.Context, res *models.Reservation, now time.Time) error {
	// Example logic for an atomic update. UpdateIf stands in for your
	// backend's conditional-write primitive; it is not a Flyte API.
	success, err := r.client.UpdateIf(ctx, res.Key,
		map[string]interface{}{
			"owner_id":   res.OwnerID,
			"expires_at": res.ExpiresAt,
			"updated_at": now,
		},
		// Condition: (expires_at <= now OR owner_id == res.OwnerID)
		"expires_at <= ? OR owner_id = ?", now, res.OwnerID,
	)
	if err != nil {
		return err
	}
	// If no rows were affected because the condition failed,
	// you MUST return ErrReservationNotClaimable.
	if !success {
		return errors.ErrReservationNotClaimable
	}
	return nil
}
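For backends without a native conditional write, the same check-and-update can be made atomic by holding a lock across both steps. The sketch below shows this in memory under stated assumptions: Reservation and ErrReservationNotClaimable are local stand-ins for the Flyte types, and MemoryReservationRepo and demoClaims are hypothetical names. A missing key is reported as not claimable, mirroring the "no rows affected" branch in the example above.

```go
package main

import (
	"context"
	"errors"
	"sync"
	"time"
)

// Stand-in for errors.ErrReservationNotClaimable.
var ErrReservationNotClaimable = errors.New("reservation not claimable")

// Reservation is a stand-in for models.Reservation (other fields omitted).
type Reservation struct {
	Key       string
	OwnerID   string
	ExpiresAt time.Time
}

type MemoryReservationRepo struct {
	mu   sync.Mutex
	rows map[string]Reservation
}

// UpdateIfExpiredOrOwned holds the mutex across the check and the write,
// so two callers can never both pass the condition for the same key.
func (r *MemoryReservationRepo) UpdateIfExpiredOrOwned(ctx context.Context, res *Reservation, now time.Time) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	cur, ok := r.rows[res.Key]
	if !ok {
		return ErrReservationNotClaimable // nothing to update
	}
	expired := !cur.ExpiresAt.After(now) // expires_at <= now
	owned := cur.OwnerID == res.OwnerID
	if !expired && !owned {
		return ErrReservationNotClaimable
	}
	r.rows[res.Key] = *res
	return nil
}

// demoClaims runs three attempts against a lock held by owner "a".
func demoClaims() []error {
	now := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	repo := &MemoryReservationRepo{rows: map[string]Reservation{
		"k": {Key: "k", OwnerID: "a", ExpiresAt: now.Add(time.Minute)},
	}}
	ctx := context.Background()
	return []error{
		// Owner "b" tries while the lock is live: rejected.
		repo.UpdateIfExpiredOrOwned(ctx, &Reservation{Key: "k", OwnerID: "b", ExpiresAt: now.Add(2 * time.Minute)}, now),
		// Owner "a" extends its own lock: accepted.
		repo.UpdateIfExpiredOrOwned(ctx, &Reservation{Key: "k", OwnerID: "a", ExpiresAt: now.Add(2 * time.Minute)}, now),
		// Owner "b" retries after the lock has expired: accepted.
		repo.UpdateIfExpiredOrOwned(ctx, &Reservation{Key: "k", OwnerID: "b", ExpiresAt: now.Add(5 * time.Minute)}, now.Add(3*time.Minute)),
	}
}
```

A process-local mutex only protects a single instance. With a shared database, the equivalent guarantee has to come from the store itself, for example a single conditional UPDATE statement or a compare-and-set operation.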
Handling Required Error Types
The Flyte Manager relies on specific error types to control its retry and fallback logic. Your implementation must return or wrap these errors from cache_service/repository/errors/errors.go:
| Method | Scenario | Required Error |
|---|---|---|
| ReservationRepo.Create | Key already exists | errors.ErrAlreadyExists |
| ReservationRepo.UpdateIfExpiredOrOwned | Condition not met | errors.ErrReservationNotClaimable |
| CachedOutputRepo.Get | Key not found | sql.ErrNoRows (checked via errors.IsNotFound) |
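Most storage clients report these conditions with their own error values, so a small translation layer is usually needed. The sketch below maps a hypothetical backend error type onto the required errors by wrapping them with %w. The sentinel variables are stand-ins for the Flyte ones, backendError and its codes are invented for illustration, and the sketch assumes the manager matches wrapped errors with errors.Is-style checks.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// Stand-ins for the sentinels in cache_service/repository/errors.
var (
	ErrAlreadyExists           = errors.New("already exists")
	ErrReservationNotClaimable = errors.New("reservation not claimable")
)

// backendError is a hypothetical error type returned by a storage client.
type backendError struct{ Code string }

func (e *backendError) Error() string { return "backend error: " + e.Code }

// translate wraps the required error so callers can still match it with
// errors.Is, while the message keeps the operation name for debugging.
func translate(op string, err error) error {
	var be *backendError
	if !errors.As(err, &be) {
		return err // nil or an unrelated error: pass through unchanged
	}
	switch be.Code {
	case "duplicate_key":
		return fmt.Errorf("%s: %w", op, ErrAlreadyExists)
	case "condition_failed":
		return fmt.Errorf("%s: %w", op, ErrReservationNotClaimable)
	case "not_found":
		return fmt.Errorf("%s: %w", op, sql.ErrNoRows)
	default:
		return err
	}
}

// demoTranslate reports whether each translated error matches its sentinel.
func demoTranslate() (dup, cond, miss bool) {
	e1 := translate("create reservation", &backendError{Code: "duplicate_key"})
	e2 := translate("update reservation", &backendError{Code: "condition_failed"})
	e3 := translate("get output", &backendError{Code: "not_found"})
	return errors.Is(e1, ErrAlreadyExists),
		errors.Is(e2, ErrReservationNotClaimable),
		errors.Is(e3, sql.ErrNoRows)
}
```

If the manager compares errors by direct equality instead, return the sentinel itself rather than a wrapped copy.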
Wiring the Custom Backend
Once implemented, pass your custom repositories to the manager.New function. This is typically done during the initialization of the CacheService in cache_service/service/service.go.
import (
	"github.com/flyteorg/flyte/v2/cache_service/config"
	"github.com/flyteorg/flyte/v2/cache_service/manager"
)

func NewCustomCacheService(cfg *config.Config) *CacheService {
	myOutputRepo := &MyOutputRepo{}
	myResRepo := &MyReservationRepo{}
	return &CacheService{
		manager: manager.New(
			cfg,
			myOutputRepo,
			myResRepo,
		),
	}
}
Troubleshooting Gotchas
- Metadata Serialization: The Metadata field in models.CachedOutput is a []byte containing marshaled protobuf data. Ensure your storage backend preserves this as a binary blob.
- Time Precision: When implementing UpdateIfExpiredOrOwned, ensure the now timestamp passed from the manager is compared accurately. Using different time precisions (e.g., milliseconds vs seconds) in your database can cause premature expiration or failed claims.
- Unique Constraints: ReservationRepo.Create must fail with ErrAlreadyExists if the key is present, even if the existing reservation is expired. The Manager handles expiration via the UpdateIfExpiredOrOwned method, not during Create.
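The time precision and unique constraint points can be illustrated together. The sketch below is a simplified illustration under stated assumptions: storePrecision is an assumed backend precision, Reservation and ErrAlreadyExists are local stand-ins, and Create uses a simplified signature (the real method presumably also takes a context).

```go
package main

import (
	"errors"
	"sync"
	"time"
)

// Stand-in for errors.ErrAlreadyExists.
var ErrAlreadyExists = errors.New("already exists")

// storePrecision is the precision the hypothetical backend keeps (assumption).
const storePrecision = time.Millisecond

// normalize brings a timestamp to UTC at the precision the store keeps,
// so stored and compared values are always on the same grid.
func normalize(t time.Time) time.Time { return t.UTC().Truncate(storePrecision) }

// isExpired applies the expires_at <= now rule on normalized values.
func isExpired(expiresAt, now time.Time) bool {
	return !normalize(expiresAt).After(normalize(now))
}

// Reservation is a stand-in for models.Reservation (other fields omitted).
type Reservation struct {
	Key       string
	OwnerID   string
	ExpiresAt time.Time
}

type MemoryReservations struct {
	mu   sync.Mutex
	rows map[string]Reservation
}

// Create fails whenever the key exists. It deliberately never inspects
// ExpiresAt: taking over an expired lock is UpdateIfExpiredOrOwned's job.
func (m *MemoryReservations) Create(res Reservation) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exists := m.rows[res.Key]; exists {
		return ErrAlreadyExists
	}
	res.ExpiresAt = normalize(res.ExpiresAt)
	m.rows[res.Key] = res
	return nil
}

// demoGotchas shows a premature expiry caused by second-level truncation,
// the same check at millisecond precision, and Create on an expired key.
func demoGotchas() (premature bool, expiredAtMillis bool, createErr error) {
	expires := time.Date(2024, 1, 1, 10, 0, 0, 900_000_000, time.UTC) // 10:00:00.900
	now := time.Date(2024, 1, 1, 10, 0, 0, 500_000_000, time.UTC)     // 10:00:00.500

	// A store that keeps whole seconds sees 10:00:00 and treats the lock as expired.
	premature = !expires.Truncate(time.Second).After(now)
	// At millisecond precision the lock is still live.
	expiredAtMillis = isExpired(expires, now)

	repo := &MemoryReservations{rows: map[string]Reservation{}}
	old := Reservation{Key: "k", OwnerID: "a", ExpiresAt: now.Add(-time.Hour)} // already expired
	if err := repo.Create(old); err != nil {
		return premature, expiredAtMillis, err
	}
	createErr = repo.Create(Reservation{Key: "k", OwnerID: "b", ExpiresAt: now.Add(time.Minute)})
	return premature, expiredAtMillis, createErr
}
```

The first comparison shows how dropping sub-second digits on write makes a live lock look expired 400 ms early, which is the premature expiration the Time Precision item warns about.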