Data Proxy & Storage
Flyte provides a unified storage abstraction layer that lets the system interact with various object stores (S3, GCS, Azure Blob Storage, the local file system, etc.) through a consistent interface. This layer, primarily implemented in the flytestdlib/storage package, handles byte-level access, object serialization, and path management.
The DataStore Interface
The DataStore struct in flytestdlib/storage/storage.go is the primary entry point for all data operations. It is a composite of three specialized interfaces:
- RawStore: Provides low-level byte operations (ReadRaw, WriteRaw, CopyRaw, Head, List, Delete).
- ProtobufStore: Handles serialization and deserialization of Protobuf messages directly to and from storage.
- ReferenceConstructor: Manages the construction of storage paths (URIs) to ensure consistency across different backends.
Initializing a DataStore
You initialize a DataStore by providing a Config object and a metrics scope. The NewDataStore function automatically selects the appropriate backend implementation based on the Type field in the configuration.
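```go
// Lives in package storage; needs context, fmt, strings, and flytestdlib/promutils.
func ExampleNewDataStore() {
	testScope := promutils.NewTestScope()
	ctx := context.Background()
	// Initialize with an in-memory backend for testing
	store, err := NewDataStore(&Config{
		Type: TypeMemory,
	}, testScope.NewSubScope("exp_new"))
	if err != nil {
		fmt.Printf("Failed to create data store. Error: %v", err)
		return
	}
	// Construct a reference by joining nested keys onto the base "root"
	ref, err := store.ConstructReference(ctx, DataReference("root"), "subkey", "subkey2")
	if err != nil {
		fmt.Printf("Failed to construct data reference. Error: %v", err)
		return
	}
	// Write raw bytes to the constructed reference
	dataToStore := "hello world"
	err = store.WriteRaw(ctx, ref, int64(len(dataToStore)), Options{}, strings.NewReader(dataToStore))
	if err != nil {
		fmt.Printf("Failed to write data. Error: %v", err)
	}
}
```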
Storage Backends and Configuration
Flyte supports multiple storage backends, implemented with the stow library or as custom stores. The backend is selected in the storage section of the Flyte configuration.
| Type | Description |
|---|---|
| s3 | Amazon S3 or S3-compatible storage (Minio). |
| local | Local file system storage. |
| mem | In-memory storage, typically used for testing. |
| redis | Redis-backed storage where objects are stored as string values. |
| stow | Generic stow-supported backend (e.g., Azure Blob Storage). |
Multi-Container Support
By default, Flyte restricts operations to a single "initial container" (bucket). If you need to access multiple buckets within the same DataStore instance, you must enable MultiContainerEnabled in the Config:
```go
// flytestdlib/storage/config.go
type Config struct {
	Type                  Type   `json:"type"`
	InitContainer         string `json:"container"`
	MultiContainerEnabled bool   `json:"enable-multicontainer"`
	// ...
}
```
When MultiContainerEnabled is true, the DataStore will parse the container name directly from the DataReference URI (e.g., s3://my-bucket/key) instead of defaulting to the InitContainer.
Structured Data with ProtobufStore
While RawStore works with raw byte streams (an io.Reader for writes, an io.ReadCloser for reads), most Flyte components interact with structured data. The ProtobufStore interface removes the boilerplate of marshaling and unmarshaling.
The DefaultProtobufStore (found in flytestdlib/storage/protobuf_store.go) wraps a RawStore and uses github.com/golang/protobuf/proto for serialization.
```go
// Example of reading a Protobuf message from storage (ctx and store assumed in scope)
var inputs task.Inputs
if err := store.ReadProtobuf(ctx, storage.DataReference("s3://bucket/inputs.pb"), &inputs); err != nil {
	return err
}
```
Data Caching
To improve performance and reduce latency for frequently accessed metadata or small objects, Flyte includes an optional in-memory caching layer backed by freecache. It is implemented with the decorator pattern in flytestdlib/storage/cached_rawstore.go, wrapping an existing RawStore.
The cachedRawStore intercepts ReadRaw and WriteRaw calls. If a requested DataReference is in the cache, it returns the cached bytes without contacting the underlying store.
Note on Cache Limits: As noted in flytestdlib/storage/config.go, freecache will not write an entry whose size is larger than 1/1024 of the total cache size (MaxSizeMegabytes). This prevents a single large object from evicting too many smaller entries.
Data Proxy and Literal Translation
The dataproxy package provides a service that acts as a bridge between Flyte's internal Protobuf representations (Literals) and the JSON format used by the UI and CLI (Remote Structured JSON Format - RSJF).
Handling Offloaded Data
When Flyte Literals are too large to be passed directly through the control plane (e.g., large input sets), they are "offloaded" to the object store. The TranslatorService in dataproxy/translator.go uses the DataStore to retrieve these offloaded literals before converting them to JSON.
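The offload-then-resolve flow can be sketched with a toy in-memory object store. Everything here is hypothetical: the message shape, blobStore, the maxInline threshold, and the use of encoding/gob as a stand-in for the protobuf-encoded .pb file. The point it illustrates is that a large payload travels as a URI, and the translator resolves that URI before rendering JSON.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
)

// message is a hypothetical control-plane payload: literals inline, or a URI
// pointing at an offloaded blob.
type message struct {
	Inline map[string]string
	URI    string
}

// blobStore is a toy object store keyed by URI.
type blobStore map[string][]byte

// encodeMessage offloads literals whose encoded size exceeds maxInline bytes.
// gob stands in for protobuf; maxInline is an assumed knob.
func encodeMessage(store blobStore, uri string, lits map[string]string, maxInline int) (message, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(lits); err != nil {
		return message{}, err
	}
	if buf.Len() <= maxInline {
		return message{Inline: lits}, nil
	}
	store[uri] = buf.Bytes()
	return message{URI: uri}, nil
}

// toJSON resolves offloaded literals first, then renders JSON for a UI or CLI client.
func toJSON(store blobStore, m message) ([]byte, error) {
	lits := m.Inline
	if m.URI != "" {
		b, ok := store[m.URI]
		if !ok {
			return nil, fmt.Errorf("offloaded literals not found at %s", m.URI)
		}
		if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&lits); err != nil {
			return nil, fmt.Errorf("failed to read literals: %w", err)
		}
	}
	return json.Marshal(lits)
}

func main() {
	store := blobStore{}
	lits := map[string]string{"x": "1", "y": "2"}
	m, err := encodeMessage(store, "s3://bucket/inputs.pb", lits, 8)
	if err != nil {
		panic(err)
	}
	out, err := toJSON(store, m)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.URI, string(out))
}
```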
```go
// dataproxy/translator.go
func (s *TranslatorService) readOffloadedLiterals(
	ctx context.Context,
	req *workflow.LiteralsToLaunchFormJsonRequest,
) ([]*task.NamedLiteral, error) {
	uri := req.GetLiteralsUri()
	var inputsOrOutputs task.Inputs
	// Uses the internal DataStore to fetch the offloaded .pb file
	if err := s.dataStore.ReadProtobuf(ctx, storage.DataReference(uri), &inputsOrOutputs); err != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read literals: %w", err))
	}
	return inputsOrOutputs.GetLiterals(), nil
}
```
Because large literals are read from the object store on demand rather than carried in control-plane messages, this design keeps the Flyte control plane lightweight and leaves the heavy data transfer to the DataProxy and the underlying storage layer.