Understanding Kubernetes’ tools/cache package: part 7

In part 6 of this series (you can start at the beginning if you want) we arrived at a reasonably complete structural view of sharedIndexInformer and all the concepts it leads to.

Now it’s time to look at the behavioral aspects of all this.

WordPress appears to scale all image uploads, so you’re going to have to squint a lot.  I’ve also created a Dropbox link to the full-size image.

In the diagram below, I’ve used UML 2.0 constructs.  Specifically, filled arrowheads represent synchronous calls, open arrowheads on solid lines represent asynchronous calls, and open arrowheads on dotted lines represent return values.  Frames labeled alt are, following UML 2.0, conditional branch points.  Lastly I’ve simplified a few things with stereotypes that I hope are reasonably self-explanatory.

[Sequence diagram: ControllerSequence (full-size version available via the Dropbox link above)]

This diagram starts with someone or something calling the Run function on a sharedIndexInformer.  The Run function creates a new DeltaFIFO, passing it the MetaNamespaceKeyFunc as its KeyFunc, and the sharedIndexInformer’s Indexer (which is also a KeyLister and a KeyGetter, but you can’t tell from just looking at the code).
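
In code, the wiring looks roughly like the following sketch (simplified; the exact NewDeltaFIFO signature has shifted between client-go versions, so treat this as illustrative):

// Sketch, inside sharedIndexInformer.Run: build the DeltaFIFO that the
// Reflector will fill and that HandleDeltas will later drain.
fifo := NewDeltaFIFO(
	MetaNamespaceKeyFunc, // KeyFunc: derives "namespace/name" keys
	s.indexer,            // known objects: the Indexer doubles as KeyLister and KeyGetter
)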

Then, the Run function creates a new Controller, which I’ve looked at in some detail in part 2, and calls its Run function asynchronously.  The sharedIndexInformer’s Run function now blocks until explicitly stopped.
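
The Controller’s construction looks roughly like this sketch (simplified; the field names follow the client-go version this series was written against):

// Sketch, inside sharedIndexInformer.Run: describe the Controller to build.
cfg := &Config{
	Queue:            fifo,                // the DeltaFIFO created above
	ListerWatcher:    s.listerWatcher,     // what the Reflector will list and watch
	ObjectType:       s.objectType,        // e.g. &v1.Pod{}
	FullResyncPeriod: s.resyncCheckPeriod,
	RetryOnError:     false,
	Process:          s.HandleDeltas,      // the queue-draining function discussed below
}
s.controller = New(cfg)
// ...after which the Controller's Run is invoked with the stop channel.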

The new Controller’s Run function creates a new Reflector, which for the purposes of this series you can just handwave over: trust that by using its embedded ListerWatcher it accurately puts Kubernetes resources into its store, which happens to be the DeltaFIFO created earlier.

At this point, we have, at a high level, a Rube Goldberg machine that replicates Kubernetes resources into a DeltaFIFO.  If a new Pod shows up in Kubernetes, for example, then it shows up in the DeltaFIFO.

Now the Controller’s Run function enters an endless-until-explicitly-stopped loop that calls the Controller’s processLoop function every second.

The processLoop function drains the DeltaFIFO of the items placed in it on a separate thread by the Reflector.  The DeltaFIFO, in other words, is (as the name states) a buffered queue where the producer is effectively Kubernetes itself by way of a Reflector, and the consumer is (effectively) whatever function was supplied to the Controller when it was built.
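
Squinting at the controller code, that drain loop amounts to something like the following simplified sketch (shutdown and requeue-on-error details omitted):

// Sketch (simplified): Run drives processLoop once a second until stopCh
// closes; processLoop pops Deltas off the DeltaFIFO and hands each batch to
// the configured Process function.
func (c *controller) Run(stopCh <-chan struct{}) {
	// ...Reflector creation and start elided...
	wait.Until(c.processLoop, time.Second, stopCh)
}

func (c *controller) processLoop() {
	for {
		_, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			// The real code distinguishes a closed queue from a processing
			// error, and may re-queue the item when RetryOnError is set.
			return
		}
	}
}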

So what function was supplied to this particular Controller when it was built?  The sharedIndexInformer’s HandleDeltas function.

The HandleDeltas function is thus a queue drainer, and for many behavioral analysis purposes, we can conveniently ignore all that came before.  We know that when this function is called, it has received a set of additions, changes, full replacements or deletions from (effectively) Kubernetes. Here’s what it looks like:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

Based on the kind of Delta it’s working on, HandleDeltas adds, updates or deletes the corresponding object in yet another store: the Indexer that was created by the sharedIndexInformer driving this whole train.  Once that Add, Update or Delete call returns, it then calls the distribute function on the sharedIndexInformer’s associated sharedProcessor.

The sharedProcessor’s distribute function forwards the notification to its appropriate processorListeners, effectively multiplexing the notification out.  So if a given Delta object represents an addition, then the add function of a processorListener will be called.
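
Conceptually, distribute is a small fan-out loop; here is a simplified sketch (the real function also keeps a separate list of listeners that are due for a resync, and uses that list when the notification is a sync):

// Sketch (simplified): hand the notification to every registered listener.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		listener.add(obj)
	}
}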

The processorListener add function simply places the incoming notification onto a synchronous queue (a Go channel) named addCh.  At this point, our notification’s journey is at a temporary end.
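
The add function is about as small as that description suggests; roughly:

// Sketch: add hands the incoming notification to the listener's addCh channel.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}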

Meanwhile, back at the sharedIndexInformer ranch, recall that its Run method is still in play.  The third meaningful thing that it does after creating a DeltaFIFO (that’s being filled and drained now) and a Controller (that’s indirectly filling it and draining it) is to call the run function on its sharedProcessor on a separate thread.

The sharedProcessor run function spawns two more threads and then hangs around until told to shut down.  Thread number one calls the run function on each processorListener the sharedProcessor has.  Thread number two calls the pop method on each processorListener.
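
In sketch form (simplified; the real code uses a wait.Group so that on shutdown it can also close each listener’s addCh and wait for the goroutines to finish):

// Sketch (simplified): start the run and pop goroutines for every listener,
// then block until told to stop.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	p.listenersLock.RLock()
	for _, listener := range p.listeners {
		go listener.run()
		go listener.pop()
	}
	p.listenersLock.RUnlock()
	<-stopCh
}

We’ll look at the processorListener run function first.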

The processorListener run function, at a very high level, simply pulls each notification off its nextCh synchronous queue (Go channel) and, depending on what kind it is, finally calls OnUpdate, OnAdd or OnDelete on the user-supplied ResourceEventHandler.  It’s simple enough that I can reproduce it here:

func (p *processorListener) run() {
	defer utilruntime.HandleCrash()

	for next := range p.nextCh {
		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}

So what puts things on the nextCh channel?  The pop function.  This function runs until told to shut down explicitly, and pulls incoming notifications off its addCh synchronous queue (Go channel) and puts them on the nextCh channel.
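
A heavily simplified sketch of that idea follows; note that the real pop also spills notifications into an unbounded ring buffer, so that a slow ResourceEventHandler never blocks the sharedProcessor’s distribute call:

// Heavily simplified sketch: shuttle notifications from addCh to nextCh.
// (The real implementation buffers into pendingNotifications, an unbounded
// ring, so that writers to addCh are never blocked by a slow handler.)
func (p *processorListener) pop() {
	defer close(p.nextCh)
	for notification := range p.addCh {
		p.nextCh <- notification
	}
}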

So what puts things on the addCh channel?  Look up a few paragraphs and recall that this was the logical end to our Kubernetes event’s journey: a notification representing the event is placed on addCh by the processorListener add function, invoked by the sharedProcessor’s distribute function.
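
To tie the journey together end to end, here is a hypothetical usage sketch.  The ListerWatcher named lw is assumed to be built elsewhere (for example with NewListWatchFromClient against a clientset), and the ResourceEventHandlerFuncs at the bottom are where the OnAdd, OnUpdate and OnDelete calls described above finally land:

package example

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// watchPods is a hypothetical helper: lw and stopCh are assumed to be
// supplied by the caller.
func watchPods(lw cache.ListerWatcher, stopCh <-chan struct{}) {
	informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 30*time.Second, cache.Indexers{})

	// This handler is the ultimate consumer of the notifications whose
	// journey this post has traced.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("added:", obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("updated:", newObj) },
		DeleteFunc: func(obj interface{}) { fmt.Println("deleted:", obj) },
	})

	// Run wires up the DeltaFIFO, Controller, Reflector, Indexer and
	// sharedProcessor described in this series, and blocks until stopCh closes.
	informer.Run(stopCh)
}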

This seems like a good place to stop this post.  I encourage you to check out the full-size version of the sequence diagram and perhaps print it out or keep it next to you as you reread the series; a lot of the constructs in this package will make more sense.

In the next post, I hope to find some places for simplification as we begin the laborious process of translating this to Java.
