Monitoring App Status with Watches

Flyte provides a real-time monitoring system for applications, built on Kubernetes informers. It lets you subscribe to events such as application creation, status updates, and deletion, and receive them over a Go channel as they happen.

Initializing the Watch System

Before any subscriptions can be created, the watch system must be initialized, typically once during service startup. The AppK8sClient uses a shared Kubernetes informer to monitor Knative Services (KServices) in the flyte namespace.

In app/internal/setup.go, Flyte initializes the client and starts the watcher:

appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg)

// Start the KService informer
if err := appK8sClient.StartWatching(ctx); err != nil {
	return fmt.Errorf("internalapp: failed to start KService watcher: %w", err)
}

// Register a cleanup worker to stop the watcher on shutdown
sc.AddWorker("app-kservice-watcher", func(ctx context.Context) error {
	<-ctx.Done()
	appK8sClient.StopWatching()
	return nil
})

The StartWatching method ensures that the informer is only started once. Internally, it calls setupInformer, which registers event handlers for Add, Update, and Delete events on KService objects.
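One common way to get this start-once behavior in Go is sync.Once. The sketch below illustrates the idea only. The watcher type, its fields, and the empty setupInformer body are hypothetical stand-ins, and the real AppK8sClient may guard startup differently.

```go
package main

import (
	"fmt"
	"sync"
)

// watcher is a hypothetical stand-in for AppK8sClient; only the
// start-once behavior is modeled here.
type watcher struct {
	once     sync.Once
	startErr error
	starts   int // number of times informer setup actually ran
}

// StartWatching runs setup at most once. Later calls return the
// result of the first call without repeating the setup.
func (w *watcher) StartWatching() error {
	w.once.Do(func() {
		w.starts++
		w.startErr = w.setupInformer()
	})
	return w.startErr
}

// setupInformer is a placeholder for registering the Add, Update,
// and Delete handlers on the informer.
func (w *watcher) setupInformer() error { return nil }

func main() {
	w := &watcher{}
	for i := 0; i < 3; i++ {
		if err := w.StartWatching(); err != nil {
			fmt.Println("start failed:", err)
			return
		}
	}
	fmt.Println("informer setups:", w.starts) // prints "informer setups: 1"
}
```

Because the guard lives inside StartWatching, callers do not need to coordinate among themselves before calling it.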

Subscribing to Application Events

To monitor events, you use the Subscribe method on the AppK8sClient. This method returns a channel of *flyteapp.WatchResponse objects.

  • Specific App: Pass the application name to receive events only for that app.
  • All Apps: Pass an empty string ("") to receive events for every managed application in the project/domain scope.

// Subscribe to events for a specific app
ch := appK8sClient.Subscribe("my-app-name")

// Always ensure you unsubscribe to prevent memory leaks
defer appK8sClient.Unsubscribe("my-app-name", ch)
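To show why the Unsubscribe call matters, here is a minimal sketch of a subscriber registry keyed by app name. Everything in it (the registry and event types, the count helper, and closing the channel on unsubscribe) is an assumption made for illustration and is not taken from the Flyte source.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for *flyteapp.WatchResponse.
type event struct{ App, Kind string }

// registry tracks subscriber channels per app name; "" means all apps.
type registry struct {
	mu      sync.RWMutex
	bufSize int
	subs    map[string]map[chan event]struct{}
}

func newRegistry(bufSize int) *registry {
	return &registry{bufSize: bufSize, subs: make(map[string]map[chan event]struct{})}
}

// Subscribe registers and returns a buffered channel for appName.
func (r *registry) Subscribe(appName string) chan event {
	ch := make(chan event, r.bufSize)
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.subs[appName] == nil {
		r.subs[appName] = make(map[chan event]struct{})
	}
	r.subs[appName][ch] = struct{}{}
	return ch
}

// Unsubscribe removes the channel and closes it so readers stop.
// Calling it twice for the same channel is a no-op.
func (r *registry) Unsubscribe(appName string, ch chan event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.subs[appName][ch]; !ok {
		return // already removed
	}
	delete(r.subs[appName], ch)
	if len(r.subs[appName]) == 0 {
		delete(r.subs, appName)
	}
	close(ch)
}

// count reports how many subscribers are registered for appName.
func (r *registry) count(appName string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs[appName])
}

func main() {
	r := newRegistry(4)
	a := r.Subscribe("my-app-name")
	b := r.Subscribe("my-app-name")
	fmt.Println(r.count("my-app-name")) // prints "2"
	r.Unsubscribe("my-app-name", a)
	r.Unsubscribe("my-app-name", b)
	fmt.Println(r.count("my-app-name")) // prints "0"
}
```

In a registry shaped like this, a channel that is never unsubscribed stays in the map for the life of the process, which is the leak the defer guards against.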

The Snapshot + Stream Pattern

When implementing a streaming RPC (like the Watch method in InternalAppService), subscribing to the channel alone is not enough. A subscription only delivers future events, so the client would never learn the current state of applications that already exist. Listing the applications first and subscribing afterwards does not work either: any event that occurs between the list and the subscription is lost.

Flyte solves this using a "Subscribe then List" pattern. By subscribing first, you ensure that any new events are captured in the channel's buffer while you fetch the initial snapshot.

The following implementation from app/internal/service/internal_app_service.go demonstrates this pattern:

func (s *InternalAppService) Watch(
	ctx context.Context,
	req *connect.Request[flyteapp.WatchRequest],
	stream *connect.ServerStream[flyteapp.WatchResponse],
) error {
	appName := req.Msg.GetTarget().GetAppId().GetName()

	// 1. Subscribe first so no events are lost during the snapshot fetch
	ch := s.k8s.Subscribe(appName)
	defer s.k8s.Unsubscribe(appName, ch)

	// 2. Send initial snapshot (current state)
	// project and domain are resolved in code omitted from this excerpt
	snapshot, _, err := s.k8s.List(ctx, project, domain, 0, "")
	if err != nil {
		return connect.NewError(connect.CodeInternal, err)
	}
	for _, app := range snapshot {
		// Filter and send CreateEvents for existing apps
		if err := stream.Send(&flyteapp.WatchResponse{
			Event: &flyteapp.WatchResponse_CreateEvent{
				CreateEvent: &flyteapp.CreateEvent{App: app},
			},
		}); err != nil {
			return err
		}
	}

	// 3. Stream live updates from the channel
	for {
		select {
		case <-ctx.Done():
			// Client disconnected or the server is shutting down
			return nil
		case event, ok := <-ch:
			if !ok {
				// Channel closed: the subscription has ended
				return nil
			}
			if err := stream.Send(event); err != nil {
				return err
			}
		}
	}
}
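The difference between the two orderings can be reproduced with a small deterministic simulation. The store type and its methods are hypothetical stand-ins for the cluster state and the subscription channel. Nothing here is Flyte code.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for the cluster state
// plus a single subscriber channel.
type store struct {
	apps []string
	sub  chan string // nil until someone subscribes
}

// create adds an app and notifies the subscriber, if any, without blocking.
func (s *store) create(name string) {
	s.apps = append(s.apps, name)
	if s.sub != nil {
		select {
		case s.sub <- name:
		default: // buffer full: drop
		}
	}
}

func (s *store) subscribe() chan string { s.sub = make(chan string, 8); return s.sub }
func (s *store) list() []string         { return append([]string(nil), s.apps...) }

// drain returns everything currently buffered in ch.
func drain(ch chan string) []string {
	var out []string
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		default:
			return out
		}
	}
}

// listThenSubscribe misses anything created by between().
func listThenSubscribe(s *store, between func()) []string {
	seen := s.list()
	between()
	ch := s.subscribe()
	return append(seen, drain(ch)...)
}

// subscribeThenList sees everything, possibly more than once.
func subscribeThenList(s *store, between func()) []string {
	ch := s.subscribe()
	between()
	seen := s.list()
	return append(seen, drain(ch)...)
}

func main() {
	s1 := &store{apps: []string{"a"}}
	fmt.Println(listThenSubscribe(s1, func() { s1.create("b") })) // prints "[a]"
	s2 := &store{apps: []string{"a"}}
	fmt.Println(subscribeThenList(s2, func() { s2.create("b") })) // prints "[a b b]"
}
```

In this simulation, subscribing first delivers app "b" twice: once in the snapshot and once from the channel. A consumer of a stream built this way should therefore treat events as idempotent state updates rather than assume each app appears exactly once.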

Internal Mechanism and Filtering

The AppK8sClient does not monitor every KService in the cluster. It specifically filters for services managed by Flyte using the flyte.org/app-managed label.

In app/internal/k8s/app_client.go, the isManagedKService helper enforces this:

func isManagedKService(ksvc *servingv1.Service) bool {
	return ksvc.Labels["flyte.org/app-managed"] == "true"
}

When an event occurs, handleKServiceEvent converts the Kubernetes-native servingv1.Service object into a Flyte App object and wraps it in the appropriate event type:

  • WatchResponse_CreateEvent (for k8swatch.Added)
  • WatchResponse_UpdateEvent (for k8swatch.Modified)
  • WatchResponse_DeleteEvent (for k8swatch.Deleted)
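The filter and the mapping above can be sketched together as a single switch. To keep the example self-contained, it uses plain local types in place of servingv1.Service, k8swatch.EventType, and the flyteapp wrappers. The names service, eventType, and responseKind are hypothetical, and how the real handleKServiceEvent treats other event types is not shown in this document.

```go
package main

import "fmt"

// eventType mirrors the string values of the Kubernetes watch event types.
type eventType string

const (
	added    eventType = "ADDED"
	modified eventType = "MODIFIED"
	deleted  eventType = "DELETED"
)

// service is a hypothetical stand-in for servingv1.Service.
type service struct {
	Name   string
	Labels map[string]string
}

// isManaged applies the same label check as isManagedKService.
func isManaged(s service) bool {
	return s.Labels["flyte.org/app-managed"] == "true"
}

// responseKind returns the name of the WatchResponse wrapper for an
// event, or false if the service is unmanaged or the type is unknown.
func responseKind(t eventType, s service) (string, bool) {
	if !isManaged(s) {
		return "", false
	}
	switch t {
	case added:
		return "CreateEvent", true
	case modified:
		return "UpdateEvent", true
	case deleted:
		return "DeleteEvent", true
	default:
		return "", false
	}
}

func main() {
	managed := service{Name: "my-app", Labels: map[string]string{"flyte.org/app-managed": "true"}}
	other := service{Name: "unrelated"}

	kind, ok := responseKind(modified, managed)
	fmt.Println(kind, ok) // prints "UpdateEvent true"

	_, ok = responseKind(added, other)
	fmt.Println(ok) // prints "false"
}
```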

Configuration and Reliability

The reliability of the watch system depends on the WatchBufferSize configuration (default: 100). This setting determines how many events can be queued for a subscriber before the system starts dropping them.

If a subscriber is slow to consume events and its channel buffer fills up, Flyte drops the update for that subscriber and logs a warning rather than blocking the entire watch system:

// From app/internal/k8s/app_client.go
func (c *AppK8sClient) notifySubscribers(ctx context.Context, appName string, resp *flyteapp.WatchResponse) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for ch := range c.subscribers[appName] {
		select {
		case ch <- resp:
		default:
			// If the channel is full, the event is dropped for this specific subscriber
			logger.Warnf(ctx, "subscriber channel full, dropping update for app: %s", appName)
		}
	}
}
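The effect of the buffer size on drops can be seen in isolation with a plain buffered channel. This sketch only demonstrates the select/default idiom used above. The trySend and publish helpers are hypothetical, and the buffer of 2 is chosen to make the drops visible.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the
// value was queued.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full: the caller drops the value
	}
}

// publish sends n events to ch and returns how many were dropped.
func publish(ch chan<- int, n int) int {
	dropped := 0
	for i := 0; i < n; i++ {
		if !trySend(ch, i) {
			dropped++
		}
	}
	return dropped
}

func main() {
	ch := make(chan int, 2) // plays the role of a watch buffer of size 2

	// Nobody is reading, so only the first two events fit.
	fmt.Println("dropped:", publish(ch, 5)) // prints "dropped: 3"
	fmt.Println("queued:", len(ch))         // prints "queued: 2"
}
```

With nobody reading, only as many events as the buffer holds are queued and the rest are dropped. A larger buffer absorbs longer bursts but does not help a subscriber that never catches up.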

To handle high-frequency updates or many concurrent subscribers, you can increase the watchBufferSize in the InternalAppConfig.