Data Proxy Service Architecture
The Data Proxy service in Flyte provides a centralized API for managing data uploads and downloads across the cluster. It abstracts the underlying storage backend (such as S3, GCS, or Azure Blob Storage) by providing signed URLs and managing the persistence of large input sets, action results, and logs.
Core Service Components
The Data Proxy is implemented through two primary services defined in dataproxy/service/:
- Service: The main gRPC/Connect handler that implements the DataProxyService API. It coordinates with internal Flyte services (Task, Run, Project) and the storage.DataStore to manage data lifecycle.
- ClusterService: A discovery service that implements the SelectCluster API, allowing clients to identify the correct cluster endpoint for data operations.
Managing Data Uploads
When a client needs to upload data to Flyte storage, it requests a signed URL via the CreateUploadLocation method. This ensures that clients do not need direct credentials for the underlying storage bucket.
Requesting an Upload Location
You can request an upload location by specifying the project, domain, and file details. Providing a ContentMd5 hash is recommended to ensure data integrity and prevent accidental overwrites.
req := &dataproxy.CreateUploadLocationRequest{
    Project:      "my-project",
    Domain:       "development",
    Filename:     "data.parquet",
    FilenameRoot: "user-uploads",
    ContentMd5:   md5Hash, // []byte
    ExpiresIn:    durationpb.New(30 * time.Minute),
}
resp, err := service.CreateUploadLocation(ctx, connect.NewRequest(req))
Internal Validation and Path Construction
Internally, the Service performs several checks before generating a signed URL:
- Project Validation: It calls the ProjectServiceClient to verify that the specified project exists.
- Limit Enforcement: It validates that the requested ExpiresIn duration does not exceed the configured MaxExpiresIn (default 1 hour).
- Path Construction: The constructStoragePath method builds a structured path in the format: storage_prefix/project/domain/filename_root/filename. If filename_root is not provided, it uses a URL-safe base32 encoding of the ContentMd5 hash as the prefix.
- Overwrite Protection: The checkFileExists method performs a best-effort check. If a file already exists at the target location, the service will only allow the upload if the provided ContentMd5 matches the existing file's hash. If no hash is provided and the file exists, the request is rejected with CodeAlreadyExists.
Offloading Large Inputs
Flyte uses the UploadInputs method to handle large input sets that exceed gRPC message size limits. This method persists inputs to storage and returns a reference that can be used when creating a run.
Deterministic Hashing for Caching
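The service stores offloaded inputs at a path derived from a deterministic hash of their contents, so identical input sets always resolve to the same location. Variables that the task marks as cache-ignored are excluded from the hash, which keeps the result consistent with Flyte's caching mechanisms.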
// Inside dataproxy/service/dataproxy_service.go
func (s *Service) UploadInputs(ctx context.Context, req *connect.Request[dataproxy.UploadInputsRequest]) (*connect.Response[dataproxy.UploadInputsResponse], error) {
    // ...
    // Resolve task template to identify cache-ignored variables
    taskTemplate, _ := s.resolveTaskTemplate(ctx, req.Msg)
    // Filter out variables that should not affect the cache key
    filteredInputs := filterInputs(req.Msg.GetInputs(), taskTemplate.GetMetadata().GetCacheIgnoreInputVars())
    // Deterministically hash the filtered inputs
    inputsHash, _ := hashInputsProto(filteredInputs)
    // Store the full (unfiltered) inputs at a path derived from the hash
    // Path: storagePrefix/org/project/domain/offloaded-inputs/<hash>/inputs.pb
    // ...
}
The hashInputsProto function uses proto.MarshalOptions{Deterministic: true} and an FNV-64a hash to generate a consistent identifier for the input set.
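The filter-then-hash idea can be illustrated without Protobuf. The sketch below is a standard-library stand-in, not hashInputsProto itself: it replaces deterministic proto marshalling with a sorted-key, length-prefixed encoding of a plain map[string]string, then applies FNV-64a. The names filterVars and hashVars and the map-based input type are assumptions made for the example.

```go
package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// filterVars returns a copy of inputs without the cache-ignored variables.
// Hypothetical stand-in for the service's input filtering.
func filterVars(inputs map[string]string, ignored []string) map[string]string {
    skip := make(map[string]struct{}, len(ignored))
    for _, name := range ignored {
        skip[name] = struct{}{}
    }
    out := make(map[string]string, len(inputs))
    for k, v := range inputs {
        if _, drop := skip[k]; !drop {
            out[k] = v
        }
    }
    return out
}

// hashVars hashes the inputs with FNV-64a over a canonical encoding.
// Sorting the keys plays the role that deterministic marshalling plays
// for Protobuf messages: map iteration order must not affect the result.
func hashVars(inputs map[string]string) string {
    keys := make([]string, 0, len(inputs))
    for k := range inputs {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    h := fnv.New64a()
    for _, k := range keys {
        // Length prefixes keep ("ab", "c") distinct from ("a", "bc").
        fmt.Fprintf(h, "%d:%s=%d:%s;", len(k), k, len(inputs[k]), inputs[k])
    }
    return fmt.Sprintf("%016x", h.Sum64())
}

func main() {
    ignored := []string{"debug"}
    a := hashVars(filterVars(map[string]string{"x": "1", "debug": "true"}, ignored))
    b := hashVars(filterVars(map[string]string{"x": "1", "debug": "false"}, ignored))
    fmt.Println(a == b) // the ignored variable does not change the hash
}
```

Because the ignored variable never reaches the hash, two runs that differ only in that variable share one storage path and one cache key.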
Retrieving Action Data and Logs
The Data Proxy also serves as the gateway for retrieving results and logs from completed or running actions.
- GetActionData: This method retrieves the input and output literals for a specific action. It first calls the RunServiceClient to get the URIs for the data and then reads the Protobuf messages directly from the DataStore.
- CreateDownloadLink: Generates signed URLs for downloading artifacts. Currently, this is primarily used for ARTIFACT_TYPE_REPORT. It resolves the artifact's native URL by querying the action's attempt details via the RunServiceClient.
- TailLogs: Provides a streaming API for action logs. It retrieves the log context (e.g., Kubernetes pod information or external log provider metadata) from the RunServiceClient and delegates the streaming to a logs.LogStreamer.
Cluster Discovery
The ClusterService provides a simple mechanism for clients to discover the cluster endpoint. Its SelectCluster implementation currently echoes back the Host header from the request, acting as a confirmation of the endpoint the client is already communicating with.
func (s *ClusterService) SelectCluster(ctx context.Context, req *connect.Request[dataproxy.SelectClusterRequest]) (*connect.Response[dataproxy.SelectClusterResponse], error) {
    // Extracts the host from the incoming request headers
    host := req.Header().Get("Host")
    return connect.NewResponse(&dataproxy.SelectClusterResponse{
        ClusterEndpoint: host,
    }), nil
}
Configuration
The service behavior is controlled via the DataProxyConfig (found in dataproxy/config/config.go). You can configure these settings under the dataproxy section:
| Parameter | Default | Description |
|---|---|---|
| upload.maxSize | 100Mi | Maximum allowed size for data uploads. |
| upload.maxExpiresIn | 1h | Maximum duration for upload signed URLs. |
| upload.storagePrefix | uploads | The root directory in the storage bucket for proxy uploads. |
| download.maxExpiresIn | 1h | Maximum duration for download signed URLs. |
These configurations are enforced during request validation in validateUploadRequest and validateDownloadRequest.