Understanding Kubernetes’ tools/cache package: part 4

In part 3, we dug into the real contract that Controller implementations must obey in order to be useful, and looked into what informers and SharedIndexInformers are.

In this post, we’ll look at DeltaFIFO, since it sits at the heart of much of this machinery. We’ll examine it mostly in isolation, and then, probably in a later post, try to “plug it back in” to the overall pinball machine so we can make sense of it in a larger context.

From the name DeltaFIFO, we can surmise we’ll be talking about diffs in some capacity, and in the context of queuing.  Let’s look at the contract:

// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update).
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// deltaCompressor tells us how to combine two or more
	// deltas. It may be nil.
	deltaCompressor DeltaCompressor

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}

var (
	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

To boil this down further, let’s set aside, for now, the threading concerns, the “delta compression” represented by the deltaCompressor field, and the “queue is closed” concerns.

What we’re left with is a type about which we can say:

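  • A DeltaFIFO is a Queue of Deltas instances. It turns out a Deltas type is just a slice of Delta instances. We’ll see what those are in a moment.
  • The Deltas instances a DeltaFIFO holds are “keyable”. A string key can be extracted from a given Deltas instance (in practice, from the object in its newest Delta) by way of a KeyFunc.
  • Certain keys can be “known”: the knownObjects field (a KeyListerGetter) lists them, and the DeltaFIFO consults it to figure out which items have been deleted when Replace() or Delete() is called.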
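A minimal, self-contained Go sketch of the keying idea follows. The Added and Updated constant names mirror client-go’s DeltaType values, but the pod type, podKey and keyOf are hypothetical. keyOf imitates, in simplified form, what DeltaFIFO’s KeyOf method does with a Deltas value: it derives the key from the object carried by the newest Delta.

```go
package main

import (
	"errors"
	"fmt"
)

// DeltaType is the "verb" of a change. The constant names mirror client-go;
// everything else in this file is a hypothetical simplification.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
)

// Delta pairs a verb with the object's state.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is just a slice of Delta, oldest first.
type Deltas []Delta

// Newest returns the most recent Delta, or nil if there are none.
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

// KeyFunc turns an object into a deterministic string key.
type KeyFunc func(obj interface{}) (string, error)

// pod is a hypothetical stand-in for a Kubernetes resource.
type pod struct{ Namespace, Name, Phase string }

// podKey is a hypothetical KeyFunc producing "namespace/name" keys.
func podKey(obj interface{}) (string, error) {
	p, ok := obj.(pod)
	if !ok {
		return "", fmt.Errorf("unexpected type %T", obj)
	}
	return p.Namespace + "/" + p.Name, nil
}

// keyOf keys a bare object directly; for a Deltas value it keys off the
// object carried by the newest Delta.
func keyOf(keyFunc KeyFunc, obj interface{}) (string, error) {
	if d, ok := obj.(Deltas); ok {
		newest := d.Newest()
		if newest == nil {
			return "", errors.New("zero-length Deltas")
		}
		obj = newest.Object
	}
	return keyFunc(obj)
}

func main() {
	history := Deltas{
		{Added, pod{"default", "web", "Pending"}},
		{Updated, pod{"default", "web", "Running"}},
	}
	key, err := keyOf(podKey, history)
	fmt.Println(key, err) // default/web <nil>
}
```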

Let’s look at what a Delta type is:

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

A Delta, in other words, is an event! It is the combination of a verb (the DeltaType) and the payload (the object stored in the Object field).

So a DeltaFIFO is a thread-safe Queue of collections (Deltas) of events (Delta), indexed by some sort of key, where each event (Delta) in the collection represents the addition, modification or removal of a Kubernetes resource (or, with the Sync DeltaType, a resynchronization of one).

Or, to put it another way, it’s a queue that turns a call like Add(someObject) into the effective equivalent of a hypothetical AddEvent(ObjectAdded, someObject) call, a call like Update(someObject) into AddEvent(ObjectUpdated, someObject), and so on.
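To make the “verbs become events” idea concrete, here is a deliberately simplified, single-threaded sketch. toyDeltaFIFO, addEvent and object are hypothetical names; the sketch leaves out locking, compression, knownObjects and the blocking behavior of the real Pop(), and only shows how Add, Update and Delete all funnel into one event-recording function.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// object is a hypothetical resource with a name and a generation counter.
type object struct {
	Name       string
	Generation int
}

// toyDeltaFIFO is a hypothetical, single-threaded simplification.
type toyDeltaFIFO struct {
	keyFunc func(obj interface{}) string
	items   map[string]Deltas // key -> accumulated events
	queue   []string          // keys, in first-enqueued order
}

func newToyDeltaFIFO(keyFunc func(interface{}) string) *toyDeltaFIFO {
	return &toyDeltaFIFO{keyFunc: keyFunc, items: map[string]Deltas{}}
}

// addEvent plays the role of the hypothetical AddEvent(verb, obj).
func (f *toyDeltaFIFO) addEvent(t DeltaType, obj interface{}) {
	id := f.keyFunc(obj)
	if _, exists := f.items[id]; !exists {
		f.queue = append(f.queue, id)
	}
	f.items[id] = append(f.items[id], Delta{t, obj})
}

// The public verbs only choose a DeltaType.
func (f *toyDeltaFIFO) Add(obj interface{})    { f.addEvent(Added, obj) }
func (f *toyDeltaFIFO) Update(obj interface{}) { f.addEvent(Updated, obj) }
func (f *toyDeltaFIFO) Delete(obj interface{}) { f.addEvent(Deleted, obj) }

// Pop removes the oldest key and returns all of its Deltas; ok is false
// when the queue is empty (the real Pop() blocks instead).
func (f *toyDeltaFIFO) Pop() (Deltas, bool) {
	if len(f.queue) == 0 {
		return nil, false
	}
	id := f.queue[0]
	f.queue = f.queue[1:]
	d := f.items[id]
	delete(f.items, id)
	return d, true
}

func main() {
	f := newToyDeltaFIFO(func(o interface{}) string { return o.(object).Name })
	f.Add(object{"web", 1})
	f.Update(object{"web", 2})
	f.Delete(object{"web", 2})
	d, _ := f.Pop()
	for _, delta := range d {
		fmt.Println(delta.Type, delta.Object)
	}
	// Prints:
	// Added {web 1}
	// Updated {web 2}
	// Deleted {web 2}
}
```

The three calls for the same key produce a single queue entry whose Deltas carry the whole history, which is the “everything that’s happened to it since you last processed it” property from the doc comment.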

It’s worth noting that, in a sense, we’ve come full circle. Recall that it’s possible to establish Kubernetes watches on lists of resources. When you do this, you get a stream of WatchEvents, which represent additions, modifications or removals of Kubernetes resources. So what are we doing now with a DeltaFIFO and Deltas and so on? Why not just work off WatchEvents directly?

Well, also recall that you need to set up watches in conjunction with an initial listing of Kubernetes resource instances, and that listing does not consist of WatchEvents. So, from a very high level, we’ve coalesced the concept of a list operation and the concept of a watch operation and expressed them both in terms of Delta instances, which all end up in this DeltaFIFO construct. This can then be used to distribute events, JavaBean-style, to event handlers.
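The list/watch coalescing can be sketched like this. In client-go, as far as I understand it, the translation is done by the Reflector, which calls the store’s Add(), Update() and Delete() for watch events and Replace() for a listing; the sketch below collapses that into two hypothetical functions, fromList and fromWatch, and a hypothetical watchEvent type. The "ADDED", "MODIFIED" and "DELETED" strings are the watch event types the API server sends; tagging listed items as Sync deltas is an assumption made for this sketch.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

// watchEvent is a hypothetical stand-in for a Kubernetes WatchEvent.
type watchEvent struct {
	Type   string
	Object interface{}
}

// fromList turns an initial listing, which carries no verbs, into Deltas.
// Using Sync for listed items is an assumption of this sketch.
func fromList(items []interface{}) []Delta {
	out := make([]Delta, 0, len(items))
	for _, item := range items {
		out = append(out, Delta{Sync, item})
	}
	return out
}

// fromWatch maps a watch event's verb onto a DeltaType.
func fromWatch(ev watchEvent) (Delta, error) {
	switch ev.Type {
	case "ADDED":
		return Delta{Added, ev.Object}, nil
	case "MODIFIED":
		return Delta{Updated, ev.Object}, nil
	case "DELETED":
		return Delta{Deleted, ev.Object}, nil
	default:
		return Delta{}, fmt.Errorf("unhandled watch event type %q", ev.Type)
	}
}

func main() {
	// One stream of Deltas, fed first by the list and then by the watch.
	stream := fromList([]interface{}{"pod-a", "pod-b"})
	for _, ev := range []watchEvent{{"MODIFIED", "pod-a"}, {"DELETED", "pod-b"}} {
		d, err := fromWatch(ev)
		if err != nil {
			fmt.Println(err)
			continue
		}
		stream = append(stream, d)
	}
	fmt.Println(stream) // [{Sync pod-a} {Sync pod-b} {Updated pod-a} {Deleted pod-b}]
}
```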

Let’s see if we can put this back in a limited amount of context.  We’re talking about a queue, so you should be able to add things to it.  It is fundamentally a queue of Delta instances (by way of Deltas instances) internally.  So how do you build a Delta?

As it turns out, a DeltaFIFO builds one for you:

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

[snip]
// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed. Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// If object is supposed to be deleted (last event is Deleted),
	// then we should ignore Sync events, because it would result in
	// recreation of this object.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
	if f.deltaCompressor != nil {
		newDeltas = f.deltaCompressor.Compress(newDeltas)
	}

	_, exists := f.items[id]
	if len(newDeltas) > 0 {
		if !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else if exists {
		// The compression step removed all deltas, so
		// we need to remove this from our map (extra items
		// in the queue are ignored if they are not in the
		// map).
		delete(f.items, id)
	}
	return nil
}
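The Sync-suppression rule near the top of queueActionLocked is easy to miss, so here is a stripped-down sketch of it. queueAction and willBeDeleted are hypothetical simplifications (no locking, dedup or compression, and the key is the object itself); willBeDeleted checks whether the newest pending Delta for a key is a deletion, as the comment in queueActionLocked describes.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// willBeDeleted reports whether the newest pending Delta for id is Deleted.
func willBeDeleted(items map[string]Deltas, id string) bool {
	d := items[id]
	return len(d) > 0 && d[len(d)-1].Type == Deleted
}

// queueAction is a hypothetical, simplified queueActionLocked.
func queueAction(items map[string]Deltas, queue *[]string, t DeltaType, id string) {
	if t == Sync && willBeDeleted(items, id) {
		return // a Sync here would effectively recreate a doomed object
	}
	if _, exists := items[id]; !exists {
		*queue = append(*queue, id) // enqueue the key only once
	}
	items[id] = append(items[id], Delta{t, id})
}

func main() {
	items := map[string]Deltas{}
	var queue []string
	queueAction(items, &queue, Added, "web")
	queueAction(items, &queue, Deleted, "web")
	queueAction(items, &queue, Sync, "web")    // ignored
	fmt.Println(len(queue), len(items["web"])) // 1 2
}
```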

So from a Java modeling standpoint, we have to realize that modeling a DeltaFIFO really involves two generic types: T, the actual type of the Kubernetes resource being affected, and something like Delta<T>, the “event” type actually stored internally by the queue.
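Go has had generics since 1.18, so the same two-type observation can be sketched in the document’s own language. Here T is the resource type and Delta[T] plays the role of Delta<T>. This is a hypothetical typed variant (the client-go code above uses interface{} instead), and typedFIFO, enqueue and configMap are illustrative names.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
)

// Delta[T] is the "event" type, parameterized by the resource type T.
type Delta[T any] struct {
	Type   DeltaType
	Object T
}

// Deltas[T] is the per-key history stored by the queue.
type Deltas[T any] []Delta[T]

// typedFIFO accepts T from callers but stores Deltas[T] internally.
type typedFIFO[T any] struct {
	keyFunc func(T) string
	items   map[string]Deltas[T]
	queue   []string
}

func newTypedFIFO[T any](keyFunc func(T) string) *typedFIFO[T] {
	return &typedFIFO[T]{keyFunc: keyFunc, items: map[string]Deltas[T]{}}
}

// enqueue wraps a T in a Delta[T] and records it under T's key.
func (f *typedFIFO[T]) enqueue(t DeltaType, obj T) {
	id := f.keyFunc(obj)
	if _, ok := f.items[id]; !ok {
		f.queue = append(f.queue, id)
	}
	f.items[id] = append(f.items[id], Delta[T]{Type: t, Object: obj})
}

// configMap is a hypothetical resource type.
type configMap struct{ Name, Data string }

func main() {
	f := newTypedFIFO(func(c configMap) string { return c.Name })
	f.enqueue(Added, configMap{"settings", "v1"})
	f.enqueue(Updated, configMap{"settings", "v2"})
	fmt.Println(f.items["settings"]) // [{Added {settings v1}} {Updated {settings v2}}]
}
```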

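The innards of DeltaFIFO model a set by storing a map from keys to Deltas (items) alongside a slice of keys (queue) that records the order in which keys were enqueued. A key is appended to the slice only when it is not already in the map, so another change to an already-queued object is added to that key’s existing Deltas rather than enqueued as a separate item. From this we can deduce that duplicates are not permitted in queues of this kind, so, with respect to keys, it has set semantics. From a Java perspective, this is an important insight, as underneath the covers we’ll likely use some kind of Set implementation. Also, in Java, Objects have an equals(Object) method, which might allow us to simplify the KeyFunc semantics as well.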
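The items/queue pairing can be sketched as an ordered key set. orderedKeySet and its methods are hypothetical, and payloads are plain strings rather than Deltas. The sketch mirrors two details visible in queueActionLocked: a key is appended to the slice only when it is first added to the map, and when an entry is removed from the map alone (as happens when compression empties it), the leftover slice entry is ignored.

```go
package main

import "fmt"

// orderedKeySet: the map answers "is this key present, and with what
// payloads?"; the slice remembers the order keys were enqueued.
type orderedKeySet struct {
	items map[string][]string
	queue []string
}

func newOrderedKeySet() *orderedKeySet {
	return &orderedKeySet{items: map[string][]string{}}
}

// add appends the key to the slice only if the map does not hold it yet.
func (s *orderedKeySet) add(key, payload string) {
	if _, exists := s.items[key]; !exists {
		s.queue = append(s.queue, key)
	}
	s.items[key] = append(s.items[key], payload)
}

// drop removes a key from the map only, leaving a stale slice entry.
func (s *orderedKeySet) drop(key string) { delete(s.items, key) }

// pop returns the oldest key still in the map, skipping stale entries.
func (s *orderedKeySet) pop() (string, []string, bool) {
	for len(s.queue) > 0 {
		key := s.queue[0]
		s.queue = s.queue[1:]
		if payloads, ok := s.items[key]; ok {
			delete(s.items, key)
			return key, payloads, true
		}
	}
	return "", nil, false
}

func main() {
	s := newOrderedKeySet()
	s.add("a", "Added")
	s.add("b", "Added")
	s.add("a", "Updated")
	s.drop("b")
	for {
		key, payloads, ok := s.pop()
		if !ok {
			break
		}
		fmt.Println(key, payloads) // a [Added Updated]
	}
}
```

In Java terms, a LinkedHashMap from key to Deltas would likely cover both halves of this pairing, since it preserves insertion order and does not reorder a key that is re-inserted.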

Visually, here is a simplistic model of the overall structure:

[Figure: simplistic model of the DeltaFIFO structure (KubernetesControllerBlogPart4.png)]

As always, we’ll refine and refactor this model as we go, but it’s useful to sketch out some of the concepts.

In the next post, we’ll try to put some of these pieces back together again.
