Understanding Kubernetes’ tools/cache package: part 3

In part 2 we encountered the Controller concept and explored how its lenient contract nevertheless “expects” that processing ability is added to the capabilities offered by its associated Reflector.  You may want to start with part 0 if you haven’t been following along in this series to understand what it’s all about.

In this post, we’ll look in a little more detail at two things:

  1. the de facto standard implementation of the Controller type (which, strictly speaking is only one of many possibilities, but which colors the Controller concept’s expectations, unfortunately)
  2. the notional idea of an informer, and the concrete concept of a SharedInformer, specifically a SharedIndexInformer

controllerstruct-Backed Controller Implementation

We’ve seen that the Controller type is extraordinarily underspecified.  All you need to do to have a Controller is to implement three undocumented functions: Run, HasSynced and LastSyncResourceVersion.  Technically speaking you could implement them any way you want.  Pragmatically speaking, though, the controllerstruct-backed implementation of this type, its de facto standard reference implementation, also shows that there is an implicit requirement that any Controller implementation will, as part of its Run function implementation, process a Queue which it is also assumed to have, which is additionally assumed to be the “target” of a Reflector it is assumed to have.  That’s a lot of assumptions; as we model this in Java we’ll formalize some of them.

Let’s look into the details of queue processing.  First, recall that a Queue is just a Store with the ability to Pop.  See part 2 for more details.

In controller.go, to keep our mental stack clean, let’s just look at the processLoop function in isolation:

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

Since this is Go and the function starts with a lowercase p, we know it is private to the controllerstruct-backed implementation.  For now, we’ll trust that this is called from somewhere useful within that implementation.

In this loop, we can sort of see intuitively that an object is popped off the Queue supplied at creation time to this Controller implementation, and is handed to some sort of processor function which was also supplied at creation time to this Controller implementation.  Then, if there was an error, and we’re supposed to retry, we re-enqueue the object.

More specifically, PopProcessFunc(c.config.Process) turns out to be a Go type conversion that, in this case, converts the ProcessFunc stored in the Config supplied to this Controller implementation at creation time to a PopProcessFunc defined by the fifo.go file.  Recall that c.config.Process is of type ProcessFunc.  (The two types seem, to this Go rookie, at any rate, equivalent.)

From all this, and looking at it from a Java programmer’s perspective, we’re starting to see an abstract class—not an interface—emerge.  There are certain things that a Controller implementation is really expected to do, in nearly all cases, whether they’re called out in the contract or not (they’re not).  Specifically, its mandated Run function is, clearly, “supposed to” invoke logic one way or another equivalent to that outlined in the processLoop above.  And it’s expected in one fashion or another that the Controller implementation will be working with a Queue that can be drained.  And there’s an expectation that Run will spawn parallel computation to accomplish all this.  We’ll file this away for now, but now we know in quite some detail exactly what a Controller implementation is expected to do, even if those things aren’t really called out in its contract.

One question that might occur to you at this point is: aren’t we done?  Don’t we have a mechanism to reflect the Kubernetes API server via lists and watches into a cache, and, with what we now know about Controller implementations, the ability to process that cache?  Doesn’t that give us a framework to receive and act on Kubernetes resource specification messages, as discussed in part 0?

For now, accept that the answer is no, and so push everything you’ve learned so far onto your mental stack under the umbrella term of Controller.  Recall briefly that lurking underneath it are concepts like Reflectors, Stores, Queues, ListerWatchers, ProcessFuncs and the like, but for now you can wrap them all up into Controller and free up some space for what comes next: informers.

Informers

You might have picked up on a filename naming pattern at this point: Controller is defined in controller.go; ListerWatcher is defined in listwatch.go and so on.  But you won’t find an informer.go file.  And you won’t find an Informer type.  Nevertheless, there is an informer concept, which you can piece together from logical, but not concrete, extensions of it.

First, let’s look at the NewInformer function, defined in controller.go:

// NewInformer returns a Store and a controller for populating the store
// while also providing event notifications. You should only used the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//
func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}

As you can see, there’s no such thing as an informer.  But you can also see that there’s an implicit construct here: a particular kind of Controller, really, whose associated Queue is something called a DeltaFIFO (which we’ll get to later), that therefore works on deltas, and that has some sort of notion of event handlers that are notified after a given delta is “converted” back into its respective object-verb combination.  We see our old friends ListerWatcher and Config in there as well: squinting a bit, you can see that you are supplying the raw materials for a Reflector and its containing Controller as well as for a custom ProcessFunc that delegates to the event handler stuff by way of an additional transient Store representing the client’s state and permitting this pinball machine to determine whether a particular element being processed was an addition, modification or a deletion.

Physically, the NewInformer function simply leverages Go’s ability to have multiple return types and returns a Controller and its associated Store, not an Informer since there is no such thing, and adds some unenforced requirements about the usage of the Store it returns (i.e. don’t modify it).  A client could “throw away” the Controller and use just the returned Store, maybe in conjunction with the event handler it supplied, or maybe not, and that Store would serve as a convenient cache of the Kubernetes API server.

Anyway, from all this we can posit that conceptually an informer “is a” Controller together with the ability to distribute its Queue-related operations to an appropriate event handler.  This will help us when we model this in Java, and, to Java programmers, should start to look a little bit like good old JavaBeans event listeners.  This is a particularly good insight as ultimately where we’d like to get to is for some Java programmer to write some sort of method that is called when an addition, modification or deletion is found, without that programmer having to worry about all the multithreaded queue manipulation we’ve encountered so far.  It also helps because this sort of thing is one of those cases where Java can end up helping us write simpler code.

