Configuring Cache Expiration and Workqueues

To tune Flyte's catalog performance and cache behavior, configure the catalogCache section of the Flyte configuration. This section controls how long cached task outputs remain valid, how cache keys are scoped, and how the asynchronous reader and writer workqueues handle high-throughput catalog traffic.

Configuring the Catalog Cache

The following YAML configuration demonstrates how to set cache expiration, enforce project-domain isolation, and scale the reader/writer workqueues for high-concurrency environments:

catalogCache:
  maxCacheAge: 24h
  cacheKey:
    enforceExecutionProjectDomain: true
  reader:
    workers: 20
    maxRetries: 5
    maxItems: 50000
  writer:
    workers: 20
    maxRetries: 5
    maxItems: 50000

These settings map directly to the Config struct defined in flyteplugins/go/tasks/pluginmachinery/catalog/config.go:

type Config struct {
	ReaderWorkqueueConfig workqueue.Config   `json:"reader" pflag:",Catalog reader workqueue config..."`
	WriterWorkqueueConfig workqueue.Config   `json:"writer" pflag:",Catalog writer workqueue config..."`
	CacheKey              CacheKeyConfig     `json:"cacheKey" pflag:",Cache key configuration."`
	MaxCacheAge           stdconfig.Duration `json:"maxCacheAge" pflag:",Cache entries past this age will incur cache miss. 0 means cache never expires."`
}
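To make the key-to-field mapping concrete, the sketch below decodes a JSON rendering of the same settings into mirror types. The names catalogConfig, queueConfig, cacheKeyConfig, duration, and parseCatalogConfig are hypothetical stand-ins written for this example; they are not Flyte's types, and Flyte's own configuration loader (which reads YAML and uses stdconfig.Duration) may behave differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// duration is a hypothetical stand-in for stdconfig.Duration: it decodes
// strings such as "24h" using time.ParseDuration.
type duration struct{ time.Duration }

func (d *duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	parsed, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	d.Duration = parsed
	return nil
}

// queueConfig mirrors the three workqueue keys used in the example config.
type queueConfig struct {
	Workers    int `json:"workers"`
	MaxRetries int `json:"maxRetries"`
	MaxItems   int `json:"maxItems"`
}

type cacheKeyConfig struct {
	EnforceExecutionProjectDomain bool `json:"enforceExecutionProjectDomain"`
}

// catalogConfig mirrors the JSON tags shown on the Config struct.
type catalogConfig struct {
	Reader      queueConfig    `json:"reader"`
	Writer      queueConfig    `json:"writer"`
	CacheKey    cacheKeyConfig `json:"cacheKey"`
	MaxCacheAge duration       `json:"maxCacheAge"`
}

// parseCatalogConfig decodes the JSON equivalent of the catalogCache section.
func parseCatalogConfig(raw string) (catalogConfig, error) {
	var cfg catalogConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	raw := `{
		"maxCacheAge": "24h",
		"cacheKey": {"enforceExecutionProjectDomain": true},
		"reader": {"workers": 20, "maxRetries": 5, "maxItems": 50000},
		"writer": {"workers": 20, "maxRetries": 5, "maxItems": 50000}
	}`
	cfg, err := parseCatalogConfig(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	// Prints: 24h0m0s 20 true
	fmt.Println(cfg.MaxCacheAge.Duration, cfg.Reader.Workers, cfg.CacheKey.EnforceExecutionProjectDomain)
}
```

A malformed duration such as "soon" surfaces as a decode error in this sketch, which is a convenient place to catch typos before a deployment.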

Tuning Workqueues for High Throughput

Flyte uses separate workqueues for reading from and writing to the catalog to prevent slow writes from blocking cache lookups. These are managed by the AsyncClientImpl in flyteplugins/go/tasks/pluginmachinery/catalog/async_client_impl.go.

Each workqueue is configured using the workqueue.Config struct:

  • workers: The number of concurrent goroutines processing catalog requests. Increase this if you see high latency in task transitions due to catalog bottlenecks.
  • maxRetries: The number of times a catalog operation (such as an S3/GCS download of metadata) is retried before it is marked as failed.
  • maxItems: The maximum number of entries kept in the workqueue's index cache.
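As a rough illustration of how workers and maxRetries interact, here is a minimal worker-pool sketch. It is not Flyte's workqueue implementation: runQueue and errCatalog are hypothetical names, and the sketch assumes that maxRetries counts retries after the first attempt, so each item is tried at most maxRetries + 1 times.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errCatalog is a hypothetical error used to simulate a failing catalog call.
var errCatalog = errors.New("catalog unavailable")

// runQueue processes items with a fixed number of worker goroutines. Each
// failed item is retried up to maxRetries times after its first attempt
// (an assumption of this sketch). It returns the items that still failed.
func runQueue(items []string, workers, maxRetries int, op func(string) error) []string {
	if workers < 1 {
		workers = 1 // at least one worker is needed to drain the channel
	}
	jobs := make(chan string)
	var (
		mu     sync.Mutex
		failed []string
		wg     sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range jobs {
				var err error
				for attempt := 0; attempt <= maxRetries; attempt++ {
					if err = op(item); err == nil {
						break
					}
				}
				if err != nil {
					mu.Lock()
					failed = append(failed, item)
					mu.Unlock()
				}
			}
		}()
	}
	for _, item := range items {
		jobs <- item
	}
	close(jobs)
	wg.Wait()
	return failed
}

func main() {
	op := func(item string) error {
		if item == "bad" {
			return errCatalog
		}
		return nil
	}
	// Prints: [bad]
	fmt.Println(runQueue([]string{"a", "bad", "b"}, 2, 5, op))
}
```

In this model, more workers shorten the time items wait in the queue, while a higher maxRetries only helps when failures are transient.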

In flyteplugins/go/tasks/pluginmachinery/catalog/async_client_impl.go, these queues are initialized as follows:

func NewAsyncClient(client Client, cfg Config, scope promutils.Scope) (AsyncClientImpl, error) {
	readerWorkQueue, err := workqueue.NewIndexedWorkQueue("reader", NewReaderProcessor(client), cfg.ReaderWorkqueueConfig,
		scope.NewSubScope("reader"))
	// ...
	writerWorkQueue, err := workqueue.NewIndexedWorkQueue("writer", NewWriterProcessor(client), cfg.WriterWorkqueueConfig,
		scope.NewSubScope("writer"))
	// ...
}

Managing Cache Expiration

The maxCacheAge parameter controls the TTL (Time To Live) of cache entries.

  • Expiration: If a cache entry is older than the specified duration, Flyte treats it as a cache miss and re-executes the task.
  • Infinite Cache: Setting maxCacheAge: 0 (the default) means cache entries never expire based on age.
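The two rules above can be sketched as a single predicate. isExpired is a hypothetical helper written for this example, not a Flyte function; it only encodes the behavior described here, where an entry older than maxCacheAge is a miss and a value of 0 disables age-based expiration.

```go
package main

import (
	"fmt"
	"time"
)

// isExpired reports whether a cache entry of the given age should be treated
// as a cache miss. A maxAge of 0 means entries never expire based on age.
func isExpired(age, maxAge time.Duration) bool {
	if maxAge == 0 {
		return false
	}
	return age > maxAge
}

func main() {
	maxAge := 24 * time.Hour
	fmt.Println(isExpired(23*time.Hour, maxAge)) // false: still within the TTL
	fmt.Println(isExpired(25*time.Hour, maxAge)) // true: older than the TTL
	fmt.Println(isExpired(25*time.Hour, 0))      // false: expiration disabled
}
```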

Enforcing Cache Isolation

By default, Flyte may allow cache hits across different projects if the task signature and inputs match. To restrict cache hits to the same project and domain where the execution is running, use the CacheKeyConfig in flyteplugins/go/tasks/pluginmachinery/catalog/config.go:

type CacheKeyConfig struct {
	EnforceExecutionProjectDomain bool `json:"enforceExecutionProjectDomain" pflag:", Use execution project domain when computing the cache key..."`
}

When enforceExecutionProjectDomain is set to true, Flyte includes the current execution's project and domain in the cache key hash, even if the task being executed was defined in a different project.
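The effect of the flag can be illustrated with a simplified key function. Everything in this sketch is hypothetical: keyInputs, cacheKey, the choice of fields, and the SHA-256 hash are assumptions for illustration, and Flyte's actual key derivation may differ. The sketch also assumes that, with the flag off, the key uses the project and domain where the task was registered.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// keyInputs is a hypothetical bundle of the values that feed a cache key.
type keyInputs struct {
	TaskProject, TaskDomain string // where the task was registered
	ExecProject, ExecDomain string // where the execution is running
	TaskName, TaskVersion   string
	InputsHash              string
}

// cacheKey hashes the task identity and inputs. When enforce is true, the
// execution's project and domain replace the task's in the hashed material.
func cacheKey(in keyInputs, enforce bool) string {
	project, domain := in.TaskProject, in.TaskDomain
	if enforce {
		project, domain = in.ExecProject, in.ExecDomain
	}
	material := strings.Join([]string{project, domain, in.TaskName, in.TaskVersion, in.InputsHash}, "/")
	sum := sha256.Sum256([]byte(material))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := keyInputs{
		TaskProject: "shared", TaskDomain: "production",
		ExecProject: "team-a", ExecDomain: "development",
		TaskName: "train", TaskVersion: "v1", InputsHash: "abc123",
	}
	b := a
	b.ExecProject = "team-b" // same task and inputs, different executing project

	fmt.Println(cacheKey(a, false) == cacheKey(b, false)) // true: entries are shared
	fmt.Println(cacheKey(a, true) == cacheKey(b, true))   // false: entries are isolated
}
```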

Troubleshooting and Best Practices

Sizing for Array Tasks

The maxItems setting in both reader and writer workqueues must be large enough to accommodate the largest array tasks allowed in your Flyte deployment. Because each sub-task in an array task can trigger a separate catalog lookup or upload, a small maxItems value will cause the workqueue index to overflow, leading to performance degradation or dropped requests.
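To see why an undersized index matters, consider the toy model below. It is a sketch under stated assumptions, not Flyte's workqueue: boundedIndex, newBoundedIndex, and untracked are hypothetical names, and evicting the oldest entry once the index is full is an assumption made for illustration.

```go
package main

import "fmt"

// boundedIndex is a hypothetical stand-in for a workqueue index capped at
// maxItems entries. Oldest-first eviction is an assumption of this sketch.
type boundedIndex struct {
	maxItems int
	order    []string          // insertion order, oldest first
	status   map[string]string // item ID -> last known status
}

func newBoundedIndex(maxItems int) *boundedIndex {
	if maxItems < 1 {
		maxItems = 1 // keep the sketch well-defined for tiny limits
	}
	return &boundedIndex{maxItems: maxItems, status: map[string]string{}}
}

// put records an item, evicting the oldest entry when the index is full.
func (b *boundedIndex) put(id, status string) {
	if _, ok := b.status[id]; !ok {
		if len(b.order) >= b.maxItems {
			oldest := b.order[0]
			b.order = b.order[1:]
			delete(b.status, oldest)
		}
		b.order = append(b.order, id)
	}
	b.status[id] = status
}

// untracked queues one entry per sub-task and counts how many entries can no
// longer be looked up afterwards.
func untracked(maxItems, arraySize int) int {
	idx := newBoundedIndex(maxItems)
	for i := 0; i < arraySize; i++ {
		idx.put(fmt.Sprintf("subtask-%d", i), "queued")
	}
	missing := 0
	for i := 0; i < arraySize; i++ {
		if _, ok := idx.status[fmt.Sprintf("subtask-%d", i)]; !ok {
			missing++
		}
	}
	return missing
}

func main() {
	fmt.Println(untracked(50000, 5000)) // 0: every sub-task stays tracked
	fmt.Println(untracked(1000, 5000))  // 4000: most sub-tasks fall out of the index
}
```

Under this model, setting maxItems to at least the largest permitted array size keeps every sub-task's entry available for lookup.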

Monitoring Catalog Performance

The AsyncClientImpl automatically creates sub-scopes for monitoring. You can track the performance of these configurations using the following Prometheus metrics (assuming the default executor:catalog scope):

  • executor:catalog:reader:queue_length
  • executor:catalog:writer:queue_length
  • executor:catalog:reader:error_count

Initialization in Custom Plugins

If you are implementing a custom executor or plugin that needs to interact with the catalog, ensure you initialize the client using the global config as seen in executor/setup.go:

catalogCfg := catalog.GetConfig()
asyncCatalogClient, err := catalog.NewAsyncClient(cacheClient, *catalogCfg, promutils.NewScope("executor:catalog"))
if err != nil {
	return fmt.Errorf("failed to create catalog cache client: %w", err)
}