Getting Started with the Catalog Client

The Catalog Client in Flyte is the primary interface to the memoization system. It lets tasks cache their results and retrieve them in later executions, so work that has already been done for the same inputs is not recomputed.

This tutorial walks you through initializing both synchronous and asynchronous Catalog clients and performing basic operations to manage task result caching.

Prerequisites

To follow this tutorial, you need:

  • A configured storage.DataStore to handle artifact data.
  • The URL of the Flyte Cache Service.
  • A promutils.Scope for metrics.

Step 1: Initialize the Synchronous Catalog Client

The synchronous client, implemented by cache_service.Client, communicates directly with the Flyte Cache Service using ConnectRPC. It is responsible for generating cache keys and managing artifact metadata.

import (
	"time"

	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog"
	"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog/cache_service"
	"github.com/flyteorg/flyte/v2/flytestdlib/storage"
)

// Assume dataStore and cacheServiceURL are already initialized
catalogCfg := catalog.GetConfig()
cacheClient := cache_service.NewHTTPClient(
	dataStore,
	cacheServiceURL,
	catalogCfg.MaxCacheAge.Duration,
)

The NewHTTPClient function creates a concrete cache_service.Client that implements the catalog.Client interface. The maxCacheAge parameter ensures that the client treats entries older than the specified duration as cache misses, even if they exist in the remote service.

Step 2: Initialize and Start the Asynchronous Client

In most plugin implementations, you should use the AsyncClient. It wraps the synchronous client and uses internal workqueues to perform operations without blocking the main execution loop.

import (
	"context"

	"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
)

// Initialize the AsyncClient using the sync client created in Step 1
asyncCatalogClient, err := catalog.NewAsyncClient(
	cacheClient,
	*catalogCfg,
	promutils.NewScope("executor:catalog"),
)
if err != nil {
	return err
}

// CRITICAL: You must start the client to begin processing the workqueues
ctx := context.Background()
if err := asyncCatalogClient.Start(ctx); err != nil {
	return err
}

The NewAsyncClient function returns an AsyncClientImpl that holds two IndexedWorkQueue instances: one for reads (downloads) and one for writes (uploads). Calling Start(ctx) is mandatory; until you do, requests are queued but never executed.

Step 3: Perform a Synchronous Cache Put

To cache a result synchronously, you use the Put method. Note that the data you are caching must already exist in the DataStore before you call this method.

import (
	"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)

key := catalog.Key{
	Identifier: &core.Identifier{
		Project: "flytesnacks",
		Domain:  "development",
		Name:    "my_task",
		Version: "v1",
	},
	CacheVersion: "1.0",
	// InputReader is used to hash input values for the cache key
	InputReader: inputReader,
}

metadata := catalog.Metadata{
	TaskExecutionIdentifier: taskExecID,
}

// status indicates if the put was successful or if it was a duplicate
status, err := cacheClient.Put(ctx, key, outputReader, metadata)
if err != nil {
	return err
}

if status.GetCacheStatus() == core.CatalogCacheStatus_CACHE_HIT {
	// Entry already existed
}

The catalog.Key is the unique identifier for the cached entry. It is computed based on the task identifier, the CacheVersion, and the actual input values provided via the InputReader.
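As a rough illustration of how such a key can be derived, the sketch below hashes a task identifier, a cache version, and the input values into one digest. It is not Flyte's actual hashing scheme: the cacheKey function, the flat string identifier, the map of string inputs, and the choice of SHA-256 are all assumptions made for this example.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// cacheKey derives a deterministic key from a task identifier, a cache
// version, and the task inputs. Hypothetical scheme for illustration only.
func cacheKey(taskID, cacheVersion string, inputs map[string]string) string {
	// Sort input names so that map iteration order cannot change the key.
	names := make([]string, 0, len(inputs))
	for name := range inputs {
		names = append(names, name)
	}
	sort.Strings(names)

	// %q quotes each part, so delimiters inside values cannot collide.
	var b strings.Builder
	fmt.Fprintf(&b, "%q|%q", taskID, cacheVersion)
	for _, name := range names {
		fmt.Fprintf(&b, "|%q=%q", name, inputs[name])
	}

	sum := sha256.Sum256([]byte(b.String()))
	return hex.EncodeToString(sum[:])
}

func main() {
	base := cacheKey("flytesnacks/development/my_task", "1.0", map[string]string{"x": "1"})
	same := cacheKey("flytesnacks/development/my_task", "1.0", map[string]string{"x": "1"})
	bumped := cacheKey("flytesnacks/development/my_task", "1.1", map[string]string{"x": "1"})

	fmt.Println("same inputs, same key:", base == same)
	fmt.Println("new cache version, same key:", base == bumped)
}
```

The practical consequence is the same as in the real client: changing the CacheVersion or any input value yields a different key, which is how you invalidate stale entries after changing a task's logic.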

Step 4: Perform an Asynchronous Download

Asynchronous operations return a Future object. You can check the status of the operation later or register a callback.

// Define the download request
requests := []catalog.DownloadRequest{
	{
		Key:    key,
		Target: outputWriter, // Where the cached data will be written
	},
}

// Start the async download
future, err := asyncCatalogClient.Download(ctx, requests...)
if err != nil {
	return err
}

// Later in the execution loop, check if the result is ready
if future.GetResponseStatus() == catalog.ResponseStatusReady {
	response, err := future.GetResponse()
	if err != nil {
		return err
	}

	if response.GetCachedCount() > 0 {
		// Cache hit! The data has been written to the Target (outputWriter)
	} else {
		// Cache miss
	}
}

The Download method queues the request and returns a DownloadFuture. When the status becomes ResponseStatusReady, the DownloadResponse provides details about the cached results. If a cache hit occurs, the AsyncClient automatically handles downloading the artifact data from storage and writing it to the provided Target.
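The non-blocking polling pattern can be tried in isolation with the sketch below. The future type, its methods, and checkOnce are hypothetical and only mimic the shape of the calls used in this step; they are not the catalog package's types.

```go
package main

import (
	"fmt"
	"sync"
)

type status int

const (
	notReady status = iota
	ready
)

// future is a hypothetical stand-in for a download future. A background
// worker calls complete; the execution loop polls Status.
type future struct {
	mu          sync.Mutex
	st          status
	cachedCount int
	err         error
}

// Status returns the current state without blocking.
func (f *future) Status() status {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.st
}

// Response returns the number of cached items and any error recorded.
func (f *future) Response() (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.cachedCount, f.err
}

// complete records the outcome and marks the future as ready.
func (f *future) complete(cachedCount int, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.cachedCount, f.err, f.st = cachedCount, err, ready
}

// checkOnce is what one pass of an execution loop might do: it never
// blocks, and it reports "pending" until the future is ready.
func checkOnce(f *future) (string, error) {
	if f.Status() != ready {
		return "pending", nil
	}
	n, err := f.Response()
	if err != nil {
		return "", err
	}
	if n > 0 {
		return "hit", nil
	}
	return "miss", nil
}

func main() {
	f := &future{}
	state, _ := checkOnce(f)
	fmt.Println("first pass:", state)

	f.complete(1, nil) // simulate the worker finishing with one cached item
	state, _ = checkOnce(f)
	fmt.Println("second pass:", state)
}
```

Polling this way keeps each pass of the loop cheap: a pass that sees "pending" returns immediately and tries again on the next round instead of waiting on the download.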

Complete Example Summary

By combining these steps, you can integrate Flyte's catalog into your custom components:

  1. Setup: Initialize cache_service.Client and catalog.AsyncClientImpl.
  2. Lifecycle: Call Start() on the async client during your application's startup phase.
  3. Execution: Use Download at the beginning of a task execution to check for memoized results.
  4. Completion: Use Upload (the async version of Put) at the end of a task execution to store new results.
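The execution and completion steps amount to a check-then-compute-then-store flow. The sketch below shows that flow against an in-memory map; memoCache and runMemoized are hypothetical names, and the map merely stands in for the cache service and data store.

```go
package main

import (
	"fmt"
	"sync"
)

// memoCache is an in-memory stand-in for the remote cache.
type memoCache struct {
	mu      sync.Mutex
	entries map[string]string
}

func newMemoCache() *memoCache {
	return &memoCache{entries: make(map[string]string)}
}

func (c *memoCache) get(key string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.entries[key]
	return v, ok
}

func (c *memoCache) put(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = value
}

// runMemoized looks the key up first (the "download" step), runs compute
// only on a miss, and stores the result afterwards (the "upload" step).
// The boolean result reports whether the value came from the cache.
func runMemoized(c *memoCache, key string, compute func() string) (string, bool) {
	if v, ok := c.get(key); ok {
		return v, true
	}
	v := compute()
	c.put(key, v)
	return v, false
}

func main() {
	cache := newMemoCache()
	runs := 0
	task := func() string {
		runs++
		return "result"
	}

	_, hit := runMemoized(cache, "my_task:1.0", task)
	fmt.Println("first run served from cache:", hit)
	_, hit = runMemoized(cache, "my_task:1.0", task)
	fmt.Println("second run served from cache:", hit)
	fmt.Println("task executions:", runs)
}
```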

For more advanced usage, such as managing task reservations to prevent redundant concurrent executions, refer to the GetOrExtendReservation and ReleaseReservation methods in the catalog.Client interface.