Skip to main content

Real-time Log Streaming

To stream real-time logs from Flyte application replicas, you use the TailLogs RPC provided by the AppLogsService. This service handles the request flow from the control plane to the data plane, resolving application identifiers to specific Kubernetes pods and streaming their output back to the client.

Stream Logs by Application ID

When you request logs for an AppId, Flyte resolves all active replicas (pods) for that application and tails them concurrently. The InternalAppLogsService manages this resolution and serializes the log lines from multiple sources into a single stream.

// From app/internal/service/app_logs_service.go

func (s *InternalAppLogsService) TailLogs(
ctx context.Context,
req *connect.Request[flyteapp.TailLogsRequest],
stream *connect.ServerStream[flyteapp.TailLogsResponse],
) error {
// 1. Limit concurrency to protect the data plane
if !s.sem.TryAcquire(1) {
return connect.NewError(connect.CodeResourceExhausted, fmt.Errorf("too many concurrent log streams"))
}
defer s.sem.Release(1)

// 2. Resolve AppId or ReplicaId to specific pods
replicas, err := s.resolveReplicas(ctx, req.Msg)
if err != nil {
return err
}

// 3. Stream from all replicas in parallel
var sendMu sync.Mutex
send := func(replicaID *flyteapp.ReplicaIdentifier) func(*flyteapp.LogLines) error {
return func(logs *flyteapp.LogLines) error {
logs.ReplicaId = replicaID
sendMu.Lock() // Connect streams are not thread-safe
defer sendMu.Unlock()
return stream.Send(&flyteapp.TailLogsResponse{
Resp: &flyteapp.TailLogsResponse_Batches{
Batches: &flyteapp.LogLinesBatch{Logs: []*flyteapp.LogLines{logs}},
},
})
}
}

// ... logic to call s.streamer.TailLogs for each replica ...
}

Stream Logs from a Specific Replica

If you already have a ReplicaIdentifier (which includes the AppId and the specific pod name), you can target a single replica. The K8sAppLogStreamer handles the low-level Kubernetes log streaming, automatically selecting the user container and skipping sidecars like Knative's queue-proxy.

// From app/internal/service/app_logs_streamer.go

func (s *K8sAppLogStreamer) TailLogs(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier, send func(*flyteapp.LogLines) error) error {
podName := replicaID.GetName()
ns := appk8s.AppNamespace

pod, err := s.clientset.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return connect.NewError(connect.CodeNotFound, fmt.Errorf("pod %s not found", podName))
}

// Automatically selects the user container, skipping "queue-proxy"
containerName := pickUserContainer(pod)

opts := &corev1.PodLogOptions{
Container: containerName,
Follow: pod.Status.Phase == corev1.PodRunning,
Timestamps: true,
TailLines: &tailLines, // Defaults to 1000 lines
}

// Detach from inbound gRPC deadline to allow long-lived 'tail -f'
streamCtx, streamCancel := context.WithCancel(context.Background())
defer streamCancel()
stop := context.AfterFunc(ctx, streamCancel)
defer stop()

logStream, err := s.clientset.CoreV1().Pods(ns).GetLogs(podName, opts).Stream(streamCtx)
// ... stream processing ...
}

Control Plane Proxying

The AppLogsService in the control plane acts as a gateway. It does not interact with Kubernetes directly; instead, it proxies the TailLogs request to the InternalAppLogsService (data plane) using an internalClient.

// From app/service/app_logs_service.go

func (s *AppLogsService) TailLogs(
ctx context.Context,
req *connect.Request[flyteapp.TailLogsRequest],
stream *connect.ServerStream[flyteapp.TailLogsResponse],
) error {
// Proxy the request to the data plane
clientStream, err := s.internalClient.TailLogs(ctx, req)
if err != nil {
return connect.NewError(connect.CodeInternal, err)
}
defer clientStream.Close()

// Forward received log batches to the original caller
for clientStream.Receive() {
if err := stream.Send(clientStream.Msg()); err != nil {
return err
}
}
return clientStream.Err()
}

Key Implementation Details

  • Concurrency Limits: The InternalAppLogsService uses a semaphore (defaultMaxConcurrentLogStreams) to limit the data plane to 100 concurrent log streams. Requests exceeding this limit return a ResourceExhausted error.
  • Container Selection: The pickUserContainer function in K8sAppLogStreamer ensures that logs are pulled from the application container. It explicitly ignores the queue-proxy container often found in Knative-based deployments.
  • Deadline Detachment: To support long-lived log tailing, K8sAppLogStreamer detaches the Kubernetes log stream from the inbound gRPC context deadline. It uses context.AfterFunc to ensure the stream is still cancelled if the client disconnects, but prevents the stream from being killed by intermediate proxy timeouts.
  • Thread Safety: Because connect.ServerStream.Send is not thread-safe, InternalAppLogsService uses a sync.Mutex to serialize log batches when multiple replicas are being streamed simultaneously.