In part 6 of this series (you can start at the beginning if you want) we arrived at a reasonably complete structural view of sharedIndexInformer and all the concepts it leads to.
Now it’s time to look at the behavioral aspects of all this.
WordPress appears to scale all image uploads, so you’re going to have to squint a lot. I’ve also created a Dropbox link to the full-size image.
In the diagram below, I’ve used UML 2.0 constructs. Specifically, filled arrowheads represent synchronous calls, open arrowheads on solid lines represent asynchronous calls, and open arrowheads on dotted lines represent return values. Frames labeled alt are, following UML 2.0, conditional branch points. Lastly, I’ve simplified a few things with stereotypes that I hope are reasonably self-explanatory.
This diagram starts with someone or something calling the Run function on a sharedIndexInformer. The Run function creates a new DeltaFIFO, passing it the MetaNamespaceKeyFunc as its KeyFunc, and the sharedIndexInformer’s Indexer (which is also a KeyLister and a KeyGetter, but you can’t tell from just looking at the code).
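Keys matter here because everything downstream is stored and looked up by them. If you’ve never seen MetaNamespaceKeyFunc in action, here is a tiny runnable illustration (the Pod’s namespace and name are of course made up):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-pod"}}

	// MetaNamespaceKeyFunc produces "<namespace>/<name>" keys
	// (just "<name>" for unnamespaced objects).
	key, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prints: default/my-pod
}
```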
Then the Run function creates a new Controller, which I’ve looked at in some detail in part 2, and calls its run function asynchronously. The sharedIndexInformer’s Run function now blocks until explicitly closed.
The new Controller’s run function creates a new Reflector, which for the purposes of this series you can just handwave over: trust that, by using its embedded ListerWatcher, it accurately puts Kubernetes resources into its store, which happens to be the DeltaFIFO created earlier.
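If you’d rather peek behind the handwave anyway, here is a sketch of what wiring a Reflector up by hand looks like; lw and fifo stand in for the ListerWatcher and the DeltaFIFO from above, the v1 and cache imports are k8s.io/api/core/v1 and k8s.io/client-go/tools/cache, and the exact NewReflector and Run signatures have drifted a bit across client-go versions:

```go
// A sketch, not the informer's actual code: the Controller's run does the
// moral equivalent of this internally.
func startReflector(lw cache.ListerWatcher, fifo *cache.DeltaFIFO, stopCh <-chan struct{}) {
	r := cache.NewReflector(lw, &v1.Pod{}, fifo, 0) // 0 = no periodic resync
	go r.Run(stopCh)                                // list-and-watch until stopCh closes
}
```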
At this point, we have, at a high level, a Rube Goldberg machine that replicates Kubernetes resources into a DeltaFIFO. If a new Pod shows up in Kubernetes, for example, then it shows up in the DeltaFIFO.
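Concretely, what shows up is not the Pod itself but a record of what happened to it; paraphrased from the tools/cache source of the time (a Replaced type was added in later versions):

```go
// Each key in a DeltaFIFO maps to a Deltas slice: one Delta per observed
// change to the object with that key, ordered oldest to newest.
type Delta struct {
	Type   DeltaType   // Added, Updated, Deleted or Sync
	Object interface{} // the object after the change (before it, for Deleted)
}

type Deltas []Delta
```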
Now the Controller’s run function enters an endless-until-explicitly-stopped loop in which it calls the Controller’s processLoop function every second.
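That “every second” loop is just wait.Until from k8s.io/apimachinery, which invokes a function immediately and then once per period until its stop channel closes. A standalone illustration of the same pattern:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(3 * time.Second)
		close(stopCh) // stop the loop after a few ticks
	}()

	// The Controller does essentially wait.Until(c.processLoop, time.Second, stopCh).
	wait.Until(func() { fmt.Println("processLoop tick") }, time.Second, stopCh)
}
```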
The processLoop function drains the DeltaFIFO of the items placed in it on a separate thread by the Reflector. The DeltaFIFO, in other words, is (as the name states) a buffered queue whose producer is effectively Kubernetes itself, by way of a Reflector, and whose consumer is (effectively) whatever function was supplied to the Controller when it was built.
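Condensed from client-go’s controller.go of the era (shutdown and error handling elided, and version-dependent), the loop itself is tiny; note how Pop couples draining the queue to the Process function from the Controller’s Config:

```go
func (c *controller) processLoop() {
	for {
		// Pop blocks until an item is available, then hands it to the
		// Process function supplied in the Controller's Config.
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil && c.config.RetryOnError {
			// the safe way to re-enqueue the item for another attempt
			c.config.Queue.AddIfNotPresent(obj)
		}
	}
}
```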
So what function was supplied to this particular Controller when it was built? The sharedIndexInformer’s HandleDeltas function.
The HandleDeltas function is thus a queue drainer, and for many behavioral analysis purposes we can conveniently ignore all that came before it. We know that when this function is called, it has received a set of additions, changes, full replacements or deletions from (effectively) Kubernetes. Here’s what it looks like:
```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
```
Based on the kind of Delta it’s working on, it adds, updates or deletes the corresponding object in yet another cache, this time the Indexer that was created by the sharedIndexInformer driving this whole train. Once that Add, Update or Delete call returns, it calls the distribute function on the sharedIndexInformer’s associated sharedProcessor.
The sharedProcessor’s distribute function forwards the notification on to its appropriate processorListeners, effectively multiplexing the notification out. So if a given Delta object represents an addition, then the add function of a processorListener will be called.
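Condensed from the client-go sharedProcessor of the era (locking elided; the syncingListeners split is bookkeeping for per-listener resync periods), distribute is a plain fan-out loop:

```go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj) // only listeners due for a resync
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj) // every registered listener
		}
	}
}
```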
The processorListener’s add function simply places the incoming notification onto a synchronous queue (a Go channel) named addCh. At this point our notification’s journey is at a temporary end.
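It really is that simple; here is the entire function from client-go’s processorListener:

```go
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}
```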
Meanwhile, back at the sharedIndexInformer ranch, recall that its Run method is still in play. The third meaningful thing it does, after creating a DeltaFIFO (that’s being filled and drained now) and a Controller (that’s indirectly filling and draining it), is to call the run function on its sharedProcessor on a separate thread.
The sharedProcessor’s run function spawns a pair of goroutines for each processorListener the sharedProcessor has, and then hangs around until told to shut down. The first goroutine in each pair calls that processorListener’s run function; the second calls its pop function. We’ll look at the processorListener’s run function first.
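Condensed from the client-go sharedProcessor.run of the era (locking and shutdown details elided and version-dependent), the pairing looks like this:

```go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	for _, listener := range p.listeners {
		p.wg.Start(listener.run) // drains nextCh and calls the user's handler
		p.wg.Start(listener.pop) // shuttles addCh into nextCh
	}
	<-stopCh
	// ...on shutdown: close each listener's addCh and wait for the
	// goroutine pairs to drain and exit...
}
```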
The processorListener’s run function, at a very high level, simply pulls any notification off of its nextCh synchronous queue (Go channel) and, depending on what kind it is, finally calls either OnUpdate, OnAdd or OnDelete on the user-supplied ResourceEventHandler. It’s simple enough that I can reproduce it here:
```go
func (p *processorListener) run() {
	defer utilruntime.HandleCrash()

	for next := range p.nextCh {
		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}
```
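For completeness, the ResourceEventHandler on the receiving end of those calls is typically supplied as a cache.ResourceEventHandlerFuncs literal when you register with the informer; the informer variable and the log statements here are just for illustration:

```go
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		log.Printf("added: %v", obj)
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		log.Printf("updated: %v -> %v", oldObj, newObj)
	},
	DeleteFunc: func(obj interface{}) {
		log.Printf("deleted: %v", obj)
	},
})
```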
So what puts things on the nextCh channel? The pop function. This function runs until explicitly told to shut down, pulling incoming notifications off the addCh synchronous queue (Go channel) and putting them on the nextCh channel.
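The interesting wrinkle, condensed here from the client-go processorListener of the era (crash handling elided; details are version-dependent), is that pop parks overflow in an unbounded ring buffer so that distribute, and therefore HandleDeltas, never blocks on a slow handler:

```go
func (p *processorListener) pop() {
	defer close(p.nextCh) // tells run() to stop

	var nextCh chan<- interface{} // nil until there is something to send
	var notification interface{}
	for {
		select {
		case nextCh <- notification: // delivered; line up the next one, if any
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok {
				nextCh = nil // nothing pending; disable this case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return // addCh closed: shut down
			}
			if notification == nil { // nothing in flight: send directly
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // already busy: buffer it
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
```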
So what puts things on the addCh channel? Look up a few paragraphs and recall that this was the logical end of our Kubernetes event’s journey: a notification representing the event is placed on addCh by the processorListener’s add function, invoked by the sharedProcessor’s distribute function.
This seems like a good place to stop this post. I encourage you to check out the full-size version of the sequence diagram and perhaps print it out or keep it next to you as you reread the series; a lot of the constructs in this package will make more sense.
In the next post, I hope to find some places for simplification as we begin the laborious process of translating this to Java.