Resource Management and Quotas

When Flyte tasks interact with external services like Qubole, Databricks, or Snowflake, they often encounter strict concurrency limits or rate quotas. If Flyte ignores these limits and launches hundreds of concurrent tasks, the external service may throttle requests, fail jobs, or even become unstable.

Flyte provides a resource management system that allows plugins to track and respect these external quotas using a token-based allocation mechanism.

Core Concepts

The resource management system is built around three primary abstractions:

  • Resource: An abstraction of anything with a limited quota (e.g., a specific Qubole cluster or a global API rate limit).
  • Token: A placeholder representing a single unit of a resource. Flyte manages resources by tracking the allocation and release of these tokens.
  • Namespace: A unique identifier for a resource pool. Namespaces can be hierarchical, allowing for granular control over different clusters or service endpoints.
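To make the token idea concrete, here is a minimal in-memory sketch of one namespace's pool. The tokenPool type and its methods are hypothetical names invented for this illustration. They are not Flyte's implementation, and the choice to treat a repeated request for an already-held token as a success is an assumption of the sketch.

```go
package main

import "sync"

// tokenPool is a hypothetical stand-in for the quota of a single namespace.
// It only illustrates token accounting; Flyte's real resource manager differs.
type tokenPool struct {
	mu     sync.Mutex
	quota  int
	tokens map[string]struct{} // tokens that currently hold a slot
}

func newTokenPool(quota int) *tokenPool {
	return &tokenPool{quota: quota, tokens: make(map[string]struct{})}
}

// allocate reports whether token holds a slot after the call.
// Assumption: asking again for a token that is already held succeeds
// without consuming a second slot.
func (p *tokenPool) allocate(token string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, held := p.tokens[token]; held {
		return true
	}
	if len(p.tokens) >= p.quota {
		return false // quota exhausted
	}
	p.tokens[token] = struct{}{}
	return true
}

// release frees the slot held by token. Releasing an unknown token is a no-op.
func (p *tokenPool) release(token string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.tokens, token)
}
```

With a quota of 2, a third distinct token is refused until one of the first two is released.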

Registering Quotas

Before a plugin can request resources, it must register the available quota for a specific namespace during the FlytePropeller setup phase. This is done using the ResourceRegistrar interface.

In your plugin's LoadPlugin function (which has the core.PluginLoader signature), use the SetupContext to register your quotas:

func (p *myPlugin) LoadPlugin(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) {
	// Define a namespace for your resource
	namespace := core.ResourceNamespace("my-external-service-cluster-1")

	// Register a quota of 100 concurrent tokens for this namespace
	err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, namespace, 100)
	if err != nil {
		return nil, err
	}

	return p, nil
}

If you are building a Web API plugin using the webapi framework, this registration is often handled automatically based on your PluginConfig:

// From flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go
if quotas := p.GetConfig().ResourceQuotas; len(quotas) > 0 {
	for ns, quota := range quotas {
		err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, ns, quota)
		if err != nil {
			return nil, err
		}
	}
}

Allocating Resources

During task execution, the plugin must request a token before performing the external operation. This is done via the ResourceManager available in the TaskExecutionContext.

The AllocateResource method requires:

  1. Namespace: The resource pool to allocate from.
  2. Allocation Token: A unique string for this specific execution (usually the task execution ID).
  3. Constraints: A ResourceConstraintsSpec to enforce multi-level limits.

// From flyteplugins/go/tasks/pluginmachinery/internal/webapi/allocation_token.go
token := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
status, err := tCtx.ResourceManager().AllocateResource(ctx, namespace, token, constraints)
if err != nil {
	// The allocation attempt itself failed; surface the error to the caller
	return core.PhaseInfoUndefined, err
}

switch status {
case core.AllocationStatusGranted:
	// Proceed with the external request
case core.AllocationStatusExhausted:
	// Quota is full; report that the task is waiting for resources
	return core.PhaseInfoWaitingForResourcesInfo(
		clock.Now(), 0, "Quota exceeded. Waiting for resource.", nil), nil
}

Resource Namespaces and Hierarchy

ResourceNamespace is a string type that supports sub-namespaces. This is useful for partitioning a global quota into smaller pools. Use CreateSubNamespace to build these identifiers:

// From flyteplugins/go/tasks/pluginmachinery/core/resource_manager.go
globalNS := core.ResourceNamespace("qubole")
clusterNS := globalNS.CreateSubNamespace("cluster-abc") // Result: "qubole:cluster-abc"
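The following standalone sketch shows how per-cluster identifiers could be derived from one parent namespace. The resourceNamespace type and clusterNamespaces helper are hypothetical local stand-ins that mirror the ":" joining shown in the example above. They are not the Flyte types themselves.

```go
package main

// resourceNamespace is a local stand-in for a string-based namespace type,
// defined here only so the sketch compiles on its own.
type resourceNamespace string

// createSubNamespace joins parent and child with ":", matching the
// "qubole:cluster-abc" result shown in the example above.
func (r resourceNamespace) createSubNamespace(child resourceNamespace) resourceNamespace {
	return r + ":" + child
}

// clusterNamespaces (hypothetical helper) derives one sub-namespace per cluster.
func clusterNamespaces(parent resourceNamespace, clusters []string) []resourceNamespace {
	out := make([]resourceNamespace, 0, len(clusters))
	for _, c := range clusters {
		out = append(out, parent.createSubNamespace(resourceNamespace(c)))
	}
	return out
}
```

Because each result is itself a namespace, calls can be chained to add further levels, and each derived namespace can be given its own quota.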

Multi-level Constraints

The ResourceConstraintsSpec allows you to enforce quotas at different levels simultaneously, such as limiting total concurrent jobs for a project while also respecting a global namespace limit.

constraints := core.ResourceConstraintsSpec{
	ProjectScopeResourceConstraint:   &core.ResourceConstraint{Value: 10},
	NamespaceScopeResourceConstraint: &core.ResourceConstraint{Value: 50},
}
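As a rough illustration of what enforcing two levels at once means, the sketch below grants a slot only when both the project-level and the namespace-level counters have headroom. The constraints and usage types are hypothetical simplifications, and the way Flyte's resource manager actually evaluates these constraints may differ.

```go
package main

// constraints is a hypothetical, simplified two-level limit.
type constraints struct {
	projectLimit   int
	namespaceLimit int
}

// usage tracks how many slots are held per project and per namespace.
type usage struct {
	perProject   map[string]int
	perNamespace map[string]int
}

func newUsage() *usage {
	return &usage{
		perProject:   make(map[string]int),
		perNamespace: make(map[string]int),
	}
}

// tryAcquire grants a slot only if both levels are below their limits,
// and then counts the slot against both levels.
func (u *usage) tryAcquire(project, namespace string, c constraints) bool {
	if u.perProject[project] >= c.projectLimit {
		return false // the project has used its share
	}
	if u.perNamespace[namespace] >= c.namespaceLimit {
		return false // the namespace as a whole is full
	}
	u.perProject[project]++
	u.perNamespace[namespace]++
	return true
}
```

With a project limit of 1 and a namespace limit of 2, a second request from the same project is refused even though the namespace still has room, and a third project is refused once the namespace is full.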

Releasing Resources

To prevent "quota leaks," plugins must release tokens once the external operation is complete, regardless of whether the task succeeded, failed, or was aborted. This should typically be handled in the Finalize or Abort methods of your plugin.

func (p *myPlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error {
	// Use the same namespace and token that were passed to AllocateResource
	token := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
	return tCtx.ResourceManager().ReleaseResource(ctx, namespace, token)
}

If ReleaseResource is never called, the token stays allocated after the task ends. Leaked tokens accumulate until the pool is full, at which point every subsequent task receives AllocationStatusExhausted even if the external service is idle.
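The effect of a leak can be shown with a small counting simulation. The refusedTasks function is a hypothetical helper written for this illustration: it runs short jobs one after another against a fixed quota and counts how many are refused a token.

```go
package main

// refusedTasks runs `tasks` short jobs sequentially against a pool of `quota`
// tokens and returns how many jobs were refused a token.
// When releaseOnFinish is false, every finished job leaks its token.
func refusedTasks(quota, tasks int, releaseOnFinish bool) int {
	held, refused := 0, 0
	for i := 0; i < tasks; i++ {
		if held >= quota {
			refused++ // pool is full, so this job cannot start
			continue
		}
		held++ // token granted; the job runs to completion
		if releaseOnFinish {
			held-- // token returned to the pool
		}
	}
	return refused
}
```

With a quota of 3 and 10 sequential jobs, releasing on completion refuses none of them, while leaking tokens refuses the last 7, although no job is running at that point.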

Allocation Statuses

The AllocateResource method returns an AllocationStatus enum (defined in flyteplugins/go/tasks/pluginmachinery/core/resource_manager.go):

Status                                  Description
AllocationStatusGranted                 The token was successfully allocated. The plugin may proceed.
AllocationStatusExhausted               No resources are available in the pool. The plugin should wait and retry.
AllocationStatusNamespaceQuotaExceeded  The global pool has space, but the specific namespace quota is exceeded (currently reserved for future use).
AllocationUndefined                     Returned when an error occurs during the allocation process.