In part 5 of this series (you can start from the beginning if you like) we put all the structural pieces of the tools/cache package together. However, I realize I made a mistake and did not cover the sharedProcessor and processorListener structs! I’ll do that here before moving on to looking at the behavioral aspects of the package.

Let’s look at processorListener first.

To begin with, let’s agree that processorListener is an awful name for anything in software. Agreed? OK, good; let’s move on.
A processorListener is an implementation construct in the tools/cache project that buffers up a set of notifications and distributes them to a ResourceEventHandler (covered in part 3). If you add a notification, eventually the ResourceEventHandler’s OnAdd, OnUpdate or OnDelete function will get called on a separate thread. Its structural code is quite simple:
type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
	requestedResyncPeriod time.Duration
	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
	// informer's overall resync check period.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}
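As a reminder of what the handler half of this pairing looks like, here is a tiny, hypothetical sketch of my own (not code from client-go or from this series) that builds a ResourceEventHandler out of plain functions using client-go’s ResourceEventHandlerFuncs adapter, and then invokes its callbacks directly the way run() ultimately does. Note that newer client-go releases add an isInInitialList parameter to OnAdd; the signature shown here matches the version discussed in this series.

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// A ResourceEventHandler built from plain functions; a processorListener's
	// run() loop is what ends up calling OnAdd/OnUpdate/OnDelete on it.
	handler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("added:", obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("updated:", oldObj, "->", newObj) },
		DeleteFunc: func(obj interface{}) { fmt.Println("deleted:", obj) },
	}

	// Simulate the dispatch that run() performs when notifications arrive on nextCh.
	handler.OnAdd("example object")
	handler.OnUpdate("old object", "new object")
	handler.OnDelete("example object")
}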
A processorListener has a never-ending run function that pulls notifications off of its nextCh Go channel (basically a synchronous blocking queue) and forwards them to its ResourceEventHandler:
func (p *processorListener) run() {
	defer utilruntime.HandleCrash()

	for next := range p.nextCh {
		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}
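One behavioral detail worth calling out: a for … range loop over a channel ends only when that channel is closed, which is how pop() (coming up next, via its deferred close(p.nextCh)) eventually tells run() to stop. Here is a minimal, standalone sketch of that pattern; it is my own toy code, not client-go’s:

package main

import "fmt"

func main() {
	nextCh := make(chan interface{})
	done := make(chan struct{})

	// Stands in for run(): the loop ends only when nextCh is closed.
	go func() {
		defer close(done)
		for next := range nextCh {
			fmt.Println("dispatched:", next)
		}
		fmt.Println("run loop exited")
	}()

	nextCh <- "addNotification"
	nextCh <- "updateNotification"
	close(nextCh) // what pop()'s deferred close(p.nextCh) accomplishes
	<-done
}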
So how do notifications get put on this nextCh? A processorListener has a pop function that is also never-ending (somewhat surprisingly). The code is not intuitive to me at all, but if you squint you can see that basically it’s pulling items off of its pendingNotifications ring buffer and putting them on the nextCh Go channel:
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
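The non-obvious idiom here is that sends and receives on a nil channel block forever, so assigning nil to the local nextCh variable effectively disables that select case until there is something to dispatch. A standalone illustration of just that idiom (again, my own sketch, not client-go code):

package main

import "fmt"

func main() {
	var disabled chan string // nil channel: sends and receives on it block forever
	enabled := make(chan string, 1)
	enabled <- "from the enabled channel"

	// Both cases are listed, but only the non-nil channel can ever fire. This is
	// how pop() "turns off" its dispatch case by assigning nil to nextCh.
	select {
	case v := <-disabled:
		fmt.Println("never happens:", v)
	case v := <-enabled:
		fmt.Println(v)
	}
}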
So something has to kick off the run and pop functions. That job falls to the sharedProcessor. A sharedProcessor is really simple:
type sharedProcessor struct {
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}
It too has a never-ending run function. The first thing it does is to kick off its processorListeners’ run and pop functions on separate threads. Then it blocks and waits for a signal to close:
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
	}()
	<-stopCh

	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
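Notice the shutdown choreography: closing addCh makes pop() return, pop()’s deferred close(nextCh) makes run() return, and wg.Wait() blocks until both are done. Here is a toy reproduction of that cascade; as a simplification on my part it uses the standard library’s sync.WaitGroup instead of apimachinery’s wait.Group:

package main

import (
	"fmt"
	"sync"
)

func main() {
	addCh := make(chan interface{})
	nextCh := make(chan interface{})

	var wg sync.WaitGroup

	// "run": drains nextCh until it is closed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for n := range nextCh {
			fmt.Println("handled:", n)
		}
	}()

	// "pop": forwards addCh to nextCh; closing addCh cascades the shutdown.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(nextCh) // tells "run" to stop
		for n := range addCh {
			nextCh <- n
		}
	}()

	addCh <- "notification 1"
	addCh <- "notification 2"

	close(addCh) // what sharedProcessor.run does for each listener on shutdown
	wg.Wait()    // wait for both goroutines, like p.wg.Wait()
	fmt.Println("all listeners stopped")
}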
OK, so who tells the sharedProcessor’s run method to do its thing? The sharedIndexInformer’s Run method. In there, you’ll find this line:
wg.StartWithChannel(processorStopCh, s.processor.run)
That spawns the sharedIndexInformer’s sharedProcessor’s run function in a new thread (and if a signal is sent down the processorStopCh channel then it will stop).
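For what it’s worth, wait.Group’s StartWithChannel simply runs the supplied function in a new goroutine, handing it the stop channel; the function is expected to return once that channel is closed. Here is a small, self-contained usage sketch of mine, not the informer’s actual wiring:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	var wg wait.Group
	// The function receives the stop channel and should return when it is closed.
	wg.StartWithChannel(stopCh, func(stop <-chan struct{}) {
		for {
			select {
			case <-stop:
				fmt.Println("stopping")
				return
			case <-time.After(50 * time.Millisecond):
				fmt.Println("still running")
			}
		}
	})

	time.Sleep(120 * time.Millisecond)
	close(stopCh) // analogous to closing processorStopCh in sharedIndexInformer's Run
	wg.Wait()
}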
Stepping back for a moment, why all these threads? Why all this complexity?
The best I can do is this:
A processorListener is effectively the guts of a thread that could be blocked for a while by a badly-behaved ResourceEventHandler, which is under the end-user’s control. So you want its “dequeuing” behavior to be on its own thread so that a badly-behaved ResourceEventHandler doesn’t accidentally cause the whole pinball machine to stop working while Kubernetes continues to deliver events at a frantic pace.
A sharedProcessor is really a kind of façade that bundles a bunch of processorListeners together and can spray a single notification across all of them, in addition to managing their threading concerns. In Java, for example, where we have things like the ability to interrupt a thread built in, we could probably blend these two concerns together. A better name for this thing might be something more like EventDistributor (a rough sketch of that idea follows these points).
A sharedIndexInformer, as previously discussed, has its own threading concerns so as not to slow down the reception of Kubernetes events.
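To make that “EventDistributor” idea concrete, here is a rough, hypothetical sketch in plain Go. None of these names exist in client-go; it just fans a single notification out to several listeners, each draining its own channel on its own goroutine:

package main

import (
	"fmt"
	"sync"
)

// EventDistributor is a hypothetical, simplified stand-in for sharedProcessor:
// it fans a single notification out to every registered listener channel.
type EventDistributor struct {
	mu        sync.RWMutex
	listeners []chan<- interface{}
}

func (d *EventDistributor) AddListener(ch chan<- interface{}) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.listeners = append(d.listeners, ch)
}

func (d *EventDistributor) Distribute(notification interface{}) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	for _, ch := range d.listeners {
		ch <- notification // each listener drains its channel on its own goroutine
	}
}

func main() {
	var d EventDistributor
	var wg sync.WaitGroup
	channels := make([]chan interface{}, 2)

	for i := range channels {
		ch := make(chan interface{}, 1)
		channels[i] = ch
		d.AddListener(ch)
		wg.Add(1)
		go func(id int, ch <-chan interface{}) {
			defer wg.Done()
			for n := range ch {
				fmt.Printf("listener %d got %v\n", id, n)
			}
		}(i, ch)
	}

	d.Distribute("one notification, every listener")

	for _, ch := range channels {
		close(ch) // stop the listener goroutines
	}
	wg.Wait()
}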
So now that we’ve added the processorListener and sharedProcessor types into the mix, let’s amend our overall structural diagram to include them:
In the next post, we’ll cover the behavioral aspects of all this.