Understanding Kubernetes’ tools/cache package: part 4

In part 3, we dug into the real contract that Controller implementations must obey in order to be useful, and looked into what informers and SharedIndexInformers are.

In this post, we’ll look at DeltaFIFO, since it is at the heart of a lot of this.  We’ll do this mostly in isolation, and then will attempt, probably in a later post, to “plug it back in” to the overall pinball machine so we can make sense of it in a larger context.

From the name DeltaFIFO, we can surmise we’ll be talking about diffs in some capacity, and in the context of queuing.  Let’s look at the contract:

// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update).
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// deltaCompressor tells us how to combine two or more
	// deltas. It may be nil.
	deltaCompressor DeltaCompressor

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}

var (
	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

To boil this down further, let’s ignore for now the threading concerns, the “delta compression” represented by the deltaCompressor field, and the “queue is closed” machinery.

What we’re left with is a type about which we can say:

  • A DeltaFIFO is a Queue of Deltas instances.  It turns out a Deltas type is just a collection of Delta instances.  We’ll see what those are in a moment.
  • The Deltas instances a DeltaFIFO holds are “keyable”.  A string key can be extracted from a given Deltas instance by way of a KeyFunc.
  • Certain Deltas instances inside the DeltaFIFO can be “known”.

Let’s look at what a Delta type is:

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

A Delta, in other words, is an event! It is the combination of a verb (the DeltaType) and the payload (the object stored in the Object field).

So a DeltaFIFO is a thread-safe Queue of collections (Deltas) of events (Delta), indexed by some sort of key, where each event (Delta) in the collection represents the addition, modification or removal of a Kubernetes resource.

Or, to put it another way, it’s a queue that turns a call of the form Add(someObject) into an effective call to a hypothetical AddEvent(ObjectAdded, someObject) function, turns Update(someObject) into an effective call to a hypothetical AddEvent(ObjectUpdated, someObject) function, and so on.
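
To make that translation concrete, here is a minimal Java sketch of the idea.  Every name in it (DeltaType, Delta, DeltaQueue, keyFunction) is a hypothetical placeholder of mine, not anything defined by tools/cache:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

enum DeltaType { ADDED, UPDATED, DELETED, SYNC }

// A Delta pairs a verb with the affected object, mirroring the Go struct above.
final class Delta<T> {
  final DeltaType type;
  final T object;
  Delta(DeltaType type, T object) { this.type = type; this.object = object; }
}

// A hypothetical queue that translates add(x), update(x) and delete(x) into a
// Delta appended to a per-key list: the AddEvent(verb, object) idea from above.
final class DeltaQueue<T> {
  private final Map<String, List<Delta<T>>> items = new LinkedHashMap<>();
  private final Function<T, String> keyFunction; // plays the role of Go's KeyFunc

  DeltaQueue(Function<T, String> keyFunction) { this.keyFunction = keyFunction; }

  void add(T object)    { enqueue(DeltaType.ADDED, object); }
  void update(T object) { enqueue(DeltaType.UPDATED, object); }
  void delete(T object) { enqueue(DeltaType.DELETED, object); }

  private void enqueue(DeltaType type, T object) {
    items.computeIfAbsent(keyFunction.apply(object), k -> new ArrayList<>())
         .add(new Delta<>(type, object));
  }
}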

It’s worth noting that in some sense we’ve kind of come full circle.  Recall that it’s possible to establish Kubernetes watches on lists of resources.  When you do this, you get a stream of WatchEvents, which represent additions, modifications or removals of Kubernetes resources.  So what are we doing now with a DeltaFIFO and Deltas and so on?  Why not just work off WatchEvents directly?

Well, also recall that you need to set up watches in conjunction with an initial listing of Kubernetes resource instances, and that listing does not consist of WatchEvents.  So, from a very high level, we’ve coalesced the concept of a list operation and the concept of a watch operation and expressed them both in terms of Delta instances, which all end up in this DeltaFIFO construct.  This can then be used to distribute events, JavaBean-style, to event handlers.

Let’s see if we can put this back in a limited amount of context.  We’re talking about a queue, so you should be able to add things to it.  It is fundamentally a queue of Delta instances (by way of Deltas instances) internally.  So how do you build a Delta?

As it turns out, a DeltaFIFO builds one for you:

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

[snip]
// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed. Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// If object is supposed to be deleted (last event is Deleted),
	// then we should ignore Sync events, because it would result in
	// recreation of this object.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
	if f.deltaCompressor != nil {
		newDeltas = f.deltaCompressor.Compress(newDeltas)
	}

	_, exists := f.items[id]
	if len(newDeltas) > 0 {
		if !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else if exists {
		// The compression step removed all deltas, so
		// we need to remove this from our map (extra items
		// in the queue are ignored if they are not in the
		// map).
		delete(f.items, id)
	}
	return nil
}

So from a Java modeling standpoint, we have to realize that any generic type that we use in modeling a DeltaFIFO actually has to be two generic types: one would be T, the actual type of the Kubernetes resource being affected, and two would be something like Delta<T>, which would be the “event” type actually stored internally by the queue.

The innards of DeltaFIFO model a set by storing a map of Deltas indexed by string key (the items field) alongside a slice of those same keys (the queue field).  From this we can deduce that duplicates are not permitted in queues of this kind, so it has set semantics.  From a Java perspective, this is an important insight, as under the covers we’ll likely use some kind of Set implementation.  Also, in Java, objects have an equals(Object) method, which might allow us to simplify the KeyFunc semantics as well.
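
Here is a minimal Java sketch of that “map plus ordered keys” arrangement, under the assumption that a plain string key stands in for KeyFunc; the names are mine, not the package’s:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// A minimal sketch of keeping a map of values alongside an ordered queue of their keys.
final class KeyedQueue<V> {
  private final Map<String, V> items = new HashMap<>();
  private final Deque<String> queue = new ArrayDeque<>();

  // Enqueue the key only if it is not already present: this is what gives the
  // queue its set semantics.
  synchronized void put(String key, V value) {
    if (items.put(key, value) == null) {
      queue.addLast(key);
    }
  }

  // Pop the oldest key and hand back whatever is currently stored under it.
  // (For brevity this sketch simply throws if the queue is empty.)
  synchronized V pop() {
    String key = queue.removeFirst();
    return items.remove(key);
  }
}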

Visually, here is a simplistic model of the overall structure:

KubernetesControllerBlogPart4.png

As always, we’ll refine and refactor this model as we go, but it’s useful to sketch out some of the concepts.

In the next post, we’ll try to put some of these pieces back together again.

Understanding Kubernetes’ tools/cache package: part 3

In part 2 we encountered the Controller concept and explored how its lenient contract nevertheless “expects” that processing ability is added to the capabilities offered by its associated Reflector.  You may want to start with part 0 if you haven’t been following along in this series to understand what it’s all about.

In this post, we’ll look in a little more detail at two things:

  1. the de facto standard implementation of the Controller type (which, strictly speaking, is only one of many possibilities, but which colors the Controller concept’s expectations, unfortunately)
  2. the notional idea of an informer, and the concrete concept of a SharedInformer, specifically a SharedIndexInformer

controller struct-Backed Controller Implementation

We’ve seen that the Controller type is extraordinarily underspecified.  All you need to do to have a Controller is to implement three undocumented functions: Run, HasSynced and LastSyncResourceVersion.  Technically speaking, you could implement them any way you want.  Pragmatically speaking, though, the controller struct-backed implementation of this type, its de facto standard reference implementation, reveals an implicit requirement: any Controller implementation is expected, as part of its Run function, to process a Queue it is assumed to have, which in turn is assumed to be the “target” of a Reflector it is also assumed to have.  That’s a lot of assumptions; as we model this in Java we’ll formalize some of them.

Let’s look into the details of queue processing.  First, recall that a Queue is just a Store with the ability to Pop.  See part 2 for more details.

In controller.go, to keep our mental stack clean, let’s just look at the processLoop function in isolation:

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

Since this is Go and the function name starts with a lowercase p, we know it is unexported, i.e. private to the package and hence effectively private to the controller struct-backed implementation.  For now, we’ll trust that this is called from somewhere useful within that implementation.

In this loop, we can sort of see intuitively that an object is popped off the Queue supplied at creation time to this Controller implementation, and is handed to some sort of processor function which was also supplied at creation time to this Controller implementation.  Then, if there was an error, and we’re supposed to retry, we re-enqueue the object.

More specifically, PopProcessFunc(c.config.Process) turns out to be a Go type conversion that, in this case, converts the ProcessFunc stored in the Config supplied to this Controller implementation at creation time to a PopProcessFunc defined by the fifo.go file.  Recall that c.config.Process is of type ProcessFunc.  (The two types seem, to this Go rookie, at any rate, equivalent.)

From all this, and looking at it from a Java programmer’s perspective, we’re starting to see an abstract class—not an interface—emerge.  There are certain things that a Controller implementation is really expected to do, in nearly all cases, whether they’re called out in the contract or not (they’re not).  Specifically, its mandated Run function is, clearly, “supposed to” invoke logic one way or another equivalent to that outlined in the processLoop above.  And it’s expected in one fashion or another that the Controller implementation will be working with a Queue that can be drained.  And there’s an expectation that Run will spawn parallel computation to accomplish all this.  We’ll file this away for now, but now we know in quite some detail exactly what a Controller implementation is expected to do, even if those things aren’t really called out in its contract.
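
If we wanted to capture those unstated expectations in Java, a first, very rough cut might look like the following sketch; AbstractController, its queue and its processor are hypothetical placeholders of mine, not a faithful rendering of the Go code:

import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// A hypothetical Java rendering of what a Controller implementation is "supposed to" do:
// own a queue, drain it from run(), and report on synchronization.
public abstract class AbstractController<T> implements Runnable {

  private final BlockingQueue<T> queue;  // stands in for the Go Queue
  private final Consumer<T> processor;   // stands in for the Go ProcessFunc
  private volatile boolean stopped;

  protected AbstractController(BlockingQueue<T> queue, Consumer<T> processor) {
    this.queue = queue;
    this.processor = processor;
  }

  // Analogous to processLoop(): pop and process until asked to stop.
  @Override
  public void run() {
    while (!this.stopped) {
      try {
        this.processor.accept(this.queue.take());
      } catch (InterruptedException interruptedException) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  public void stop() {
    this.stopped = true;
  }

  public abstract boolean hasSynced();

  public abstract String lastSyncResourceVersion();
}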

One question that might occur to you at this point is: aren’t we done?  Don’t we have a mechanism to reflect the Kubernetes API server via lists and watches into a cache, and, with what we now know about Controller implementations, the ability to process that cache?  Doesn’t that give us a framework to receive and act on Kubernetes resource specification messages, as discussed in part 0?

For now, accept that the answer is no, and so push everything you’ve learned so far onto your mental stack under the umbrella term of Controller.  Recall briefly that lurking underneath it are concepts like Reflectors, Stores, Queues, ListerWatchers, ProcessFuncs and the like, but for now you can wrap them all up into Controller and free up some space for what comes next: informers.

Informers

You might have picked up on a filename naming pattern at this point: Controller is defined in controller.go; ListerWatcher is defined in listwatch.go and so on.  But you won’t find an informer.go file.  And you won’t find an Informer type.  Nevertheless, there is an informer concept, which you can piece together from the constructs that logically, but not concretely, extend it.

First, let’s look at the NewInformer function, defined in controller.go:

// NewInformer returns a Store and a controller for populating the store
// while also providing event notifications. You should only used the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//
func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}

As you can see, there’s no such thing as an informer.  But you can also see that there’s an implicit construct here: a particular kind of Controller, really, whose associated Queue is something called a DeltaFIFO (which we’ll get to later), that therefore works on deltas, and that has some sort of notion of event handlers that are notified after a given delta is “converted” back into its respective object-verb combination.  We see our old friends ListerWatcher and Config in there as well.  Squinting a bit, you can see that you are supplying the raw materials for a Reflector and its containing Controller, as well as for a custom ProcessFunc that delegates to the event handlers by way of an additional transient Store representing the client’s state; that extra Store is what lets this pinball machine determine whether a particular element being processed was an addition, a modification or a deletion.

Physically, the NewInformer function simply leverages Go’s ability to return multiple values: it returns a Controller and its associated Store (not an Informer, since there is no such thing) and adds some unenforced requirements about the usage of the Store it returns (i.e. don’t modify it).  A client could “throw away” the Controller and use just the returned Store, maybe in conjunction with the event handler it supplied, or maybe not, and that Store would serve as a convenient cache of the Kubernetes API server.

Anyway, from all this we can posit that conceptually an informer “is a” Controller together with the ability to distribute its Queue-related operations to an appropriate event handler.  This will help us when we model this in Java and, to Java programmers, should start to look a little bit like good old JavaBeans event listeners.  That is a particularly useful insight, because ultimately where we’d like to get to is for a Java programmer to write a method that is called when an addition, modification or deletion is detected, without having to worry about all the multithreaded queue manipulation we’ve encountered so far.  It is also one of those cases where Java can help us write simpler code.
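
As a rough sketch of where that could land in Java, the event-distribution half of an informer might look like the following; ResourceEventListener and EventDistributor are names I’m inventing for illustration, not anything in tools/cache:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// A JavaBeans-style listener for additions, modifications and deletions.
public interface ResourceEventListener<T> {
  void resourceAdded(T newResource);
  void resourceUpdated(T oldResource, T newResource);
  void resourceDeleted(T deletedResource);
}

// The informer side: dispatch an event to every registered listener, much as the
// ProcessFunc built by NewInformer does with its ResourceEventHandler.
final class EventDistributor<T> {
  private final List<ResourceEventListener<T>> listeners = new CopyOnWriteArrayList<>();

  void addListener(ResourceEventListener<T> listener) {
    this.listeners.add(listener);
  }

  void dispatchAdd(T newResource) {
    this.listeners.forEach(l -> l.resourceAdded(newResource));
  }

  void dispatchUpdate(T oldResource, T newResource) {
    this.listeners.forEach(l -> l.resourceUpdated(oldResource, newResource));
  }

  void dispatchDelete(T deletedResource) {
    this.listeners.forEach(l -> l.resourceDeleted(deletedResource));
  }
}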

As you might expect, there are different concrete kinds of (definitionally notional) informers.  We’ll look at one in particular, but know that there are others.  The one that we’ll look at is called a SharedIndexInformer, and can be found in the shared_informer.go file.

SharedIndexInformer

A SharedIndexInformer is, itself, a SharedInformer implementation that adds the ability to index its contents.  I mention this with all of its forward references just to set the stage: you should be asking questions like: what’s shared?  What’s being indexed?  Why do we need to share things? and so on.

Let’s look first at the SharedInformer contract:

// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer.  When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh.  You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions.  If there was a create, followed by a delete, the cache may NOT have your item.  This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period.  Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

This tells us what any SharedInformer must do.  Recall that conceptually an informer (which has no Go construct to represent it, despite the documentation’s mention of a “standard Informer”, which does not exist) is a combination of a Controller and an event distribution mechanism.  A SharedInformer is a kind of informer that can support many event handlers.  You may also notice that it happens to have the same function signatures as Controller, which in Go terms means that it behaves like a Controller or, for all intents and purposes, effectively is one.  Lastly, the Shared part of the term SharedInformer somewhat clumsily refers to the fact that, since this particular informer construct can have many event handlers, the single cache it is built with (in most cases the Queue housed by the Controller it was built with) becomes “shared” between those handlers as a result and by default.

All that SharedIndexInformer adds to the picture is the ability to locate items in its cache by various keys:

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

Here you need to pay very close attention to singular and plural nouns.  Note that what you add is an Indexers, plural, and what you get is an Indexer, singular.  Let’s look at what those are.

We’ll start with Indexers, the plural, since oddly enough it turns out to be the singular item, and is not, most notably, a bunch of Indexer instances!

This is actually simply a kind of map (singular) and can be found in the index.go file:

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

An IndexFunc in turn is just a mapping function:

// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) ([]string, error)

So hand it a Kubernetes resource, for example, and it will give you back a set (I’m assuming) of string values corresponding to it in some way.

An Indexers, a singular object with a plural name, is therefore a simple map of such functions, each of which is indexed in that map under its own string key (its name).

So you can add a bunch of these maps to a SharedIndexInformer for its usage.
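
In Java terms, then, an Indexers is essentially just a map from an index name to an indexing function.  A minimal sketch, with names of my own choosing rather than anything from tools/cache, might be:

import java.util.List;
import java.util.Map;

// The analog of Go's IndexFunc: hand it an object, get back the values to index it under.
interface IndexFunction<T> {
  List<String> indexValuesFor(T object);
}

// The analog of Go's Indexers is then simply a Map<String, IndexFunction<T>>.
// Example (using a hypothetical Pod type, shown as a comment only):
//   Map<String, IndexFunction<Pod>> indexers =
//       Map.of("byNamespace", pod -> List.of(pod.getNamespace()));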

An Indexer, a singular noun describing an aggregate concept (!), is a collection (!) of such Indexers instances with some additional aggregate behavior layered on top:

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
	Store
	// Retrieve list of objects that match on the named indexing function
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the set of keys that match on the named indexing function.
	IndexKeys(indexName, indexKey string) ([]string, error)
	// ListIndexFuncValues returns the list of generated values of an Index func
	ListIndexFuncValues(indexName string) []string
	// ByIndex lists object that match on the named indexing function with the exact key
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

Clear as mud, right?  So from a SharedIndexInformer you can get its one true Indexer, which is logically composed of the various Indexers instances added (hopefully) by way of that SharedIndexInformer’s AddIndexers function, although it would also be possible to add them from the Indexer directly.

One other extremely important thing to notice here is that an Indexer is also a Store!  But pay very careful attention to how it is used in its capacity as a Store.

Specifically, let’s first recall that a non-shared informer—which has no Go language reification—is conceptually a combination of a Controller and a Store.  We have seen, for example, that the NewInformer function returns both a Controller and a Store that is attached to that Controller; the combination gives you an automatically populating cache.

In SharedIndexInformer, the Store into which Kubernetes API server events are reflected is a DeltaFIFO, not an Indexer.  But an Indexer is supplied to that DeltaFIFO, and is the return value of the SharedIndexInformer’s GetStore function!  This tells us more about the implied contract of GetStore: clearly the Store it returns must not be used for modification!

Another extremely important thing to note is that any Store is also a KeyListerGetter, a KeyLister and a KeyGetter.   A KeyListerGetter is a combination of the KeyLister and KeyGetter types.  A KeyLister is anything that can list its keys, and a KeyGetter is not something that can get a key, but something that houses other things that can be retrieved by key (like some sort of map).

So rewinding a bit, when SharedIndexInformer creates a new DeltaFIFO to use as its Store, and, in turn, supplies an Indexer to that DeltaFIFO, it is supplying that Indexer only in its capacity as a KeyListerGetter.  Similarly, its GetStore method probably should really return something closer to a KeyListerGetter than an actual Store, since calling its modification methods is prohibited.
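
In Java, those narrower capacities could be expressed as small interfaces of their own.  A hypothetical sketch (none of these names come from tools/cache) might be:

import java.util.List;
import java.util.Optional;

// Anything that can list the keys it knows about.
interface KeyLister {
  List<String> listKeys();
}

// Anything that can hand back a stored item given its key.
interface KeyGetter<T> {
  Optional<T> getByKey(String key);
}

// The combination; this is closer to what a DeltaFIFO actually needs from its
// "known objects", and arguably closer to what GetStore should really return.
interface KeyListerGetter<T> extends KeyLister, KeyGetter<T> {}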

So clearly to understand more here, we’re going to have to dive in to DeltaFIFO, which we’ve now seen is at the heart of all of this, and will be the subject of the next post.

Let’s review what we’ve come up with here so far:

  • A Controller really kind of has to behave like the reference controller struct-backed Controller implementation.  Specifically, it needs to manage a Queue, which is often a DeltaFIFO, both populate and drain it, and incorporate an event listener mechanism.
  • The idea of an informer exists, which is a combination of a Controller and the Store that it’s hooked up to, but does not physically exist except in “shared” form.
  • A SharedIndexInformer instantiates the concept of an informer with multiple event handlers and, by definition, a single Queue, a DeltaFIFO, that they share.

We might model this visually as follows:

KubernetesControllerBlogPart3.png

As always, we’ll refine and refactor this model as we go along.

Onwards to deltas and DeltaFIFO.

Understanding Kubernetes’ tools/cache package: part 2

In the previous post in this long-running series, we looked at some of the foundations underlying Kubernetes controllers, and started looking into the concepts behind the tools/cache package, specifically ListerWatcher, Store and Reflector.  In this post we’ll look at the actual concept of a Controller.

So far, I’ve been careful about capitalization.  I’ve written “Kubernetes controller”, not “Kubernetes Controller” (capital C) or “Kubernetes Controller” (the Go type).  That’s been on purpose.  That’s in part because the tools/cache package has a Controller type, which logically sits in front of Reflector.  You can see for yourself:

// Controller is a generic controller framework.
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

Note, interestingly, that strictly speaking all you have to do to implement the Controller type is to supply a Run() function (for other Java programmers, the stopCh “stop channel” is Go’s way of (essentially) allowing interruption), a HasSynced() function that returns true if, er, synchronization has been accomplished (we’ll look into what that means later), and a LastSyncResourceVersion() function, which returns the resourceVersion of the Kubernetes list of resources being watched and reflected.

An interesting point here is that this means that whereas Reflector was a generic type completely decoupled from Kubernetes, this interface is conceptually coupled to Kubernetes (note the inclusion of the term Resource and the concept of a resource version, both of which are concepts from the Kubernetes ontology).  This observation can help us later on with refining our Java model.
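
For the record, a direct Java transliteration of that three-function contract is tiny.  Here is a minimal sketch; the interface name and method shapes are my guess at an idiomatic rendering, not anything the package prescribes:

// A hypothetical Java rendering of the Go Controller contract.
public interface Controller extends Runnable {

  // Analogous to Run(stopCh); in Java we would rely on interruption or a stop flag instead.
  @Override
  void run();

  // True once the first batch of items has been processed ("synced").
  boolean hasSynced();

  // The resourceVersion of the Kubernetes resource list last observed by this controller.
  String lastSyncResourceVersion();
}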

Next, look at the controller struct, which is the state-bearing portion of a Controller implementation.  It takes in a Config representing its configuration, and has a “slot” for a Reflector, along with unimportant-to-the-end-user bits related to testing and thread safety.

So what does a Controller do, exactly, that a Reflector doesn’t already do?

A clue to the answer lies in the Config type, which is used as an implementation detail by only one particular implementation of the Controller type:

// Config contains all the settings for a Controller.
type Config struct {
	// The queue for your objects; either a FIFO or
	// a DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	Queue

	// Something that can list and watch your objects.
	ListerWatcher

	// Something that can process your objects.
	Process ProcessFunc

	// The type of your objects.
	ObjectType runtime.Object

	// Reprocess everything at least this often.
	// Note that if it takes longer for you to clear the queue than this
	// period, you will end up processing items in the order determined
	// by FIFO.Replace(). Currently, this is random. If this is a
	// problem, we can change that replacement policy to append new
	// things to the end of the queue instead of replacing the entire
	// queue.
	FullResyncPeriod time.Duration

	// ShouldResync, if specified, is invoked when the controller's reflector determines the next
	// periodic sync should occur. If this returns true, it means the reflector should proceed with
	// the resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.
	RetryOnError bool
}

// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods.
type ShouldResyncFunc func() bool

// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error

(As you read the excerpt above, bear in mind that it is in the controller.go file, but nevertheless contains lots of documentation forward references to types and concepts from other files we haven’t encountered yet.)

So not all Controller implementations have to use this.  In fact, Controller is completely undocumented!  But realistically, the only Controller implementation that matters, the one returned by the tersely-named New() function and backed by a controller struct, does use it, so we’d better understand it thoroughly.

The first thing to notice is that a Config struct contains a Queue.  We can track down the definition for Queue in fifo.go:

// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
	Store

	// Pop blocks until it has something to process.
	// It returns the object that was process and the result of processing.
	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
	// should be requeued before releasing the lock on the queue.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent adds a value previously
	// returned by Pop back into the queue as long
	// as nothing else (presumably more recent)
	// has since been added.
	AddIfNotPresent(interface{}) error

	// Return true if the first batch of items has been popped
	HasSynced() bool

	// Close queue
	Close()
}

So any Queue implementation must also satisfy the Store contract.  In Java terms, this means a hypothetical Queue interface would extend Store.  Let’s file that away for later.
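
Filed away, that might eventually look something like the following sketch; the Store and Queue shapes here are hypothetical Java analogs of mine, not the Go contracts themselves:

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// A hypothetical Java analog of the Go Store type.
interface Store<T> {
  void add(T object);
  void update(T object);
  void delete(T object);
  List<T> list();
  List<String> listKeys();
  Optional<T> getByKey(String key);
}

// A hypothetical Java analog of the Go Queue type: a Store you can also pop from.
interface Queue<T> extends Store<T> {
  // Blocks until an item is available, hands it to the processor, and returns it.
  T pop(Consumer<T> processor) throws InterruptedException;
  void addIfNotPresent(T object);
  boolean hasSynced();
  void close();
}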

We can also see some concept leakage here: recall that Controller implementations must have a HasSynced function, but its purpose and reason for being are undocumented.  When we look back at the controller struct-backed implementation of Controller, one possible implementation of the Controller type, we can see that its HasSynced function merely delegates to that of the Queue contained by its Config.  So there is a tacit assumption that a Controller implementation will most likely be backed by a Queue, since that is the easiest way to implement the HasSynced function, though strictly speaking it is not required.  This also serves as the only documentation we’re going to get about what that function is supposed to do: Return true if the first batch of items has been popped.

Back to the Config.  It also contains a ListerWatcher.  Hey!  We’ve seen one of these before.  We had realized that it is a core component of a Reflector.  So why doesn’t a Config merely have a Reflector?  Why is encapsulation broken here—isn’t ListerWatcher basically an implementation detail of a Reflector?  Yes, and there doesn’t seem to be a good reason.  We can tell from source code we’ll see later that when the Run function of the controller struct-backed implementation of Controller is called (only one possible implementation of such a function), it creates a Reflector just-in-time using the ListerWatcher housed by the Config.  Why the Reflector isn’t passed in as part of the Config is an open question.  At any rate, logically speaking, part of a controller struct-backed implementation of the Controller interface is a Reflector.

Next up is the first actually interesting part that we really haven’t encountered before, which gives us a better idea of what a Controller is supposed to do: the ProcessFunc.  A ProcessFunc appears, from its documentation, to do something to a Kubernetes resource:

ProcessFunc processes a single object.

So even from this little bit of documentation we can see that ultimately a Controller implementation that happens to use a Config (remember, it’s not required to) will not only cause caching of Kubernetes resources into a Store (again, not required), but will presumably work on objects found in that Store as well.  These are shaky assumptions, and not enforced by any contract, but they turn out to be quite important, not just to the de facto standard implementation of the Controller type (the controller struct-backed one that uses a Config), but to the implied, but not technically specified, contract of the Controller type itself.

Summed up, most Controller implementations probably should use a Reflector to populate a Queue (a particular kind of Store) with Kubernetes resources of a particular kind, and then also process the contents of that Queue, presumably by Popping objects off of it.

Again, it is worth noting that this summary is based on one particular implementation of the type (the de facto standard controller struct-backed one), and not on the type’s contract itself, but elsewhere in the package you will be able to see that this is expected, if not enforced, behavior of any Controller implementation.

We might visually model all this like so:

KubernetesControllerBlogPart2.png

Here, a Controller uses a Reflector to populate a Queue, and has a protected process method that knows how to do something with a given object.  It also has a protected shouldResync() method, a public hasSynced() method, and the ability to report what the last Kubernetes resource version was after a synchronization.  We’ll greatly refine and refactor this model over time.

In the next post, we’ll look at informers and shared informers which build on top of these foundations, again with an eye towards modeling all this idiomatically in Java.

Understanding Kubernetes’ tools/cache package: part 1

Part 0 gave the general context for what we’re going to do in the next series of posts.  Here we’ll look at the foundations underlying any program that wants to react to changes in a Kubernetes cluster (a Kubernetes controller).

To understand how to write a Kubernetes controller—in my case, in Java—we have to understand what the source of events and truth is.  If (as noted in the first post in this series) Kubernetes can be seen as a distributed messaging system where the messages you react to are desired state specifications, then where do the “messages” come from?

Any time you kubectl create or kubectl apply or kubectl delete something, you are publishing a message.  The message, the Kubernetes resource you’re creating, applying or deleting, has three high-level parts to it: the kind, which says what kind of thing the message is about, the specification, which describes what you want to be true of the thing you’re creating or applying or deleting, and the status, which describes how things really are.  Typically if you’re editing YAML and using kubectl and are in other ways an ordinary end user, you are only concerned with the specification (since only Kubernetes itself can tell you how things really are).

So to whom are you publishing the resource specification message?  The short answer is: the API server.

The API server is named this way because when you look at it through a certain lens, it is the place where Kubernetes concepts are exposed as HTTP endpoints, or APIs (in the modern web-centric usage of the term, not necessarily the programming language sense of the term).  Hence it serves up APIs.

While this is strictly speaking true, it doesn’t necessarily add a whole lot of information to the conceptual landscape!

Let’s look at it a different way.  At a slightly higher level, it is also a message broker.  kubectl POSTs or PUTs or PATCHes or DELETEs a resource specification, and the API server makes that specification available at other endpoints it serves up, effectively “broadcasting” the “message” to any “listeners” who want to react to it.  To “listen” for a “specification message”, you make use of Kubernetes watches.

(Somewhat surprisingly, the “watch” feature of the Kubernetes API is not documented on its own in the reference documentation.  For example, you’ll find documentation about how to “do” a watch in the, say, CronJob reference documentation (which actually also incorrectly recommends a deprecated way of doing it!), but you won’t find a section in the reference documentation telling you that you can “do” watches on every (plural) resource in the Kubernetes ecosystem.  Well, you can; you Just Have To Know™ that it’s documented correctly in the API conventions).

When you do, you will get back a stream of WatchEvents describing Events happening to resources of the appropriate type.  Or, if you like, a message channel that filters messages by kind!  Kubernetes watches are built on top of etcd watches and inherit their conceptual properties.  (For more on this feature, you may wish to see my Kubernetes Events Can Be Complicated post.)

So to write a message listener that reacts to incoming specification messages concerning resources of a particular kind, all we have to do is just implement an appropriate Kubernetes watch, right?  Not quite.  While watches are lighter than they have been in the past, they aren’t exactly lightweight, and, more critically, if you’re truly going to react to all specification changes over time, you may be jumping in to the middle of a logical event stream.  So you’ll need to list what’s gone on first, then establish your watch from that point forward.

That’s a good jumping-off point for (finally) digging into the tools/cache package, the Go-language-specific framework for writing Kubernetes controllers.  Somewhere in this package something has to be listing all the resources of a given type—Pods, Deployments, whatever—and then setting up watches on them: the combination will give you a logical collection of objects and modifications to them.

Sure enough, if you dig around enough, you eventually end up looking at the listwatch.go file.  In there, you’ll find the ListerWatcher type, declared like so:

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

We’d like to find an implementation of this type that is backed by a Kubernetes client or an HTTP client or something, so that the List() function is actually attached to the Kubernetes API server.  Oh, look, there is one:

// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		options.FieldSelector = fieldSelector.String()
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do().
			Get()
	}
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		options.FieldSelector = fieldSelector.String()
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch()
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

(You can dig further in the source code to discover that references to c are basically references to a Kubernetes client.)

So we have a little component that can list and watch things of a particular type.  Note as well the fieldSelector reference, which in messaging system terms allows us to filter the message stream by various criteria.  Field selectors are quite primitive and mostly undocumented, unfortunately, but they’re better than nothing.
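
Since the eventual goal is a Java rendering, it’s worth noting how small this contract is.  A hypothetical Java analog, with names and parameter shapes of my own invention, might be:

import java.io.Closeable;
import java.util.List;

// Placeholder shapes standing in for the Go types used by ListerWatcher.
record ListOptions(String fieldSelector, String resourceVersion) {}
record VersionedList<T>(List<T> items, String resourceVersion) {}
interface WatchHandle<T> extends Closeable {}

// A hypothetical Java analog of the Go ListerWatcher contract.
interface ListerWatcher<T> {

  // Return the current list of resources along with the resourceVersion at
  // which it was taken (needed to start the watch in the right place).
  VersionedList<T> list(ListOptions options);

  // Begin a watch at the resourceVersion carried in the options; the returned
  // handle represents a stream of watch events and should be closed when done.
  WatchHandle<T> watch(ListOptions options);
}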

Assuming we have a ListerWatcher backed by a Kubernetes client, we now have a logical collection of Kubernetes resources of a particular kind and a certain number of events concerning those resources.  It would be handy to put these things somewhere.  How about in a Store?

type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
	Resync() error
}

Store seems nice and generic (no parameters of any types other than built-in Go types).  It would be nice to extend our ListerWatcher to take in a Store where it could dump its stuff.  For various reasons, though, the tools/cache package instead created another concept called a Reflector, which is simply the grouping of a ListerWatcher with a Store, together with the necessary plumbing to route the return value of the ListerWatcher’s List() function “into” the Store, as well as the necessary plumbing to turn incoming WatchEvents into additions, updates and removals of items in the Store.

Rising up above the mud for a moment, think about why this concept is called a reflector.  Looked at one way, it reflects the contents of a Kubernetes message channel into a cache.  Then if you want to do something in your Kubernetes controller program with resource specifications and statuses of a particular kind, you can look to the cache rather than to the API server itself.

One important thing to notice is that this subsystem is standalone and self-contained: if you have a ListerWatcher backed by a Kubernetes client, a Store and a Reflector that bundles them together, you have a component that can be useful in a number of scenarios down the line.  More importantly, we can, when necessary, clear some mental space by simply referring to the Reflector, understanding that effectively it is a cache of events we want to react to, and that the other machinery we’ve delved into above is its implementation detail.

If we take advantage of concepts from object-oriented programming and a little bit of Java (after all, this series is about deriving a Java implementation of a Kubernetes controller framework) we can dispense with the ListerWatcher concept and fold it into the Reflector concept as an implementation detail.  If we do this, our visual model of all of this might start to look like this:

KubernetesControllerBlogPart1.png

Here, a KubernetesResourceReflector is a (hypothetical) concrete implementation of Reflector, set up for resources of a particular kind as represented by the T parameter.  Give it a KubernetesClient and a Store implementation at construction time and run() it, and it will set about causing the Store to contain a cached copy of Kubernetes resources of a particular kind (you and I know that under the covers some sort of mechanism resembling a ListerWatcher will be used, but the mechanism by which the actual listing and watching occurs is not of interest to the end user, only that the cache is populated).  We’ll greatly refine and refactor this model over time.
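
In code, the core of such a hypothetical KubernetesResourceReflector might look roughly like this; everything here is illustrative only, and for simplicity it takes listing and watching functions directly rather than a full KubernetesClient:

import java.util.List;
import java.util.function.Consumer;

// A deliberately tiny, hypothetical sketch of the reflector idea: list once into the
// cache, then apply the watch event stream to keep that cache up to date.
final class KubernetesResourceReflector<T> implements Runnable {

  // Minimal stand-ins for the collaborators discussed above.
  interface Lister<T> { List<T> list(); }
  interface Watcher<T> { void watch(Consumer<WatchEvent<T>> onEvent); }
  interface Store<T> { void add(T object); void update(T object); void delete(T object); }
  enum EventKind { ADDED, MODIFIED, DELETED }
  record WatchEvent<T>(EventKind kind, T object) {}

  private final Lister<T> lister;
  private final Watcher<T> watcher;
  private final Store<T> store;

  KubernetesResourceReflector(Lister<T> lister, Watcher<T> watcher, Store<T> store) {
    this.lister = lister;
    this.watcher = watcher;
    this.store = store;
  }

  @Override
  public void run() {
    // 1. Prime the cache with the initial listing.
    this.lister.list().forEach(this.store::add);
    // 2. Reflect subsequent watch events into the cache.
    this.watcher.watch(event -> {
      switch (event.kind()) {
        case ADDED    -> this.store.add(event.object());
        case MODIFIED -> this.store.update(event.object());
        case DELETED  -> this.store.delete(event.object());
      }
    });
  }
}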

In the next post, we’ll look at some more concepts from the tools/cache package.  This is just the tip of the iceberg!

Understanding Kubernetes’ tools/cache package: part 0

Let us grant first that any time you say something like “looked at one way” you’re about to emphasize one facet of a system and downplay or outright ignore others.  Let us also grant that the point of such an exercise is to dig into the emphasized facet through the lens of that emphasis.  So then: onwards.

Looked at one way, Kubernetes is a distributed messaging system, whose message broker of sorts is the API server, and whose messages are specifications of desired state.  When you use this overly simplistic lens, kubectl and other clients can be seen as creators (broadcasters) of these specifications, and certain Kubernetes programs can be seen as listeners for and reactors to the creation, modification and deletion of these specifications, creating or starting or stopping or deleting other programs, and possibly creating, modifying or deleting other specification messages.  One hopes that all these gyrations end up realizing the state demanded by the specifications.

(Pursuant to the first paragraph, there are also first-class events in Kubernetes.  I’m not emphasizing those here.)

Suppose you wanted to write a program that could listen for these specification “messages” and do something as a reaction to them.  CoreOS calls these kinds of programs “operators”; Kubernetes itself calls them “controllers”, or sometimes simply “control loops”.  Conceptually, there’s nothing magic about them: listen for some conceptual events, react to them, possibly fire others.

Nevertheless, writing controllers (I like the Kubernetes term, not the CoreOS term) has taken on a certain kind of mystique.  This is due in part to the fact that Kubernetes is a language-independent platform.  This means if you want to be notified of these “specification events”, you have to arrange to effectively write a distributed messaging client yourself that ultimately hooks up to the right HTTP endpoints of the API server.  This involves doing caching right, accounting for failures in the pinball machine, being efficient in terms of memory usage, deciding whether your messaging client is going to multiplex events to other downstream messaging clients, and so on.

Because this is all complicated, and because programmers (yours truly included) are lazy, people tend to want to use an implementation of such a “messaging client” that has already been built.  To date, only one such (widely used) implementation has been built, and it is written in Go, and it is part of the Kubernetes codebase itself, and it is in my opinion phenomenally complicated, or—not mutually exclusively—perhaps I’m just thick-headed.  Thankfully, there are some excellent blog posts and videos to help with all this.

So you could, if you wanted, simply follow the recipes outlined in those blog posts and videos, and write a Go program that uses their constructs appropriately, and off you go.

But if you’re like me, you want to fully understand how the whole pseudo-messaging-client-library-underlying-Kubernetes-controllers actually works.  Also, in my case, as an incorrigible Java programmer, I want to reverse engineer the abstractions so I can express them better in Java, and hopefully derive a Java implementation of this pseudo-messaging-client-library-underlying-Kubernetes-controllers library.

Over the next few posts, we’ll head off on the journey of disentangling and hopefully demystifying this tools/cache package, and hopefully at the end of the road will end up with an idiomatic Java way of writing controllers.  We’ll start with the next post, which talks about some of the underpinnings.

microBean Helm: Using Maven Repositories

About half a year ago, I wrote microBean Helm, a port of the Helm ecosystem to Java.  I was motivated in part to do this by the recognition that Helm chart repositories could be seen as fledgling, partial implementations of Maven repositories.  What if a Maven repository ecosystem—complete with caching, mirroring and all the rest—could be used as a chart repository?

I’m proud to introduce microBean Helm Maven Integration, a project that implements an AbstractChartResolver in terms of Maven’s repository ecosystem.

The MavenRepositoryChartResolver class will produce a Java object representing a Helm chart in memory from Maven repository coordinates, resolving it if necessary from a remote Maven repository, taking into account a user’s .m2/settings.xml file.  From there, it can be plugged into the other Helm-related tooling in the microBean Helm ecosystem.