Instrumenting Services with Prometheus
Instrumenting Flyte services with Prometheus involves creating a hierarchical metric scope, defining specific metric types (counters, gauges, and stopwatches), and optionally using context-aware labeled metrics to automatically track metadata like project and domain.
In this tutorial, you will build a service instrumentation layer that records execution counts, active worker counts, and operation durations, then exposes them for scraping.
Prerequisites
To follow this tutorial, you need the following packages imported:
github.com/flyteorg/flyte/v2/flytestdlib/promutils
github.com/flyteorg/flyte/v2/flytestdlib/promutils/labeled
github.com/flyteorg/flyte/v2/flytestdlib/contextutils
github.com/flyteorg/flyte/v2/flytestdlib/profutils
Step 1: Initialize the Metric Scope
The promutils.Scope is the foundation for all metrics in Flyte. It provides a nestable prefix for your metrics, ensuring they are organized and avoid name collisions.
import "github.com/flyteorg/flyte/v2/flytestdlib/promutils"

func main() {
	// Create a top-level scope for your service
	rootScope := promutils.NewScope("my_service")

	// Create a sub-scope for a specific component
	workerScope := rootScope.NewSubScope("worker")
	// workerScope will prefix all metrics with "my_service:worker:"

	_ = workerScope // avoids Go's "declared and not used" error in this snippet
}
promutils.NewScope automatically appends a delimiter (defaulting to :) and sanitizes the name. Characters like - are replaced with _.
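To make the prefixing concrete, here is a minimal standard-library sketch of how a nested scope could assemble metric names. The `scope` type and the `sanitize`, `newScope`, `sub`, and `metricName` helpers are hypothetical stand-ins written for illustration. They are not the promutils implementation, and the exact set of characters promutils sanitizes may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitize is a hypothetical cleanup rule: it maps '-' to '_', keeps
// ASCII letters, digits, '_' and ':', and drops everything else.
func sanitize(name string) string {
	var b strings.Builder
	for _, r := range name {
		switch {
		case r == '-':
			b.WriteRune('_')
		case r == '_' || r == ':' ||
			(r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9'):
			b.WriteRune(r)
		}
	}
	return b.String()
}

// scope is an illustrative type, not promutils.Scope.
type scope struct{ prefix string }

// newScope sanitizes the name and appends the ':' delimiter.
func newScope(name string) scope { return scope{prefix: sanitize(name) + ":"} }

// sub nests a new scope under the current prefix.
func (s scope) sub(name string) scope { return newScope(s.prefix + name) }

// metricName returns the fully prefixed metric name.
func (s scope) metricName(name string) string { return s.prefix + sanitize(name) }

func main() {
	worker := newScope("my-service").sub("worker")
	fmt.Println(worker.metricName("executions_total"))
	// prints "my_service:worker:executions_total"
}
```

Because every sub-scope carries its parent's prefix, two components can both define a metric called `executions_total` without colliding.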
Step 2: Define and Record Basic Metrics
Use the scope to create standard Prometheus metrics. The MustNew* methods are convenient for initialization as they panic if registration fails (e.g., due to a duplicate name).
import (
	"time"

	"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
	"github.com/prometheus/client_golang/prometheus"
)

type WorkerMetrics struct {
	// The scope's counter and gauge constructors return the standard
	// Prometheus client types; StopWatch is defined by promutils.
	ExecutionCount prometheus.Counter
	ActiveWorkers  prometheus.Gauge
	Latency        promutils.StopWatch
}

func NewWorkerMetrics(scope promutils.Scope) WorkerMetrics {
	return WorkerMetrics{
		ExecutionCount: scope.MustNewCounter("executions_total", "Total number of executions"),
		ActiveWorkers:  scope.MustNewGauge("active_workers", "Current number of active workers"),
		// StopWatch records durations. time.Millisecond specifies the scale.
		Latency: scope.MustNewStopWatch("latency", "Execution latency", time.Millisecond),
	}
}

func (m WorkerMetrics) DoWork() {
	m.ActiveWorkers.Inc()
	defer m.ActiveWorkers.Dec()

	// Start a timer and defer its Stop to record duration
	timer := m.Latency.Start()
	defer timer.Stop()

	m.ExecutionCount.Inc()
	// Perform work...
}
The StopWatch is a Flyte-specific wrapper around Prometheus Summaries or Histograms that simplifies timing operations. When you call MustNewStopWatch, Flyte automatically appends the scale suffix (e.g., _ms) to the metric name.
Step 3: Use Context-Aware Labeled Metrics
Flyte provides a labeled package that automatically extracts label values from a context.Context. This is useful for tracking metrics by project, domain, or workflow without manually passing those values to every metric call.
First, you must initialize the global metric keys at application startup:
import (
	"github.com/flyteorg/flyte/v2/flytestdlib/contextutils"
	"github.com/flyteorg/flyte/v2/flytestdlib/promutils/labeled"
)

func init() {
	// Define which keys from the context should be used as Prometheus labels
	labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey)
}
Then, use labeled.Counter, labeled.Gauge, or labeled.StopWatch in your structs:
type LabeledMetrics struct {
	Errors labeled.Counter
}

func NewLabeledMetrics(scope promutils.Scope) LabeledMetrics {
	return LabeledMetrics{
		Errors: labeled.NewCounter("errors", "Count of errors by project/domain", scope),
	}
}

func (m LabeledMetrics) HandleError(ctx context.Context) {
	// Automatically pulls project, domain, and workflow ID from the context
	m.Errors.Inc(ctx)
}
If labeled.SetMetricKeys is not called before labeled.NewCounter, the application will panic.
Step 4: Expose Metrics via HTTP
To allow Prometheus to scrape your metrics, you need to start an HTTP server. Flyte's profutils package provides a utility to start a server with the default Prometheus handler at /metrics.
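import (
	"context"

	"github.com/flyteorg/flyte/v2/flytestdlib/profutils"
)

func main() {
	ctx := context.Background()

	// Start the profiling and metrics server on port 9090.
	// This exposes /metrics along with health-check and pprof endpoints.
	// Run it in its own goroutine so that serving cannot block the rest of main.
	go func() {
		if err := profutils.StartProfilingServerWithDefaultHandlers(ctx, 9090, nil); err != nil {
			panic(err)
		}
	}()

	// Your service logic here...
	select {}
}
StartProfilingServerWithDefaultHandlers registers health-check and runtime profiling handlers alongside /metrics by default. The call runs in its own goroutine here, as it does in the complete example below: a call that serves HTTP for the life of the process would otherwise keep main from ever reaching the service logic.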
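For readers who want to see the serving pattern in isolation, the following sketch uses only net/http. It binds a listener first, serves on a background goroutine, and then fetches the endpoint once. The helpers `newMux`, `serveInBackground`, and `fetch`, and the `example_metric` payload, are hypothetical; this is not how profutils is implemented.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// newMux registers a placeholder /metrics handler (payload is made up).
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(w, "example_metric 1")
	})
	return mux
}

// serveInBackground binds addr, then serves on a goroutine. It returns the
// bound address and a channel that receives the error when serving stops.
func serveInBackground(addr string, h http.Handler) (string, <-chan error, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return "", nil, err
	}
	errCh := make(chan error, 1)
	go func() { errCh <- http.Serve(ln, h) }()
	return ln.Addr().String(), errCh, nil
}

// fetch performs a GET request and returns the response body as a string.
func fetch(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	// Port 0 asks the OS for any free port, which avoids clashes in a demo.
	addr, _, err := serveInBackground("127.0.0.1:0", newMux())
	if err != nil {
		panic(err)
	}
	body, err := fetch("http://" + addr + "/metrics")
	if err != nil {
		panic(err)
	}
	fmt.Print(body)
}
```

Binding the listener before starting the goroutine surfaces port conflicts immediately as a returned error, rather than inside a goroutine where they are easy to miss.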
Complete Example
Combining these steps, here is a complete instrumented worker:
package main

import (
	"context"
	"time"

	"github.com/flyteorg/flyte/v2/flytestdlib/contextutils"
	"github.com/flyteorg/flyte/v2/flytestdlib/profutils"
	"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
	"github.com/flyteorg/flyte/v2/flytestdlib/promutils/labeled"
)

type MyWorker struct {
	scope   promutils.Scope
	metrics WorkerMetrics
}

type WorkerMetrics struct {
	SuccessCount labeled.Counter
	Latency      promutils.StopWatch
}

func init() {
	// Metric keys must be set before any labeled metric is created
	labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey)
}

func NewMyWorker(scope promutils.Scope) *MyWorker {
	return &MyWorker{
		scope: scope,
		metrics: WorkerMetrics{
			SuccessCount: labeled.NewCounter("success", "Total successes", scope),
			Latency:      scope.MustNewStopWatch("latency", "Work latency", time.Millisecond),
		},
	}
}

func (w *MyWorker) Run(ctx context.Context) {
	timer := w.metrics.Latency.Start()
	defer timer.Stop()

	// Simulate work
	time.Sleep(100 * time.Millisecond)
	w.metrics.SuccessCount.Inc(ctx)
}

func main() {
	ctx := context.Background()

	// 1. Setup Scope
	scope := promutils.NewScope("flyte_example")

	// 2. Initialize Worker
	worker := NewMyWorker(scope.NewSubScope("worker"))

	// 3. Start Metrics Server in the background and surface startup errors
	go func() {
		if err := profutils.StartProfilingServerWithDefaultHandlers(ctx, 9090, nil); err != nil {
			panic(err)
		}
	}()

	// 4. Run with context values
	workerCtx := contextutils.WithProjectDomain(ctx, "flyte-project", "development")
	for {
		worker.Run(workerCtx)
		time.Sleep(time.Second)
	}
}
With the program running, visit http://localhost:9090/metrics to see your metrics:
flyte_example:worker:success with labels project="flyte-project" and domain="development".
flyte_example:worker:latency_ms recording the duration of the Run method.
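The /metrics page also lists runtime and process metrics, so it can help to filter the output down to your own scope. The sketch below does that over text in the Prometheus exposition format. `filterMetrics` is a hypothetical helper, and the sample text in `main` is hand-written for illustration. It is not captured output from the example above.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// filterMetrics returns the sample lines whose metric name starts with
// prefix, skipping blank lines and "# HELP" / "# TYPE" comment lines.
func filterMetrics(exposition, prefix string) []string {
	var out []string
	sc := bufio.NewScanner(strings.NewReader(exposition))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		if strings.HasPrefix(line, prefix) {
			out = append(out, line)
		}
	}
	return out
}

func main() {
	// Hand-written sample input; the values are made up.
	sample := `# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 9
# TYPE flyte_example:worker:success counter
flyte_example:worker:success{domain="development",project="flyte-project"} 3
`
	for _, line := range filterMetrics(sample, "flyte_example:worker:") {
		fmt.Println(line)
	}
}
```

In practice the input would be the body of a GET request to the metrics endpoint rather than a string literal.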