In part 3, we dug into the real contract that `Controller` implementations must obey in order to be useful, and looked into what informers and `SharedIndexInformer`s are.
In this post, we'll look at `DeltaFIFO`, since it is at the heart of a lot of this. We'll do this mostly in isolation, and then will attempt, probably in a later post, to "plug it back in" to the overall pinball machine so we can make sense of it in a larger context.
From the name `DeltaFIFO`, we can surmise we'll be talking about diffs in some capacity, and in the context of queuing. Let's look at the contract:
```go
// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update).
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// deltaCompressor tells us how to combine two or more
	// deltas. It may be nil.
	deltaCompressor DeltaCompressor

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}

var (
	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)
```
To boil this down further, let's conceptually ignore, for now, the threading concerns, the "delta compression" represented by the `deltaCompressor` field, and the "queue is closed" concerns.
What we’re left with is a type about which we can say:
- A `DeltaFIFO` is a `Queue` of `Deltas` instances. It turns out a `Deltas` type is just a collection of `Delta` instances. We'll see what those are in a moment.
- The `Deltas` instances a `DeltaFIFO` holds are "keyable": a `string` key can be extracted from a given `Deltas` instance by way of a `KeyFunc`.
- Certain `Deltas` instances inside the `DeltaFIFO` can be "known". (The supporting type definitions are sketched just below.)
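For reference, the supporting types are tiny. Here they are, paraphrased from the same package (the exact doc comments may differ by version):

```go
// Paraphrased from k8s.io/client-go/tools/cache; comments abbreviated.

// KeyFunc knows how to make a key out of an object.
type KeyFunc func(obj interface{}) (string, error)

// Deltas is a list of one or more Delta values for a single object;
// the oldest delta is first.
type Deltas []Delta
```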
Let's look at what a `Delta` type is:
```go
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}
```
A `Delta`, in other words, is an event! It is the combination of a verb (the `DeltaType`) and a payload (the object stored in the `Object` field).
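The verbs form a small, closed set. Paraphrased from the same package, for the client-go version under discussion (later versions add more, such as `Replaced`):

```go
// DeltaType is the type of a change (addition, deletion, etc).
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Sync marks an object seen during an initial listing or a
	// periodic resync, rather than a fresh change.
	Sync DeltaType = "Sync"
)
```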
So a `DeltaFIFO` is a thread-safe `Queue` of collections (`Deltas`) of events (`Delta`), indexed by some sort of key, where each event (`Delta`) in the collection represents the addition, modification, or removal of a Kubernetes resource.
Or, to put it another way, it's a queue that turns invocations of a function of the form `Add(someObject)` into effective invocations of a hypothetical `AddEvent(ObjectAdded, someObject)` function, turns invocations of a function of the form `Update(someObject)` into effective invocations of a hypothetical `AddEvent(ObjectUpdated, someObject)` function, and so on.
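Here is a minimal sketch of that behavior. The `widget` type and its key function are invented for illustration, and `NewDeltaFIFO` is shown with the compressor-era signature matching the code quoted above (later client-go versions drop the `DeltaCompressor` parameter):

```go
// Hypothetical usage sketch, not production code.
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// widget is a stand-in for a real Kubernetes resource type.
type widget struct{ name string }

func main() {
	keyFunc := func(obj interface{}) (string, error) {
		return obj.(*widget).name, nil
	}
	// Compressor-era signature: (keyFunc, compressor, knownObjects).
	fifo := cache.NewDeltaFIFO(keyFunc, nil, nil)

	w := &widget{name: "w1"}
	fifo.Add(w)    // effectively AddEvent(Added, w)
	fifo.Update(w) // effectively AddEvent(Updated, w)

	// Pop hands the consumer the accumulated Deltas for one key.
	fifo.Pop(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type, d.Object) // Added &{w1}, then Updated &{w1}
		}
		return nil
	})
}
```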
It's worth noting that in some sense we've come full circle. Recall that it's possible to establish Kubernetes watches on lists of resources. When you do this, you get a stream of `WatchEvent`s, which represent additions, modifications, or removals of Kubernetes resources. So what are we doing now with a `DeltaFIFO` and `Deltas` and so on? Why not just work off `WatchEvent`s directly?
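For comparison, here is roughly what a watch event looks like, paraphrased from `k8s.io/apimachinery/pkg/watch` (treat the exact constant set as version-dependent). Note that it has the same verb-plus-payload shape as a `Delta`:

```go
// Paraphrased from k8s.io/apimachinery/pkg/watch.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Error    EventType = "ERROR"
)

// Event represents a single event to a watched resource.
type Event struct {
	Type   EventType
	Object runtime.Object // the object the event concerns
}
```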
Well, also recall that you need to set up watches in conjunction with an initial listing of Kubernetes resource instances, and that listing is not comprised of `WatchEvent`s. So, from a very high level, we've coalesced the concept of a list operation and the concept of a watch operation, and expressed them both in terms of `Delta` instances, which all end up in this `DeltaFIFO` construct. This can then be used to distribute events, JavaBean-style, to event handlers.
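To make that coalescing concrete, here is a conceptual sketch of how a single LIST result and a subsequent WATCH stream might both be funneled into one `DeltaFIFO`. This is not the real `Reflector`, which also handles resyncs, errors, and reconnection; the `feed` function and its parameters are hypothetical names:

```go
// Conceptual sketch only; feed, listed, and rv are hypothetical.
package sketch

import (
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)

func feed(fifo *cache.DeltaFIFO, listed []interface{}, rv string, w watch.Interface) error {
	// The initial list lands in the queue as a batch of Sync deltas.
	if err := fifo.Replace(listed, rv); err != nil {
		return err
	}
	// Every subsequent watch event becomes an Added/Updated/Deleted delta.
	for ev := range w.ResultChan() {
		switch ev.Type {
		case watch.Added:
			_ = fifo.Add(ev.Object)
		case watch.Modified:
			_ = fifo.Update(ev.Object)
		case watch.Deleted:
			_ = fifo.Delete(ev.Object)
		}
	}
	return nil
}
```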
Let's see if we can put this back in a limited amount of context. We're talking about a queue, so you should be able to add things to it. Internally, it is fundamentally a queue of `Delta` instances (by way of `Deltas` instances). So how do you build a `Delta`?
As it turns out, a `DeltaFIFO` builds one for you:
```go
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

[snip]

// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed. Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// If object is supposed to be deleted (last event is Deleted),
	// then we should ignore Sync events, because it would result in
	// recreation of this object.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
	if f.deltaCompressor != nil {
		newDeltas = f.deltaCompressor.Compress(newDeltas)
	}

	_, exists := f.items[id]
	if len(newDeltas) > 0 {
		if !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else if exists {
		// The compression step removed all deltas, so
		// we need to remove this from our map (extra items
		// in the queue are ignored if they are not in the
		// map).
		delete(f.items, id)
	}
	return nil
}
```
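The `dedupDeltas` call above is worth a brief aside. Its real implementation lives in the same file; paraphrased (the actual code is structured slightly differently), it only collapses back-to-back duplicate deletions, preferring the delta that is not a `DeletedFinalStateUnknown` marker:

```go
// Paraphrase of dedupDeltas/isDeletionDup; a simplification of the
// real helpers in the same file.
func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	a, b := deltas[n-1], deltas[n-2] // newest, second-newest
	if a.Type == Deleted && b.Type == Deleted {
		keep := b
		// Prefer the deletion that carries real final state.
		if _, unknown := b.Object.(DeletedFinalStateUnknown); unknown {
			keep = a
		}
		return append(deltas[:n-2:n-2], keep)
	}
	return deltas
}
```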
So, from a Java modeling standpoint, we have to realize that any generic type we use in modeling a `DeltaFIFO` actually has to be two generic types: one would be `T`, the actual type of the Kubernetes resource being affected, and the other would be something like `Delta<T>`, which would be the "event" type actually stored internally by the queue.
The innards of `DeltaFIFO` model a set by storing a map of keys (to their `Deltas`) alongside an internal slice of keys that preserves ordering. From this we can deduce that duplicates are not permitted in queues of this kind, so it has set semantics. From a Java perspective, this is an important insight, as under the covers we'll likely use some kind of `Set` implementation. Also, in Java, `Object`s have an `equals(Object)` method, which might allow us to simplify the `KeyFunc` semantics as well.
Visually, here is a simplistic model of the overall structure:
As always, we’ll refine and refactor this model as we go, but it’s useful to sketch out some of the concepts.
In the next post, we’ll try to put some of these pieces back together again.