Storage Abstraction Layer
Flyte provides a unified storage abstraction layer that allows the system to interact with various cloud and local backends (S3, GCS, Azure Blob Storage, Redis, or local disk) using a consistent API. This layer decouples Flyte's core logic from the specifics of underlying storage providers, handling path construction, raw byte access, and protobuf serialization automatically.
Initializing the DataStore
To interact with storage in Flyte, use the DataStore struct. It serves as the primary entry point for both raw byte operations and structured protobuf persistence. To initialize it, provide a Config object and a promutils.Scope for metrics tracking.
import (
	"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
	"github.com/flyteorg/flyte/v2/flytestdlib/storage"
)

// initStore builds a DataStore backed by S3, using a test metrics scope.
func initStore() (*storage.DataStore, error) {
	testScope := promutils.NewTestScope()
	cfg := &storage.Config{
		Type: storage.TypeS3, // or TypeLocal, TypeMemory, etc.
		Connection: storage.ConnectionConfig{
			Region: "us-east-1",
		},
		InitContainer: "my-flyte-bucket",
	}
	return storage.NewDataStore(cfg, testScope)
}
Internally, NewDataStore (defined in flytestdlib/storage/rawstores.go) calls RefreshConfig, which assembles the store by:
- Selecting a RawStore implementation based on the Type (e.g., newStowRawStore for S3/GCS).
- Wrapping it in a cachedRawStore if caching is enabled in the config.
- Creating a DefaultProtobufStore that uses the RawStore for persistence.
- Combining these into a DataStore using NewCompositeDataStore.
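The wrapping order in the list above follows a decorator pattern, which can be sketched without any Flyte imports. This is a minimal illustration under stated assumptions: rawStore, mapStore, cachingStore, and buildStore are hypothetical names invented for this example, and the real RawStore interface has more methods than the single Read shown here.

```go
package main

import "fmt"

// rawStore is a hypothetical, simplified stand-in for Flyte's RawStore.
type rawStore interface {
	Read(key string) (string, error)
}

// mapStore is an illustrative in-memory backend that counts its reads.
type mapStore struct {
	data  map[string]string
	reads int
}

func (m *mapStore) Read(key string) (string, error) {
	m.reads++
	v, ok := m.data[key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

// cachingStore wraps another rawStore and remembers successful reads.
type cachingStore struct {
	inner rawStore
	cache map[string]string
}

func (c *cachingStore) Read(key string) (string, error) {
	if v, ok := c.cache[key]; ok {
		return v, nil
	}
	v, err := c.inner.Read(key)
	if err != nil {
		return "", err
	}
	c.cache[key] = v
	return v, nil
}

// buildStore mirrors the assembly order: pick a backend, then optionally wrap it.
func buildStore(backend rawStore, cacheEnabled bool) rawStore {
	if cacheEnabled {
		return &cachingStore{inner: backend, cache: map[string]string{}}
	}
	return backend
}

func main() {
	backend := &mapStore{data: map[string]string{"a": "1"}}
	store := buildStore(backend, true)
	for i := 0; i < 2; i++ {
		if _, err := store.Read("a"); err != nil {
			panic(err)
		}
	}
	// The second read is served by the wrapper, not the backend.
	fmt.Println("backend reads:", backend.reads)
}
```

Because the wrapper satisfies the same interface as the backend, callers do not need to know whether caching is enabled.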
Managing Paths with DataReferences
Flyte uses the DataReference type (a named type whose underlying type is string) to represent locations in storage. Instead of manually concatenating strings to build paths, use the ReferenceConstructor interface provided by the DataStore. This ensures that paths are correctly formatted for the underlying storage scheme (e.g., s3://bucket/key vs /tmp/key).
// Construct a reference like s3://my-flyte-bucket/metadata/project/domain/id
ref, err := store.ConstructReference(ctx, storage.DataReference("s3://my-flyte-bucket"), "metadata", "project", "domain", "id")
The URLPathConstructor (in flytestdlib/storage/url_path.go) implements this logic by using Go's net/url package to resolve references, ensuring that separators and schemes are handled correctly.
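To show why URL-aware joining is safer than string concatenation, here is a small standard-library sketch. It is not the URLPathConstructor source: joinReference is a hypothetical helper, and it uses path.Join on the parsed URL's path, which may differ in detail from how Flyte resolves references.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// joinReference appends keys to base and returns the resulting reference.
// It is an illustrative helper, not Flyte's URLPathConstructor.
func joinReference(base string, keys ...string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	// path.Join collapses duplicate separators and ignores empty elements,
	// so a base with or without a trailing slash yields the same result.
	elems := append([]string{"/", u.Path}, keys...)
	u.Path = path.Join(elems...)
	return u.String(), nil
}

func main() {
	ref, err := joinReference("s3://my-flyte-bucket", "metadata", "project", "domain", "id")
	if err != nil {
		panic(err)
	}
	fmt.Println(ref)
}
```

The same helper handles a local base such as /tmp, because a path with no scheme parses as a URL whose only component is its path.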
Parsing References
If you need to extract the bucket or key from a reference, use the Split method on DataReference. It includes specialized logic for providers like Azure ADLS Gen2, where the container is encoded in the userinfo portion of the URL:
// In flytestdlib/storage/storage.go
func (r DataReference) Split() (scheme, container, key string, err error) {
	// ... handles abfs[s]://container@storageaccount.dfs.core.windows.net/path
}
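The body of Split is elided above, so the following is only a sketch of the idea, assuming the container is taken from the host for most schemes and from the userinfo portion for abfs and abfss. The function name splitReference and the exact trimming of the key are choices made for this example, not details confirmed by the source.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitReference breaks a reference into scheme, container, and key.
// Illustrative only; Flyte's DataReference.Split may differ in detail.
func splitReference(ref string) (scheme, container, key string, err error) {
	u, err := url.Parse(ref)
	if err != nil {
		return "", "", "", err
	}
	container = u.Host
	// Assumption: for ADLS Gen2 the container is the part before '@',
	// which net/url exposes as the userinfo username.
	if (u.Scheme == "abfs" || u.Scheme == "abfss") && u.User != nil {
		container = u.User.Username()
	}
	return u.Scheme, container, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	for _, ref := range []string{
		"s3://my-flyte-bucket/metadata/id",
		"abfss://data@acct.dfs.core.windows.net/a/b",
	} {
		scheme, container, key, err := splitReference(ref)
		if err != nil {
			panic(err)
		}
		fmt.Println(scheme, container, key)
	}
}
```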
Raw Byte Access
For unstructured data like logs or large blobs, the RawStore interface provides low-level methods. DataStore implements this interface, allowing you to read and write raw bytes directly.
dataToStore := "hello world"
if err = store.WriteRaw(ctx, ref, int64(len(dataToStore)), storage.Options{}, strings.NewReader(dataToStore)); err != nil {
	return err
}
reader, err := store.ReadRaw(ctx, ref)
if err != nil {
	return err
}
defer reader.Close()
The RawStore interface (defined in flytestdlib/storage/storage.go) includes:
- ReadRaw(ctx, reference): Returns an io.ReadCloser.
- WriteRaw(ctx, reference, size, opts, reader): Persists bytes from a reader.
- Head(ctx, reference): Retrieves metadata like existence and size without downloading the content.
- List(ctx, reference, maxItems, cursor): Performs paginated listing of objects under a prefix.
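The cursor-based pagination that List describes can be illustrated with a standalone sketch. Everything here is an assumption made for the example: listPage is a hypothetical function, the keys live in a slice rather than a real backend, and the cursor is a plain integer offset, whereas Flyte uses its own cursor type.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// listPage returns up to maxItems keys that start with prefix, beginning at
// the offset given by cursor. The returned cursor is -1 when nothing remains.
// An integer offset as the cursor is an assumption made for this sketch.
func listPage(keys []string, prefix string, maxItems, cursor int) ([]string, int) {
	if maxItems <= 0 || cursor < 0 {
		return nil, -1
	}
	var matches []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			matches = append(matches, k)
		}
	}
	sort.Strings(matches) // stable ordering keeps pages consistent
	if cursor >= len(matches) {
		return nil, -1
	}
	end := cursor + maxItems
	if end >= len(matches) {
		return matches[cursor:], -1
	}
	return matches[cursor:end], end
}

func main() {
	keys := []string{"logs/a", "logs/b", "logs/c", "meta/x"}
	cursor := 0
	for cursor != -1 {
		var page []string
		page, cursor = listPage(keys, "logs/", 2, cursor)
		fmt.Println(page)
	}
}
```

The caller keeps passing back the cursor it received until the store signals that no items remain.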
Protobuf Serialization
Flyte frequently stores structured metadata as Protobuf messages. The ProtobufStore interface simplifies this by handling the marshalling and unmarshalling logic.
msg := &myproto.MyMessage{Value: "example"}
err := store.WriteProtobuf(ctx, ref, storage.Options{}, msg)
receivedMsg := &myproto.MyMessage{}
err = store.ReadProtobuf(ctx, ref, receivedMsg)
The DefaultProtobufStore (in flytestdlib/storage/protobuf_store.go) implements this by:
- Writing: Marshalling the proto.Message into a byte array and calling WriteRaw on the underlying RawStore.
- Reading: Calling ReadRaw to get the bytes and then unmarshalling them into the provided proto.Message pointer.
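The marshal-then-write and read-then-unmarshal flow above can be sketched with a stand-in message type so that the example needs no external modules. The message interface, the text type, and the rawBytes map are all hypothetical; real code would use proto.Message and the protobuf library's marshalling functions instead.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for proto.Message.
type message interface {
	Marshal() ([]byte, error)
	Unmarshal([]byte) error
}

// text is a toy message whose wire format is simply its bytes.
type text struct{ Value string }

func (t *text) Marshal() ([]byte, error) { return []byte(t.Value), nil }
func (t *text) Unmarshal(b []byte) error { t.Value = string(b); return nil }

// rawBytes is an illustrative in-memory raw store keyed by reference.
type rawBytes map[string][]byte

// writeMessage marshals msg and persists the bytes under ref.
func writeMessage(store rawBytes, ref string, msg message) error {
	b, err := msg.Marshal()
	if err != nil {
		return err
	}
	store[ref] = b
	return nil
}

// readMessage loads the bytes under ref and unmarshals them into msg.
func readMessage(store rawBytes, ref string, msg message) error {
	b, ok := store[ref]
	if !ok {
		return errors.New("reference not found: " + ref)
	}
	return msg.Unmarshal(b)
}

func main() {
	store := rawBytes{}
	if err := writeMessage(store, "s3://my-flyte-bucket/msg", &text{Value: "example"}); err != nil {
		panic(err)
	}
	got := &text{}
	if err := readMessage(store, "s3://my-flyte-bucket/msg", got); err != nil {
		panic(err)
	}
	fmt.Println(got.Value)
}
```

The caller passes a pointer to an empty message on the read path so that the store can fill it in, which matches the ReadProtobuf usage shown earlier.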
Advanced Storage Features
Caching
To reduce latency and cloud provider costs, Flyte can cache storage metadata and small objects in memory. This is implemented by cachedRawStore, which wraps a RawStore. It uses freecache internally.
- Constraint: If an object's size is larger than 1/1024 of the total cache size, it will not be cached to prevent a single large object from evicting too many small ones.
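The size constraint is easy to express as a predicate. This sketch assumes the rule exactly as stated above (objects larger than 1/1024 of the cache are skipped); shouldCache is a hypothetical helper and the 100 MiB cache size is an example value, not a Flyte default.

```go
package main

import "fmt"

// shouldCache reports whether an object of objectSize bytes is small enough
// to cache, following the 1/1024 rule described above.
func shouldCache(objectSize, cacheSize int64) bool {
	return objectSize <= cacheSize/1024
}

func main() {
	const cacheSize = 100 * 1024 * 1024 // hypothetical 100 MiB cache
	for _, size := range []int64{4 * 1024, 512 * 1024} {
		fmt.Printf("%d bytes cacheable: %v\n", size, shouldCache(size, cacheSize))
	}
}
```

With a 100 MiB cache the threshold works out to 100 KiB, so a 4 KiB object qualifies and a 512 KiB object does not.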
Scheme Routing (Redis Integration)
Flyte supports a hybrid storage model where metadata can be stored in Redis for fast access while large blobs remain in S3. If Redis.Addr is provided in the Config, DataStore uses a schemeRoutingStore.
- References starting with redis:// are routed to the Redis backend.
- All other references (e.g., s3://) are routed to the primary blob store.
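The routing rule above amounts to a prefix check on the reference. The sketch below illustrates it with hypothetical types (backend, namedBackend, router); it does not reproduce schemeRoutingStore, which would forward full read and write calls instead of returning a name.

```go
package main

import (
	"fmt"
	"strings"
)

// backend is a hypothetical, simplified stand-in for a raw store.
type backend interface {
	Name() string
}

type namedBackend string

func (n namedBackend) Name() string { return string(n) }

// router picks a backend from the reference's scheme prefix.
type router struct {
	redis backend // handles redis:// references
	blob  backend // handles everything else (s3://, gs://, ...)
}

func (r router) pick(ref string) backend {
	if strings.HasPrefix(ref, "redis://") {
		return r.redis
	}
	return r.blob
}

func main() {
	r := router{redis: namedBackend("redis"), blob: namedBackend("blob")}
	for _, ref := range []string{"redis://meta/exec-1", "s3://my-flyte-bucket/data"} {
		fmt.Println(ref, "->", r.pick(ref).Name())
	}
}
```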
Download Limits
To prevent memory exhaustion, the stowStore implementation enforces a download size limit configured via Storage.limits.maxDownloadMBs (defaulting to 2MB). If a ReadRaw call attempts to download a file exceeding this limit, it will return an error.
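A size guard of this kind can be sketched as a check performed before any bytes are read. The helper checkDownloadSize and the error text are hypothetical, and treating one MB as 1024 * 1024 bytes is an assumption for this example; the source does not say how Flyte converts the limit.

```go
package main

import "fmt"

// bytesPerMB assumes "MB" means 2^20 bytes; this is a choice for the sketch.
const bytesPerMB = 1024 * 1024

// checkDownloadSize returns an error when sizeBytes exceeds limitMB megabytes.
func checkDownloadSize(sizeBytes, limitMB int64) error {
	if sizeBytes > limitMB*bytesPerMB {
		return fmt.Errorf("object is %d bytes, limit is %d MB", sizeBytes, limitMB)
	}
	return nil
}

func main() {
	// A 1 KiB object passes a 2 MB limit; a 3 MB object does not.
	fmt.Println(checkDownloadSize(1024, 2))
	fmt.Println(checkDownloadSize(3*bytesPerMB, 2))
}
```

Checking the size from metadata first means an oversized object is rejected without downloading any of its content.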