Internal Work Queues
When a Flyte plugin needs to download data from a remote catalog or perform background I/O, blocking the main execution loop to wait for results would degrade performance across the entire cluster. The IndexedWorkQueue provides a specialized mechanism to offload these tasks to background workers while maintaining an in-memory index of their results.
Architecture
The IndexedWorkQueue is implemented in flyteplugins/go/tasks/pluginmachinery/workqueue/queue.go as a wrapper around two primary components:
- Kubernetes Workqueue: A k8s.io/client-go/util/workqueue that manages the distribution of tasks to concurrent workers and handles retries.
- LRU Cache: A github.com/hashicorp/golang-lru cache that serves as an index for WorkItemInfo. This index allows the system to track the status of a work item (Succeeded, Failed, or NotDone) even after the worker has finished processing it.
This combination ensures that if the same work item is queued multiple times, the system can immediately return the cached result or recognize that the item is already being processed, preventing redundant work.
Defining a Processor
The business logic for a work queue is defined by implementing the Processor interface. The Process method receives a WorkItem and returns a WorkStatus along with an error.
type Processor interface {
	Process(ctx context.Context, workItem WorkItem) (WorkStatus, error)
}
A typical implementation, such as the one used for Catalog downloads, performs I/O and returns WorkStatusSucceeded upon completion. If the processor returns an error, the queue will automatically retry the item based on the MaxRetries configuration.
Lifecycle and State Management
The lifecycle of a work item is governed by the WorkStatus enum:
- WorkStatusNotDone: The item is not finished. If returned by a processor, the item is re-added to the work queue for another processing cycle.
- WorkStatusSucceeded: A terminal state indicating successful completion.
- WorkStatusFailed: A terminal state indicating the item failed and will not be retried further.
Starting the Queue
Before any items can be added, the queue must be initialized and started. The Start method spawns the configured number of worker goroutines.
// From flyteplugins/go/tasks/pluginmachinery/workqueue/queue_test.go
q, _ := NewIndexedWorkQueue("test-queue", processor, config, scope)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start must be called or Queue() will return ErrNotYetStarted
q.Start(ctx)
Idempotent Queuing
The Queue method is idempotent for as long as the item stays in the index. If an item with the same WorkItemID is already in the LRU index, the call does not enqueue it again.
err := q.Queue(ctx, "item-123", myPayload)
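The check-then-add pattern behind this behavior can be sketched with plain data structures. In the simplified model below, a map stands in for the LRU index and a slice for the underlying work queue; onceQueue and its fields are hypothetical names, and the real method returns an error rather than a bool.

```go
package main

import (
	"fmt"
	"sync"
)

// onceQueue is a hypothetical, simplified model of idempotent queuing.
type onceQueue struct {
	mu      sync.Mutex
	index   map[string]struct{} // stands in for the LRU index
	pending []string            // stands in for the work queue
}

func newOnceQueue() *onceQueue {
	return &onceQueue{index: make(map[string]struct{})}
}

// Queue enqueues id unless the index already knows about it.
// It reports whether the item was actually added.
func (q *onceQueue) Queue(id string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, seen := q.index[id]; seen {
		return false
	}
	q.index[id] = struct{}{}
	q.pending = append(q.pending, id)
	return true
}

func main() {
	q := newOnceQueue()
	fmt.Println(q.Queue("item-123")) // true: first request is enqueued
	fmt.Println(q.Queue("item-123")) // false: duplicate is skipped
	fmt.Println(len(q.pending))      // 1
}
```

Holding one lock across both the lookup and the insert is what keeps two concurrent callers from enqueuing the same ID twice in this sketch.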
Retrieving Results
Because processing is asynchronous, callers use Get to poll for the status of a work item.
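info, found, err := q.Get("item-123")
if err != nil {
	// Handle the lookup error
}
if found {
	switch info.Status() {
	case workqueue.WorkStatusSucceeded:
		// Handle success
	case workqueue.WorkStatusFailed:
		// Handle terminal failure
	case workqueue.WorkStatusNotDone:
		// Still processing
	}
}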
Context Preservation
One critical feature of the IndexedWorkQueue is how it handles telemetry and logging. When Queue is called, the implementation uses copyAllowedLogFields to capture log fields from the current context.Context.
When a background worker eventually picks up the item, it restores these fields into the worker's context using contextWithValues. This ensures that logs generated inside the Processor.Process method contain the same trace IDs and metadata as the original request that queued the work.
Configuration
The Config struct in flyteplugins/go/tasks/pluginmachinery/workqueue/config.go controls the operational behavior of the queue:
| Field | Description |
|---|---|
| Workers | Number of concurrent goroutines processing items. |
| MaxRetries | How many times an item is retried if Process returns an error. |
| IndexCacheMaxItems | The size of the LRU cache. |
In the Flyte Catalog implementation (flyteplugins/go/tasks/pluginmachinery/catalog/config.go), these values default to:
- Workers: 10
- MaxRetries: 3
- IndexCacheMaxItems: 10,000
Warning: If IndexCacheMaxItems is too small, older results will be evicted from the LRU cache. If a caller calls Get() for an evicted item, it will return found=false, even if the item was previously processed successfully.
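The eviction effect is easy to reproduce with a toy cache. The sketch below implements a minimal LRU on top of container/list purely for illustration; tinyLRU is a hypothetical type and is not the library cache the queue uses. A capacity of 2 stands in for a very small IndexCacheMaxItems.

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	key   string
	value string
}

// tinyLRU is a minimal LRU cache used only for illustration.
// It assumes max >= 1.
type tinyLRU struct {
	max   int
	order *list.List // front = most recently used
	items map[string]*list.Element
}

func newTinyLRU(max int) *tinyLRU {
	return &tinyLRU{max: max, order: list.New(), items: make(map[string]*list.Element)}
}

// Add inserts or updates a key and evicts the least recently used
// entry when the cache grows past its capacity.
func (c *tinyLRU) Add(key, value string) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).value = value
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key: key, value: value})
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

// Get returns the value for key and marks it as recently used.
func (c *tinyLRU) Get(key string) (string, bool) {
	el, ok := c.items[key]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).value, true
}

func main() {
	index := newTinyLRU(2)
	index.Add("item-1", "Succeeded")
	index.Add("item-2", "Succeeded")
	index.Add("item-3", "NotDone") // evicts item-1

	_, found := index.Get("item-1")
	fmt.Println(found) // false: the successful result is gone
	_, found = index.Get("item-3")
	fmt.Println(found) // true
}
```

A caller that sees found=false cannot tell "never queued" from "evicted", so sizing the index above the number of results that may be polled at once is the safer choice.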
Usage in Catalog Operations
The primary user of this machinery is the Catalog AsyncClientImpl. It uses two separate queues: one for reading (downloads) and one for writing (uploads).
When Download is called, it queues the requests and immediately checks their status. If the items are already cached from a previous call, the results are returned immediately. If not, the caller receives a "Not Ready" status, and the work continues in the background.
// Simplified pattern from flyteplugins/go/tasks/pluginmachinery/catalog/async_client_impl.go
func (c AsyncClientImpl) Download(ctx context.Context, requests ...DownloadRequest) (DownloadFuture, error) {
	for _, request := range requests {
		id := formatWorkItemID(request.Key)
		c.Reader.Queue(ctx, id, NewReaderWorkItem(request.Key, request.Target))
		info, found, _ := c.Reader.Get(id)
		if found && info.Status() == workqueue.WorkStatusSucceeded {
			// Result is ready
		}
	}
	// ... return future
}