Real-time Log Streaming
Flyte provides a real-time log streaming service that allows users to fetch execution logs directly from Kubernetes pods or other backends via a gRPC/Connect stream. This functionality is primarily handled by the RunLogsService and the LogStreamer interface.
Enabling Log Streaming
Log streaming is conditionally enabled during the application bootstrap process. In runs/setup.go, Flyte checks for the presence of a Kubernetes configuration (sc.K8sConfig). If provided, it instantiates a K8sLogStreamer and mounts the RunLogsService.
```go
// runs/setup.go
if sc.K8sConfig != nil {
	// Create the streamer for Kubernetes pods
	logStreamer, err := service.NewK8sLogStreamer(sc.K8sConfig)
	if err != nil {
		return fmt.Errorf("runs: failed to create k8s log streamer: %w", err)
	}

	// Initialize the service with the streamer and repository
	runLogsSvc := service.NewRunLogsService(repo, logStreamer)

	// Mount the service handler on the mux
	runLogsPath, runLogsHandler := workflowconnect.NewRunLogsServiceHandler(
		runLogsSvc,
		connect.WithInterceptors(otelInterceptor),
	)
	sc.Mux.Handle(runLogsPath, runLogsHandler)
	logger.Infof(ctx, "Mounted RunLogsService at %s", runLogsPath)
}
```
Streaming Logs from Kubernetes
The K8sLogStreamer implementation in runs/service/k8s_log_streamer.go fetches logs directly from the Kubernetes API. It uses the LogContext (resolved from the database by RunLogsService) to identify the correct pod, namespace, and container.
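The lookup that turns a LogContext into a concrete namespace, pod, and container can be sketched as follows. The structs below are hypothetical stand-ins shaped after the getters used on this page (GetNamespace, GetPodName, GetContainerName); the PrimaryPodName and PrimaryContainerName fields, and the rule of selecting the primary pod and container, are assumptions for illustration rather than Flyte's confirmed behavior.

```go
package main

import "fmt"

// Hypothetical stand-ins mirroring the LogContext getters used on this page.
type containerContext struct{ ContainerName string }

type podLogContext struct {
	Namespace            string
	PodName              string
	PrimaryContainerName string // assumed field
	Containers           []containerContext
}

type logContext struct {
	PrimaryPodName string // assumed field
	Pods           []podLogContext
}

// resolveTarget picks the namespace, pod, and container whose logs should be
// streamed: the primary container of the primary pod.
func resolveTarget(lc logContext) (ns, pod, container string, err error) {
	for _, p := range lc.Pods {
		if p.PodName != lc.PrimaryPodName {
			continue
		}
		for _, c := range p.Containers {
			if c.ContainerName == p.PrimaryContainerName {
				return p.Namespace, p.PodName, c.ContainerName, nil
			}
		}
		return "", "", "", fmt.Errorf("primary container %q not found in pod %q", p.PrimaryContainerName, p.PodName)
	}
	return "", "", "", fmt.Errorf("primary pod %q not found", lc.PrimaryPodName)
}
```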
Log Following Logic
The streamer automatically determines whether to "follow" the log stream based on the pod's current phase. If the pod is in the PodRunning phase, Follow is set to true and the request stays open as new lines are written. For pods in any other phase (e.g., Succeeded, Failed, Pending), Follow is false, so the request returns the logs that already exist and then ends.
```go
// runs/service/k8s_log_streamer.go
podObj, err := s.clientset.CoreV1().Pods(pod.GetNamespace()).Get(ctx, pod.GetPodName(), metav1.GetOptions{})
// ... error handling ...

opts := &corev1.PodLogOptions{
	Container:  container.GetContainerName(),
	Follow:     podObj.Status.Phase == corev1.PodRunning,
	Timestamps: true,
}
```
Handling Start Times
If a container start time is available in the LogContext, the streamer uses SinceTime and clears the TailLines option. This ensures that the stream includes all logs from the beginning of that specific process rather than just the last few lines.
```go
// runs/service/k8s_log_streamer.go
if startTime := container.GetProcess().GetContainerStartTime(); startTime != nil {
	t := metav1.NewTime(startTime.AsTime())
	opts.SinceTime = &t
	opts.TailLines = nil // Ignore default tail lines when start time is known
}
```
Implementing a Custom Log Streamer
To support other log backends (such as AWS CloudWatch or GCP Stackdriver), you must implement the LogStreamer interface defined in runs/service/run_logs_service.go.
```go
// runs/service/run_logs_service.go
type LogStreamer interface {
	TailLogs(ctx context.Context, logContext *core.LogContext, stream *connect.ServerStream[workflow.TailLogsResponse]) error
}
```
The TailLogs method receives a LogContext containing pod and container metadata, and a ServerStream to which it should send TailLogsResponse messages containing LogLine objects.
Managing Concurrency and Timeouts
Flyte implements safeguards to prevent log streaming from exhausting system resources or being prematurely terminated.
Concurrency Limits
The RunLogsService uses a weighted semaphore to limit the number of simultaneous log streams. By default, this is capped at 100 concurrent streams. Because the service uses TryAcquire, a request that arrives when the limit is reached does not wait for a free slot; the service immediately returns a ResourceExhausted error.
```go
// runs/service/run_logs_service.go
const defaultMaxConcurrentStreams = 100

func (s *RunLogsService) TailLogs(ctx context.Context, req *connect.Request[workflow.TailLogsRequest], stream *connect.ServerStream[workflow.TailLogsResponse]) error {
	// ...
	if !s.sem.TryAcquire(1) {
		return connect.NewError(connect.CodeResourceExhausted, fmt.Errorf("too many concurrent log streams"))
	}
	defer s.sem.Release(1)
	// ...
}
```
Timeout Management
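To prevent long-lived (followed) log streams from being cut off by a client-side request timeout, K8sLogStreamer copies the REST configuration it is given and explicitly clears its Timeout. In client-go, rest.Config.Timeout bounds the entire request, including the time spent reading a streaming response body, so any non-zero value inherited from the shared configuration would eventually terminate a followed stream: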
```go
// runs/service/k8s_log_streamer.go
func NewK8sLogStreamer(k8sConfig *rest.Config) (*K8sLogStreamer, error) {
	cfg := rest.CopyConfig(k8sConfig)
	cfg.Timeout = 0 // Prevent premature closure of long-lived streams
	// ...
}
```
Additionally, the streamer creates a separate context for the Kubernetes log request that is decoupled from the incoming gRPC deadline but still respects client cancellation. This ensures that a short proxy timeout doesn't kill a stream that the user still wants to follow.
Troubleshooting
- Service Not Mounted: If RunLogsService does not appear in your service list, ensure that K8sConfig is properly initialized in your application configuration.
- Empty Logs for Terminated Pods: If a pod has been deleted from Kubernetes (e.g., by a cleanup controller), K8sLogStreamer will return a NotFound error. In such cases, logs must be retrieved from an external log provider, if one is configured.
- Resource Exhausted: If you frequently encounter "too many concurrent log streams", you may need to adjust the defaultMaxConcurrentStreams constant or implement more aggressive client-side stream management.