Understanding Kubernetes’ tools/cache package: part 6

In part 5 of this series (you can start from the beginning if you like) we put all the structural pieces of the tools/cache package together.  However, I realize I made a mistake and did not cover the sharedProcessor and processorListener structs!  I’ll do that here before moving on to looking at the behavioral aspects of the package.

Let’s look at processorListener first.

To begin with, let’s agree that processorListener is an awful name for anything in software.  Agreed?  OK, good; let’s move on.

A processorListener is an implementation construct in the tools/cache package that buffers up a set of notifications and distributes them to a ResourceEventHandler (covered in part 3).  If you add a notification, eventually a ResourceEventHandler’s OnAdd, OnUpdate or OnDelete function will get called on a separate thread.  Its structural code is quite simple:

type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
	requestedResyncPeriod time.Duration
	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
	// informer's overall resync check period.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}
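One thing the struct declaration doesn’t show is that both nextCh and addCh are unbuffered.  In the version of client-go I’m reading, a processorListener gets built by a newProcessListener function that looks roughly like this (paraphrased; parameters and fields may differ slightly between releases):

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}), // unbuffered; run() receives from this
		addCh:                 make(chan interface{}), // unbuffered; add() sends to this
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}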

A processorListener has a never-ending run function that pulls notifications off of its nextCh Go channel (basically a synchronous blocking queue) and forwards them to its ResourceEventHandler:

func (p *processorListener) run() {
	defer utilruntime.HandleCrash()

	for next := range p.nextCh {
		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}
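For reference, the notification types that run switches on are themselves trivial structs; paraphrasing shared_informer.go, they look like this:

type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}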

So how do notifications get put on this nextCh?  A processorListener has a pop function that is also never-ending (somewhat surprisingly).  The code is not intuitive to me at all, but if you squint you can see that basically it’s receiving notifications from its addCh channel, parking them in its pendingNotifications ring buffer when the consumer is busy, and feeding them one at a time onto the nextCh Go channel:

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
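If the nil-channel trick in that select is new to you, here is a tiny standalone sketch of the same pattern (mine, not from client-go): sending on a nil channel blocks forever, so setting the local nextCh variable to nil effectively disables that case until there is something to send.  The toy below buffers values from in without ever blocking the sender and feeds them to out one at a time; unlike the real pop, it flushes its buffer on shutdown:

package main

import "fmt"

// pump buffers values from in without ever blocking the sender and feeds
// them to out one at a time, using a nil channel to disable the send case
// whenever there is nothing to send.
func pump(in <-chan int, out chan<- int) {
	defer close(out)

	var pending []int   // stand-in for pendingNotifications
	var send chan<- int // nil until there is something to send
	var next int        // the value waiting to be sent

	for {
		select {
		case send <- next: // never selected while send is nil
			if len(pending) > 0 {
				next, pending = pending[0], pending[1:]
			} else {
				send = nil // nothing left; disable this case
			}
		case v, ok := <-in:
			if !ok {
				// in was closed; flush what is buffered (the real pop just returns).
				if send != nil {
					send <- next
					for _, buffered := range pending {
						send <- buffered
					}
				}
				return
			}
			if send == nil { // nothing in flight; send v directly
				next = v
				send = out
			} else {
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go pump(in, out)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()
	for v := range out {
		fmt.Println(v) // prints 1 through 5 in order
	}
}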

So something has to kick off the run and pop functions.  That job falls to the sharedProcessor.  A sharedProcessor is really simple:

type sharedProcessor struct {
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

It too has a never-ending run function.  The first thing it does is kick off its processorListeners’ run and pop functions on separate threads.  Then it blocks and waits for a signal to close:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
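For completeness: notifications reach a listener’s addCh via the sharedProcessor’s distribute function, which the sharedIndexInformer calls as it processes deltas.  Paraphrasing the version I’m looking at (details vary across client-go releases), it’s just a fan-out loop:

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj) // add simply does: p.addCh <- notification
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}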

OK, so who tells the sharedProcessor’s run method to do its thing?  The sharedIndexInformer’s Run method.  In there, you’ll find this line:

wg.StartWithChannel(processorStopCh, s.processor.run)

That spawns the sharedIndexInformer’s sharedProcessor’s run function in a new thread (and when the processorStopCh channel is closed it will stop).
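wait.Group.StartWithChannel is a small convenience; paraphrasing k8s.io/apimachinery/pkg/util/wait (again, version details may differ), it just starts the supplied function in a new goroutine and hands it the stop channel:

// Paraphrased; see k8s.io/apimachinery/pkg/util/wait for the real thing.
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
	g.Start(func() {
		f(stopCh)
	})
}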

Stepping back for a moment, why all these threads? Why all this complexity?

The best I can do is this:

A processorListener is effectively the guts of a thread that could be blocked for a while by a badly-behaved ResourceEventHandler, which is under the end-user’s control.  So you want its “dequeuing” behavior to be on its own thread so that a badly-behaved ResourceEventHandler doesn’t accidentally cause the whole pinball machine to stop working while Kubernetes continues to deliver events at a frantic pace.

A sharedProcessor is really a kind of façade that bundles a bunch of processorListeners together and can spray a single notification across all of them, in addition to managing their threading concerns.  In Java, for example, where we have things like the ability to interrupt a thread built in, we could probably blend these two concerns together.  A better name for this thing might be something more like EventDistributor.

A sharedIndexInformer, as previously discussed, has its own threading concerns so as not to slow down the reception of Kubernetes events.
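To make the end-user side of this concrete, here is a minimal sketch (mine, not from the tools/cache package) of registering a handler with a shared index informer.  The handler functions below are the code that ends up running on a processorListener’s run thread.  Building the cache.ListerWatcher from a clientset is omitted, so treat lw as an assumption:

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// newPodInformer is a hypothetical helper; lw is assumed to be a ListerWatcher
// for Pods constructed elsewhere (for example, from a clientset).
func newPodInformer(lw cache.ListerWatcher) cache.SharedIndexInformer {
	informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 30*time.Second, cache.Indexers{})
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// This runs on the listener's run() goroutine.  If it blocks, only this
			// listener's pendingNotifications buffer grows; event reception carries on.
		},
		UpdateFunc: func(oldObj, newObj interface{}) {},
		DeleteFunc: func(obj interface{}) {},
	})
	return informer
}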

So now that we’ve added the processorListener and sharedProcessor types into the mix, let’s amend our overall structural diagram to include them:

[Diagram: the structural diagram from part 5, amended to include processorListener and sharedProcessor]

In the next post, we’ll cover the behavioral aspects of all this.