As you might expect, there are different concrete kinds of (definitionally notional) informers.  We’ll look at one in particular, but know that there are others.  The one that we’ll look at is called a SharedIndexInformer, and can be found in the shared_informer.go file.

SharedIndexInformer

A SharedIndexInformer is, itself, a SharedInformer implementation that adds the ability to index its contents.  I mention this with all of its forward references just to set the stage: you should be asking questions like: what’s shared?  What’s being indexed?  Why do we need to share things? and so on.

Let’s look first at the SharedInformer contract:

// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer.  When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh.  You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions.  If there was a create, followed by a delete, the cache may NOT have your item.  This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period.  Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

This tells us what any SharedInformer must do. Recall that conceptually an informer, which has no Go construct to represent it, despite the documentation’s mention of a “standard Informer“, which does not exist, is a combination of a Controller and an event distribution mechanism. A SharedInformer is a kind of informer that can support many event handlers. You may also notice that it happens to have the same function signatures as Controller, which in Go terms means that it behaves like a Controller, or, for all intents and purposes, effectively is one.  Lastly, the Shared part of the term SharedInformer somewhat clumsily refers to the fact that since this particular informer construct can have many event handlers, then the single cache that it is built with—the Queue housed by the Controller it was built with, in most cases—becomes “shared” between those handlers as a result and by default.

All that SharedIndexInformer adds to the picture is the ability to locate items in its cache by various keys:

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

Here you need to pay very close attention to singular and plural nouns.  Note that what you add is an Indexers, plural, and what you get is an Indexer, singular.  Let’s look at what those are.

We’ll start with Indexers, the plural, since oddly enough it turns out to be the singular item, and is not, most notably, a bunch of Indexer instances!

This is actually simply a kind of map (singular) and can be found in the index.go file:

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

An IndexFunc in turn is just a mapping function:

// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) ([]string, error)

So hand it a Kubernetes resource, for example, and it will give you back a set (I’m assuming) of string values corresponding to it in some way.

An Indexers, a singular object with a plural name, is therefore a simple map of such functions each of which is, in turn, indexed under its own string key somewhere.

So you can add a bunch of these maps to a SharedIndexInformer for its usage.

An Indexer, a singular noun describing an aggregate concept (!), is a collection (!) of such Indexers instances with some additional aggregate behavior layered on top:

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
	Store
	// Retrieve list of objects that match on the named indexing function
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the set of keys that match on the named indexing function.
	IndexKeys(indexName, indexKey string) ([]string, error)
	// ListIndexFuncValues returns the list of generated values of an Index func
	ListIndexFuncValues(indexName string) []string
	// ByIndex lists object that match on the named indexing function with the exact key
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

Clear as mud, right?  So from a SharedIndexInformer you can get its one true Indexer that is logically comprised of various Indexers instances added by way (hopefully) of that SharedIndexInformer‘s AddIndexers function, although it would also be possible to add them from the Indexer directly.

One other extremely important thing to notice here is that an Indexer is also a Store!  But pay very careful attention to how it is used in its capacity as a Store.

Specifically, let’s first recall that a non-shared informer—which has no Go language reification—is conceptually a combination of a Controller and a Store.  We have seen, for example, that the NewInformer function returns both a Controller and a Store that is attached to that Controller; the combination gives you an automatically populating cache.

In SharedIndexInformer, the Store into which Kubernetes API server events are reflected is a DeltaFIFO, not an Indexer.  But an Indexer is supplied to that DeltaFIFOand is the return value of the SharedIndexInformer‘s GetStore function!  This tells us more about the implied contract of GetStore: clearly the Store it returns must not be used for modification!

Another extremely important thing to note is that any Store is also a KeyListerGetter, a KeyLister and a KeyGetter.   A KeyListerGetter is a combination of the KeyLister and KeyGetter types.  A KeyLister is anything that can list its keys, and a KeyGetter is not something that can get a key, but something that houses other things that can be retrieved by key (like some sort of map).

So rewinding a bit, when SharedIndexInformer creates a new DeltaFIFO to use as its Store, and, in turn, supplies an Indexer to that DeltaFIFO, it is supplying that Indexer only in its capacity as a KeyListerGetter.  Similarly, its GetStore method probably should really return something closer to a KeyListerGetter than an actual Store, since calling its modification methods is prohibited.

So clearly to understand more here, we’re going to have to dive in to DeltaFIFO, which we’ve now seen is at the heart of all of this, and will be the subject of the next post.

Let’s review what we’ve come up with here so far:

  • A Controller really kind of has to behave like the reference controllerstruct-backed Controller implementation.  Specifically it needs to manage a Queue, which is often a DeltaFIFO, and both populate and drain it, and incorporate an event listener mechanism.
  • The idea of an informer exists, which is a combination of a Controller and the Store that it’s hooked up to, but does not physically exist except in “shared” form.
  • A SharedIndexInformer instantiates the concept of an informer with multiple event handlers and, by definition, a single Queue, a DeltaFIFO, that they share.

We might model this visually as follows:

KubernetesControllerBlogPart3.png

As always, we’ll refine and refactor this model as we go along.

Onwards to deltas and DeltaFIFO.

Advertisements

3 thoughts on “Understanding Kubernetes’ tools/cache package: part 3

  1. Pingback: Understanding Kubernetes’ tools/cache package: part 4 | Blame Laird

  2. Pingback: Understanding Kubernetes’ tools/cache package: part 2 | Blame Laird

  3. Pingback: Understanding Kubernetes’ tools/cache package: part 6 | Blame Laird

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s