Understanding Kubernetes’ tools/cache package: part 11—Towards a Kubernetes controller CDI 2.0 portable extension

In part 10, we laid out what we need to do to make a simple programming model available to an end user who wants to be notified of Kubernetes events in the same style as Go programmers using the tools/cache package.  This series starts with part 0.

Here we’ll look at implementing a CDI 2.0 portable extension that enables this lean programming model.

Recall from part 10 the idea of pairing an observer method with a producer method, using the same qualifiers on both the producer method’s return type and on the observer method’s event type.  Linked in this way, you now have the ability to build boilerplate machinery in between the Listable-and-VersionWatchable made by the producer method and the event observed by the observer method, and the end user doesn’t have to know about or invoke any of it: her events “just work” assuming this extension is present, opaquely, at runtime.

This post will make use of the microbean-kubernetes-controller framework, the Java Kubernetes controller framework built up over the course of this series (you can start at the beginning if you like).  This framework is CDI-independent.  We’ll adapt it to CDI 2.0 so end users don’t have to deal with any boilerplate.

As discussed, our portable extension will have to:

  • find beans that have Listable and VersionWatchable among their bean types
  • find observer methods observing Event objects that contain Kubernetes resources in them
  • ensure that for any given observer method whose event parameter is qualified with a set of qualifier annotations, there exists a producer method (or any other kind of CDI bean, actually) with the right kind of type that is also qualified by those same qualifier annotations (or no event delivery will take place)
  • have some way of recognizing that arbitrary qualifier annotations defined by the user are those that should be used to identify Kubernetes events—i.e. some way of meta-qualifying a qualifier annotation that appears on relevant producer methods and relevant event parameters

We can do all these things thanks to the well-defined lifecycle mandated by CDI.

First, we’ll sketch out a meta-qualifier annotation named KubernetesEventSelector.  The intent of this meta-annotation will be that if you put it on a qualifier annotation that you create, then your qualifier annotation will be treated as a qualifier that is trying to say something specifically about sets of Kubernetes events.

Here’s what that might look like:


import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.inject.Qualifier;

@Documented
@Retention(value = RetentionPolicy.RUNTIME)
@Target({ ElementType.ANNOTATION_TYPE })
public @interface KubernetesEventSelector {
}

Now, as an end user, if I define my own qualifier annotation and annotate it with KubernetesEventSelector:


@Documented
@KubernetesEventSelector
@Qualifier
@Retention(value = RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.PARAMETER })
public @interface AllConfigMapEvents {
}

…I will have expressed my intent that when we “see” this annotation:

@AllConfigMapEvents

…we are talking about choosing Kubernetes events in some handwavy manner, and not something else.

So now our as-yet-to-be-written portable extension has at least the theoretical means to find event parameters and beans and see if they’re qualified with qualifiers that, in turn, are annotated with @KubernetesEventSelector.  Those will be the only things of interest to this extension.

Next, we’ll take advantage of the CDI lifecycle and the prescribed order of things and recognize that bean processing happens before observer method processing.  So let’s start with looking for those beans that have Listable and VersionWatchable among their bean types.

We’re going to run into a problem right away.  We would like to look for beans whose bean types are of type X where X is some type that extends Listable and VersionWatchable, but otherwise we don’t really care what X actually is.  But producer methods must return concrete types without type variables in them.

What we’ll do instead is look for the most general type in the fabric8 Kubernetes model hierarchy that implements or extends both Listable and VersionWatchable and deal with that.  That type happens to be Operation.  We’ll keep track during bean processing of all Beans that match these types.  That means it doesn’t matter if the bean in question is a producer method, a producer field, a managed bean, a synthetic bean or any of the other kinds of beans you can cause to come into existence in a CDI world: they’ll all be accounted for.

Now, obviously we’re not actually interested in all beans whose bean types include Operation.  We’re interested in those that have been explicitly “tagged” with the KubernetesEventSelector annotation.

It would be nice if there were a convenience method of sorts in CDI that would let you ask a bean for its qualifiers but would restrict the returned set to only those qualifiers that, in turn, are meta-qualified with something else (like KubernetesEventSelector).  Unfortunately, there is not, so we’ll have to do this ourselves.
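
Here’s a minimal sketch of what such a helper might look like.  It’s an assumption-laden simplification (for example, it doesn’t walk meta-annotations transitively, and it ignores the BeanManager it’s handed); the method shape is inferred from how it’s used later in this post:

import java.lang.annotation.Annotation;
import java.util.LinkedHashSet;
import java.util.Set;
import javax.enterprise.inject.spi.BeanManager;

final class Annotations {

  private Annotations() {
    super();
  }

  // Sketch only: retain those annotations whose annotation type is itself
  // annotated with the supplied meta-annotation type.
  static Set<Annotation> retainAnnotationsQualifiedWith(final Set<Annotation> annotations,
                                                        final Class<? extends Annotation> metaAnnotationType,
                                                        final BeanManager beanManager) {
    final Set<Annotation> returnValue = new LinkedHashSet<>();
    if (annotations != null && metaAnnotationType != null) {
      for (final Annotation annotation : annotations) {
        if (annotation != null && annotation.annotationType().isAnnotationPresent(metaAnnotationType)) {
          returnValue.add(annotation);
        }
      }
    }
    return returnValue;
  }

}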

So, refined: we’ll keep track of Beans with Operation (and hence by definition Listable and VersionWatchable) among their bean types, and with KubernetesEventSelector-qualified qualifiers.  We’ll keep track of those annotations, too.

Once we do this, we’ll have some machinery that can now pick out sources for raw materials that we’ll need to feed to a Reflector.

But obviously we don’t really want to do any of this if there aren’t any observer methods around.  Fortunately but also unfortunately, bean processing happens before observer method processing, so we will do this work, and then discard it if we don’t find any relevant observer methods.

What’s a relevant observer method?  In this case it’s an observer method whose (a) AbstractEvent-typed event parameter is (b) qualified by one or more qualifier annotations that are themselves (c) meta-qualified by KubernetesEventSelector, and that (d) “matches” an equivalent set of qualifiers on an Operation-typed bean type.  This means that indirectly the observer method will “watch” for what the producer method is indirectly providing access to.

Here’s an example observer method:


import javax.enterprise.event.Observes;
import io.fabric8.kubernetes.api.model.HasMetadata;
import org.microbean.kubernetes.controller.Event;

private final void onConfigMapEvent(@Observes @AllConfigMapEvents final Event<? extends HasMetadata> event) {
}

Note the usage of the @AllConfigMapEvents qualifier we defined earlier in this blog post, and note that the event parameter so qualified is an Event.  Let’s say we found a “matching” producer method:


import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.DoneableConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.Operation;
import io.fabric8.kubernetes.client.dsl.Resource;

@Produces
@ApplicationScoped
@AllConfigMapEvents
private static final Operation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> selectAllConfigMaps(final KubernetesClient client) {
  return client.configMaps();
}

Note the Operation return type and the matching qualifier-meta-qualified-with-KubernetesEventSelector that decorates it (@AllConfigMapEvents).  Do you see how it “matches” the observer method’s event parameter qualifier (also @AllConfigMapEvents)?  This is the linkage that connects this producer method with the observer method that observes events that will result from the producer method’s existence.

So once we have a mapping from a Bean to a set of relevant qualifier annotations, then we can see if, during observer method processing, we’re talking about the first observer method that should cause our boilerplate machinery to get invoked.  If there are many observer methods that “match” a relevant producer method, that’s fine; that just indicates that the very same Kubernetes event might be processed by many observers.  The point is, before we jump through all the hoops to set up lists and watches and so on, we should make sure that someone is actually going to take advantage of our work.

So once we “get a match”, we can build the boilerplate for all matches of the same type, once.

For each distinct match, we’ll need a new Controller.  The controller will need an instance of whatever the producer method is making (some kind of Operation, which is by definition both a Listable and a VersionWatchable).

The Controller, in turn, will need some kind of Map representing Kubernetes objects we “know about”.  That Map will be empty to start with, since we don’t know about any Kubernetes objects yet.  But if, down the road, a deletion gets skipped (maybe due to a watch timing out or a disconnect), we need to retroactively fire a delete if, say, the Kubernetes list of relevant objects is “less than” our “known objects”, and we’ll need to know what object was actually deleted.
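
In sketch form, that delete-detection idea looks something like this (freshlyListedKeys and fireSyntheticDeletion are hypothetical names, purely for illustration):

// During a full replace (a fresh list from the API server), anything in our
// "known objects" that is absent from the freshly listed set must have been
// deleted while we weren't watching, so we fire a synthetic deletion for it.
for (final Map.Entry<Object, T> entry : knownObjects.entrySet()) {
  if (!freshlyListedKeys.contains(entry.getKey())) {
    fireSyntheticDeletion(entry.getValue()); // hypothetical helper
  }
}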

The Controller will also need some kind of Consumer (a siphon) that will slurp up EventQueues and dispatch their events in order without blocking threads.  That Consumer will also need to update that very same Map of “known objects” as it does so.

Finally, that Consumer of EventQueues will also need a mechanism to link it to the native CDI event broadcast mechanism which will take care of actually dispatching events to observer methods.

Whew!  Let’s do the easy boring stuff first.

The first lifecycle event that looks good is ProcessBean.  If we could process all beans whose types are Operation here, in one place, regardless of whether the bean in question is a producer method, managed bean, etc. that would be great.  As it turns out for various obscure reasons, we can’t.  So we’ll implement a lifecycle event observer method for each type of bean, and route all their stuff to a common private method for processing:


private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerMethod(@Observes final ProcessProducerMethod<X, ?> event, final BeanManager beanManager) {
  if (event != null) {
    // (We'll define this in a second.)
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerField(@Observes final ProcessProducerField<X, ?> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processManagedBean(@Observes final ProcessManagedBean<X> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processSyntheticBean(@Observes final ProcessSyntheticBean<X> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

And the processPotentialEventSelectorBean method that these all effectively delegate to might look like this:


private final void processPotentialEventSelectorBean(final Bean<?> bean, final BeanManager beanManager) {
  if (bean != null) {
    final Type operationType = getOperationType(bean);
    if (operationType != null) {
      final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(bean.getQualifiers(), KubernetesEventSelector.class, beanManager);
      if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
        this.eventSelectorBeans.put(kubernetesEventSelectors, bean);
      }
    }
  }
}

…where the Annotations utility method can be found elsewhere, and where the getOperationType() method might look like this:


private static final Type getOperationType(final Bean<?> bean) {
  final Type returnValue;
  if (bean == null) {
    returnValue = null;
  } else {
    final Set<Type> beanTypes = bean.getTypes();
    assert beanTypes != null;
    assert !beanTypes.isEmpty();
    Type candidate = null;
    for (final Type beanType : beanTypes) {
      if (beanType instanceof ParameterizedType) {
        final Type rawType = ((ParameterizedType)beanType).getRawType();
        if (rawType instanceof Class && Operation.class.equals((Class<?>)rawType)) {
          candidate = beanType;
          break;
        }
      }
    }
    returnValue = candidate;
  }
  return returnValue;
}

So at the end of all this we’ll have a Map, eventSelectorBeans, that has as its keys Sets of Annotation instances that have KubernetesEventSelector somewhere “on” them, and has as its values Beans that are qualified with those annotations.
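
Concretely, the extension state implied by the discussion so far might look like this (a sketch; the field names follow the prose, not necessarily the actual extension source, and the controllers field shows up when we start things later on):

private final Map<Set<Annotation>, Bean<?>> eventSelectorBeans = new HashMap<>();

private final Set<Bean<?>> beans = new HashSet<>();

private final Collection<Controller<? extends HasMetadata>> controllers = new ArrayList<>();

private boolean syncNeeded;

private boolean asyncNeeded;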

Now we’ll need to route all observer method processing to something similar (fortunately, this is a great deal simpler).  Recall that we are guaranteed by contract that observer method processing happens after discovering beans, so by this point we already have knowledge of any producer methods or managed beans “making” Operation instances of various kinds:


private final void processObserverMethod(@Observes final ProcessObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>, ?> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorObserverMethod(event.getObserverMethod(), beanManager);
  }
}

private final void processSyntheticObserverMethod(@Observes final ProcessSyntheticObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>, ?> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorObserverMethod(event.getObserverMethod(), beanManager);
  }
}

private final void processPotentialEventSelectorObserverMethod(final ObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>> observerMethod, final BeanManager beanManager) {
  if (observerMethod != null) {
    final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(observerMethod.getObservedQualifiers(), KubernetesEventSelector.class, beanManager);
    if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
      if (observerMethod.isAsync()) {
        this.asyncNeeded = true;
      } else {
        this.syncNeeded = true;
      }
      final Bean<?> bean = this.eventSelectorBeans.remove(kubernetesEventSelectors);
      if (bean != null) {
        this.beans.add(bean);
      }
    }
  }
}

(Above, we also keep track of whether the observer method in question is synchronous or asynchronous.)
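
For example, an asynchronous observer method, which would flip that asynchronous flag, differs from our earlier sketch only in its use of @ObservesAsync:

import javax.enterprise.event.ObservesAsync;

private final void onConfigMapEventAsync(@ObservesAsync @AllConfigMapEvents final Event<? extends HasMetadata> event) {
  // Invoked on a separate thread via the container's fireAsync() machinery.
}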

Finally, we can clean up a little bit after bean discovery is all done:


private final void processAfterBeanDiscovery(@Observes final AfterBeanDiscovery event) {
  if (event != null) {
    this.eventSelectorBeans.clear();
  }
}

At the end of all of this bean discovery, we have a Set of Bean instances stored in a beans instance variable that (a) “make” Operation instances, (b) are appropriately meta-qualified to declare their interest in taking part in Kubernetes event production, and (c) have at least one observer method interested in their effects.

Let’s build some controllers and start them when the user’s program comes up!

We’ll begin by observing the initialization of the application scope, which is exactly equal to the start of a CDI application.

When this happens, our beans instance variable will contain some of the materials to build a Controller.  For each Bean in there, we’ll “instantiate” it (by way of a beanManager.getReference() call), create a new Controller, hook that Controller up to the right kind of EventDistributor, start the Controller and keep track of it for later cleanup:


private final <T extends HasMetadata, X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> void startControllers(@Observes @Initialized(ApplicationScoped.class) @Priority(LIBRARY_AFTER) final Object ignored, final BeanManager beanManager) {
  if (beanManager != null) {
    for (final Bean<?> bean : this.beans) {
      assert bean != null;
      final Set<Annotation> qualifiers = bean.getQualifiers();
      final Map<Object, T> knownObjects = new HashMap<>();
      final EventDistributor<T> eventDistributor = new EventDistributor<>(knownObjects);
      eventDistributor.addConsumer(new CDIEventDistributor<T>(qualifiers, null /* no NotificationOptions; TODO */, this.syncNeeded, this.asyncNeeded));
      @SuppressWarnings("unchecked") // we know an Operation is of type X
      final X contextualReference = (X)beanManager.getReference(bean, getOperationType(bean), beanManager.createCreationalContext(bean));
      final Controller<T> controller = new Controller<>(contextualReference, knownObjects, eventDistributor);
      controller.start();
      this.controllers.add(controller);
    }
  }
}

In the above code example, you’ll see a reference to a CDIEventDistributor.  That private class is a Consumer of events that simply fires them using the existing CDI broadcasting mechanism:


private static final class CDIEventDistributor<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>> {

  private static final Annotation[] EMPTY_ANNOTATION_ARRAY = new Annotation[0];

  private final Annotation[] qualifiers;
  private final NotificationOptions notificationOptions;
  private final boolean syncNeeded;
  private final boolean asyncNeeded;

  private CDIEventDistributor(final Set<Annotation> qualifiers, final NotificationOptions notificationOptions, final boolean syncNeeded, final boolean asyncNeeded) {
    super();
    if (qualifiers == null) {
      this.qualifiers = EMPTY_ANNOTATION_ARRAY;
    } else {
      this.qualifiers = qualifiers.toArray(new Annotation[qualifiers.size()]);
    }
    this.notificationOptions = notificationOptions;
    this.syncNeeded = syncNeeded;
    this.asyncNeeded = asyncNeeded;
  }

  @Override
  public final void accept(final AbstractEvent<? extends T> controllerEvent) {
    if (controllerEvent != null && (this.syncNeeded || this.asyncNeeded)) {
      final BeanManager beanManager = CDI.current().getBeanManager();
      assert beanManager != null;
      final javax.enterprise.event.Event<Object> cdiEventMachinery = beanManager.getEvent();
      assert cdiEventMachinery != null;
      final TypeLiteral<AbstractEvent<? extends T>> eventTypeLiteral = new TypeLiteral<AbstractEvent<? extends T>>() {
        private static final long serialVersionUID = 1L;
      };
      final javax.enterprise.event.Event<AbstractEvent<? extends T>> broadcaster = cdiEventMachinery.select(eventTypeLiteral, this.qualifiers);
      assert broadcaster != null;
      if (this.asyncNeeded) {
        if (this.notificationOptions == null) {
          broadcaster.fireAsync(controllerEvent);
        } else {
          broadcaster.fireAsync(controllerEvent, this.notificationOptions);
        }
      }
      if (this.syncNeeded) {
        broadcaster.fire(controllerEvent);
      }
    }
  }

}

Finally, when the user’s application stops, we should clean up after ourselves.  We observe the imminent destruction of the application scope and close everything we started up:


private final void stopControllers(@Observes @BeforeDestroyed(ApplicationScoped.class) @Priority(LIBRARY_BEFORE) final Object ignored) throws IOException {
  Exception exception = null;
  for (final Controller<?> controller : this.controllers) {
    assert controller != null;
    try {
      controller.close();
    } catch (final IOException | RuntimeException closeException) {
      if (exception == null) {
        exception = closeException;
      } else {
        exception.addSuppressed(closeException);
      }
    }
  }
  if (exception instanceof IOException) {
    throw (IOException)exception;
  } else if (exception instanceof RuntimeException) {
    throw (RuntimeException)exception;
  } else if (exception != null) {
    throw new IllegalStateException(exception.getMessage(), exception);
  }
}

The net effect is that if you put this portable extension and its dependencies on the classpath of your CDI 2.0 application and author a producer method and observer method pair (and nothing else), your CDI application will receive Kubernetes events and should behave semantically like any other kind of Kubernetes controller.
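
As a sketch of what “and nothing else” means, an end user’s entire controller could plausibly be a single bean like the one below, reusing the AllConfigMapEvents qualifier from earlier.  (The extension itself, like any portable extension, is discovered via a META-INF/services/javax.enterprise.inject.spi.Extension entry supplied by its jar file.)

@ApplicationScoped
public class ConfigMapWatcher {

  @Produces
  @ApplicationScoped
  @AllConfigMapEvents
  private static final Operation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> selectAllConfigMaps(final KubernetesClient client) {
    return client.configMaps();
  }

  private final void onConfigMapEvent(@Observes @AllConfigMapEvents final Event<? extends HasMetadata> event) {
    // React to ConfigMap additions, modifications and deletions here.
  }

}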

The fledgling project housing all this code is microbean-kubernetes-controller-cdi and is available on GitHub under the Apache 2.0 license.

Understanding Kubernetes’ tools/cache package: part 10—Designing Kubernetes controllers in CDI 2.0

In part 9 of this series (you may want to start at the beginning), we put together a sketch of what has become a Kubernetes controller framework in Java.  The framework represents most of the concerns of a subset of the Kubernetes tools/cache package, including the SharedIndexInformer, Reflector, Controller, Indexer, DeltaFIFO, Store and ListerWatcher concepts.

But if you want to write a controller, you don’t (I’m presuming) want to deal with any of that boilerplate.  You want to express that you:

  • are interested in receiving notifications of additions, modifications and deletions of certain Kubernetes resources
  • know what to do when you receive such notifications

The other stuff—all the reflectors and controllers and informers and caches and whatnot—is simply a means to an end, so we should attempt to make it disappear.

CDI 2.0 gives us the means to do this.  Let’s lay out what we’ll need to do.  We’ll work backwards.

First, we’ll make use of the standard observer method mechanism to know what to do when we receive event notifications.  We’ll sketch our as-of-now incomplete observer method like this:

private final void onConfigMapEvent(@Observes final SomeKindOfEventNotYetDefined configMapEvent) {
  // configMapEvent should tell us: was it an addition? a modification? a removal?
  // Which ConfigMap?  What did it look like before it was modified? etc.
}

Let’s pretend that something somewhere is firing these events.  Then we could receive one here, open it up, and see what happened.  Sounds good.

Missing from our sketch is some indication as to which ConfigMaps we’d like to see events for.  Maybe we want to see all of them.  Maybe we only are interested in some meeting certain criteria.

Well, that’s what qualifiers are made for.  We’ll refine our hazy little sketch to look like this:

private final void onConfigMapEvent(@Observes @CertainConfigMaps final SomeKindOfEventNotYetDefined configMapEvent) {
  // configMapEvent should tell us: was it an addition? a modification? a removal?
  // Which ConfigMap?  What did it look like before it was modified? etc.
}

Note the addition of the @CertainConfigMaps qualifier annotation that I just made up.

OK, if we stare at that for a while as good CDI citizens, this looks nice and complete and idiomatic: this observer method looks for SomeKindOfEventNotYetDefined events, whatever they are, qualified with the @CertainConfigMaps qualifier and will be notified if ever they should be fired.  Presumably those events, whatever they are, will contain a ConfigMap or an old ConfigMap and a new one along with some indication of what happened to it or them.  Sounds good.  This is stuff the end user obviously has to write, and it’s stuff that she should write, and it’s probably about as concise as we’re going to get.  It’s nice and declarative and simple and straightforward.

So who’s going to fire them?  The person writing this method obviously shouldn’t have to care.

As we’ve seen in the previous parts of this series, if you look at things a certain way, Kubernetes itself is a distributed message bus.  So the short answer is: Kubernetes will fire them.

That’s glib, of course.  Kubernetes will fire them—and our Reflector will do what is necessary to reflect them speedily and robustly into an EventCache.  Then something else will have to harvest events from the cache.  That something had also better be able to distribute those events to interested listeners, like our observer method above.

Finally, all of this will have to have queues of various kinds in the mix to make sure that no given harvesting thread or distributing thread or caching thread is blocked while it goes about its business (or that if absolutely necessary it is blocked for the shortest possible time).  That, after all, is what the Kubernetes client-go tools/cache package is all about, so it’s what microbean-kubernetes-controller is all about.

OK, that’s all fine, but, if you notice, this subassembly—with Kubernetes and a Reflector at its “left” end and our observer method at its “right” end—starts with a specification of things to list and watch.  If we can provide just that part, thus defining the inputs for the “left” end, then all the other stuff in the middle is boilerplate, and all we have to do is attach our observer method to the “right” end with a little more boilerplate, and the end user might not ever even have to see the boilerplate!

So let’s look at the “left” end: we know we’re going to have to talk to Kubernetes using the fabric8 Kubernetes client, so that’s where we’ll start.

The good news about picking the fabric8 Kubernetes client is that it is built around a domain-specific language that makes expressing things relatively easy.  For example, you can tell a KubernetesClient to get you a Thing™ that can then get you a list of ConfigMap resources from Kubernetes like this:

client.configMaps();

That returns some sort of massively parameterized MixedOperation, but normally when you program against a model like this you worry very little about these intermediate types in the chain of builders and whatnot that result from this style of fluent programming.

However, in this case, do take note of the fact that a MixedOperation implements both Listable and VersionWatchable.  Note further that a MixedOperation is just a special kind of Operation.  Finally, recall that our Reflector needs some kind of thing that is both a Listable and a VersionWatchable to get started: in part 0, we learned about why this is.

So, in other words, you can create a new Reflector that will reflect Kubernetes events concerning ConfigMaps by supplying client.configMaps() as the first parameter of its constructor.

That’s our “left” end of this subassembly.  If you can get your hands on a Thing™ that is both a Listable and a VersionWatchable then you can create the whole rest of the subassembly and route reflected events to appropriate observer methods.

Now, in CDI, any time you need a Thing™, you don’t make it yourself.  You look around for it, or you declare that someone else needs to provide it.  We will treat, in other words, an Operation as a CDI bean.

An Operation implementation is extremely unlikely to be a managed bean—a plain-old-Java-object (POJO) just lying around somewhere with some bean-defining annotations on it.  Instead, as we’ve seen, you make an Operation by way of a method invocation on a KubernetesClient (like configMaps()).

CDI producer methods are one kind of CDI bean that fit this bill perfectly!  Suppose we have a producer method somewhere sketched out like this:

@Produces
@ApplicationScoped
@CertainConfigMaps
private static final Operation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> selectAllConfigMaps(final KubernetesClient client) {
  return client.configMaps();
}

Then so long as someone somewhere has made a KubernetesClient for us, it will be injected as this producer method’s sole parameter.  That means we can use it to return an Operation.  That means we have the “left” end of our subassembly.

(As it happens, there’s a handy little project that does expose a KubernetesClient as a CDI bean; it’s named microbean-kubernetes-client-cdi.  If that’s on your CDI 2.0 classpath, you can inject KubernetesClient wherever you want it.)
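
In other words, with that project on the classpath, getting a client is as simple as this (a sketch):

@Inject
private KubernetesClient client;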

Note as well here that the @CertainConfigMaps annotation that was part of our (still unfinished) observer method sketch is qualifying the return type.

From here, you can see, I hope, that if we have a machine that can

  • gather up all the CDI beans that are qualified with a given qualifier (like @CertainConfigMaps) or set of qualifiers, regardless of how those beans are made or who makes them, and
  • can also gather up all the observer methods that observe events qualified with that same qualifier or those same qualifiers, then
  • we have the means to unite a producer of raw materials needed to pump Kubernetes events into the system, and a consumer of those events to get them out of the system!

We’ll build this machine in the next post.

 

Understanding Kubernetes’ tools/cache package: part 9—Kubernetes controllers in Java!

In part 8 of this series (you can start at the beginning if you want), we looked at some of the foundational principles we’re going to establish and the stakes we’re going to plant in the ground as part of our journey towards making an idiomatic Kubernetes controller framework in Java and CDI 2.0.

Let’s look at an idiomatic implementation of these concepts centered around the fabric8 Kubernetes client implementation of Kubernetes watch functionality, and (eventually) targeted at a CDI 2.0 environment.

A Reflector‘s main purpose is to reflect a certain portion of the state of the Kubernetes universe into a cache of some kind.  It needs to do this in a fault-tolerant and thread-safe way, taking care not to block the thread it’s using to talk to Kubernetes.  As we’ve seen, this involves starting by listing objects meeting certain criteria and then setting up a watch for those objects and reacting to the logical events that occur to them by offloading them quickly into some kind of cache and scampering back to Kubernetes for more.  Then periodically a synchronization of downstream consumers with known state is performed, making sure that downstream consumers never miss any events concerning Kubernetes resources they care about, even if the listing and watching machinery suffers a hiccup.

The Go code around all this is relatively complicated.  Fortunately, though, since we’re using the excellent fabric8 Kubernetes client, we can take advantage of the fact that it already has sophisticated watch functionality that, among other things, handles reconnections for us.  So we don’t have to write code around all that: we just need to set up the listing code and provide a Watcher to be notified of watch events.  That will take care of listing and watching behavior and reduce a lot of code we have to write.  Hooray for laziness.

We can also take advantage of Java’s generics to ensure that we strongly type what objects we support (Pods, Deployments, ConfigMaps, and so on).  We can ensure that type information makes it all the way into the internals of our implementation.  The Go code needs to check this at runtime since Go has no support for generics.  That should simplify our code a little bit.  Hooray for more laziness and type safety.

A Reflector, as we said, needs to update a cache in a very specific way.  The Go code models the cache as a store of objects.  If we dig in a little more, though, it turns out that even in the Go code the Reflector only needs the mutating operations to be exposed, not the getters and listers.  Specifically, a reflector will look to add, update and delete objects in the store, and will also look to resynchronize state and to do a full replace of objects in one atomic shot.  But that’s it.

We can reduce this surface area in our idiomatic Java Reflector implementation even more by recognizing that if we start out by modeling the cache as a cache of logical events rather than a cache of arbitrary objects we only need to support one addition method that takes an event as a payload.  Then we have to support a total of three operations:

  • // Make an event from the supplied raw materials; store it
    public void add(final Object source, final Event.Type eventType, final T resource)
  • // Do a full replace of our state
    public void replace(final Collection<? extends T> objects, final Object resourceVersion)
  • // When told to synchronize, look at our known objects and
    // fire synchronization events downstream for each of them
    public void synchronize()
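
Stitched together, those three operations suggest a minimal contract along these lines (a sketch assembled from the signatures above; the real interface may differ in details such as generic bounds):

import java.util.Collection;
import io.fabric8.kubernetes.api.model.HasMetadata;
import org.microbean.kubernetes.controller.Event;

public interface EventCache<T extends HasMetadata> {

  // Make an event from the supplied raw materials; store it.
  void add(final Object source, final Event.Type eventType, final T resource);

  // Do a full replace of our state.
  void replace(final Collection<? extends T> objects, final Object resourceVersion);

  // When told to synchronize, look at our known objects and
  // fire synchronization events downstream for each of them.
  void synchronize();

}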

So from the standpoint of what a Reflector needs to do its job, we know what an event cache must look like at a minimum.  In fact, it might look like this EventCache interface from my microbean-kubernetes-controller project on GitHub!😀

Then our Reflector might look like my Reflector class in the same project.

We can see that it takes in something that is both Listable (and when it’s listed returns a KubernetesResourceList descendant) and VersionWatchable—which can be produced from a KubernetesClient.  Objects satisfying these requirements are roughly equivalent to the Go code’s ListerWatcher with, as we’ve noted, the added benefit that reconnections are automatically handled, and if you can express them properly, then filtering is already handled.  Hooray for even more laziness.

The Reflector constructor also takes in the writable “side” of a cache of Events.  This is where it will “mirror” the events it receives from watching the Kubernetes API server.  This corresponds roughly with the Go code’s notion of a Store, and more specifically with the notion of a DeltaFIFO, but exposes only the mutating methods actually needed by a writeable store of events, not any extras related to reading or listing.  Also, where a DeltaFIFO had to turn a generic “add an arbitrary object” function into an “add an event recording an addition” invocation, we start with the notion that everything being stored is an event, so it’s one fewer step we have to take and one fewer transformation we have to make.

So, implement an EventCache, call kubernetesClient.configMaps() or something equivalent, pass them in to a new Reflector, start it, and you can begin filling your cache with events!

Of course, you’ll want to drain that cache, and also allow it to periodically inform its downstream customers of its state for synchronization purposes.  And a good implementation will sort incoming events into queues, where each queue is effectively the events that have occurred to a particular keyed Kubernetes resource over time.

Fortunately, there is a good implementation.  The stock implementation of this kind of EventCache is EventQueueCollection, which models all this and another part of the Go Store type.  EventQueueCollection lets you pass in a Consumer and start it siphoning off events in a careful, thread-safe manner, much like the Pop() function in the Go code.  Events are stored in queues per object, keyed by an overridable method that determines how keys for a given Kubernetes resource are composed.

The stock Consumer normally used here, an implementation of ResourceTrackingEventQueueConsumer, is a simple Consumer that takes a Map of objects indexed by keys at construction time.  These “known objects” are updated as events are siphoned off the upstream EventQueueCollection, and then those individual events are passed on to the abstract accept() method.  Once an event arrives here, it can be processed directly, or rebroadcast, or whatever; all housekeeping has been taken care of.  We’ll come back to this.

Back to those known objects.  EventQueueCollection takes in the same kind of Map, and this is no coincidence.  Your consumer of events and your EventQueueCollection should share this cache: when your EventQueueCollection gets instructed to synchronize(), it will send synchronization events to any downstream consumers…

…like the ResourceTrackingEventQueueConsumer we were just talking about, who will update that very same Map of known objects appropriately.

This is all tedious boilerplate to set up, so just like the Go code, we put all of this behind a Controller façade.  A Controller marries a Reflector and an EventQueueCollection together with a simple Consumer of events, and arranges for them all to dance together.  If you start the Controller, it will start the Consumer, then start the Reflector, and you will start seeing events flow through the pinball machine.  If you close the Controller, it will close the Reflector, then close the Consumer and you will stop seeing events.  All overridable methods in Reflector and ResourceTrackingEventQueueConsumer are “forwarded” into the Controller class, so you can customize it in one place.

So: create a Controller, give it an expression of the “real” Kubernetes resources you want to watch and list, hand it a consumer of mirrored versions of those events, and a Map representing the known state of the world, and you can now process Kubernetes resources just like Kubernetes controllers written in Go.
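
Here’s that recipe as a compilable sketch.  The three-argument constructor shape mirrors the Controller construction that appears in the part 11 post above; the actual signature in microbean-kubernetes-controller may differ slightly:

import java.util.HashMap;
import java.util.Map;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.microbean.kubernetes.controller.Controller;

public class ConfigMapControllerDemo {

  public static void main(final String[] args) throws Exception {
    try (final KubernetesClient client = new DefaultKubernetesClient()) {
      final Map<Object, ConfigMap> knownObjects = new HashMap<>();
      final Controller<ConfigMap> controller =
        new Controller<>(client.configMaps(), knownObjects, event -> System.out.println("Event: " + event));
      controller.start();
      Thread.sleep(60_000L); // let events flow for a while
      controller.close();
    }
  }

}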

This is, of course, still too much boilerplate, since in all of this we still haven’t talked about business logic.

Therefore, in a CDI world, this is all a candidate for parking behind a portable extension.  That will be the topic of the next post.  Till then, have fun looking at the microbean-kubernetes-controller project on GitHub as it develops further.

Understanding Kubernetes’ tools/cache package: part 8

In the previous post we looked at behavioral concerns of the tools/cache package.  You can start at the beginning of the series if you want.

In this post I’d like to begin the process of translating certain Go structures and concepts into idiomatic Java by laying out some of the guiding principles I anticipate using.

But before I do that, I wanted to insert a colorized version of my previous post’s sequence diagram.  In this version of it, I’ve color-coded the threads explicitly created by invoking the Run method on sharedIndexInformer.  You’ll note that there are at least six of them, and that doesn’t include any private implementation-level threads that are used by, say, a Kubernetes client or any other machinery buried in the Reflector type:

[Figure: sequence diagram from the previous post, with the threads created by sharedIndexInformer’s Run function color-coded]

Once again, you’ll have to squint (thanks, WordPress!), but I’ve also created a full-size Dropbox image.

My hope is that color-coding these threads will help us as we figure out the best way to translate the overall tools/cache package to Java.

OK, with that out of the way, let’s look at some linguistic elements and put some Java-related stakes in the ground.

fabric8 Kubernetes Client

First of all, as I look to put together a controller framework in Java that does what the tools/cache package componentry intends, I will be standardizing on the excellent if sparsely documented fabric8 Kubernetes client.  One of the reasons I mention it up front here is because the way that it does listing and watching actually usurps some of the ListerWatcher functionality (covered in part 1).  It also makes use of Java generic types, which is one of the things about Java that I intend to exploit in building this library.

CDI 2.0

The second feature of Java that I intend to use is the standardized CDI 2.0 specification, which, among other things, gives you asynchronous event delivery out of the box.

So on the “left side” of the problem domain, we will attach a DefaultKubernetesClient to an API server and ask it to list and watch things following the recipe that we discovered in part 1.

On the “right side” of the problem domain, we’ll squint hard and re-express a ResourceEventHandler in terms of CDI events.

Translating Go Channels

The Go code makes liberal use of Go channels.  Go channels are (unless explicitly jazzed up otherwise) effectively unbuffered synchronous blocking queues of zero length that block senders until there are receivers, and block receivers until there are senders.  For the most part, we can substitute the careful use of a SynchronousQueue for these throughout.

The Go code also makes a lot of use of “stop channels”, which are mechanisms for telling a blocked thread to stop blocking and clean up.  Java’s idiomatic analog is thread interruption, so we’ll look to see where we can use that instead.
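
A tiny demonstration of both substitutions, using nothing beyond the JDK:

import java.util.concurrent.SynchronousQueue;

public class ChannelDemo {

  public static void main(final String[] args) throws InterruptedException {
    // A SynchronousQueue has no capacity: put() blocks until another thread
    // take()s, and take() blocks until another thread put()s, much like an
    // unbuffered Go channel.
    final SynchronousQueue<String> channel = new SynchronousQueue<>();
    final Thread receiver = new Thread(() -> {
        try {
          while (true) {
            System.out.println("received: " + channel.take());
          }
        } catch (final InterruptedException stop) {
          // Interruption plays the role of a Go "stop channel".
          Thread.currentThread().interrupt();
        }
    });
    receiver.start();
    channel.put("hello"); // blocks until the receiver takes it
    receiver.interrupt(); // tell the receiver to stop blocking and clean up
    receiver.join();
  }

}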

Functional versus Object-Oriented Programming

A more controversial decision is when to ape the Go code and use functions and functional programming everywhere, and when to make use of intelligent object-oriented approaches.  This is much harder to get right, but I believe that there is room for both sorts of approaches.  We’ll lean heavily on use cases here: if one-time fundamental customization is needed, often it’s easier and simpler to subclass, baking the one-time customization into the subclass itself.  If many different kinds of users can use the same class in many different ways, then employing a strategy-like pattern where functions are supplied at construction time may make more sense.

Maps and Sets and hashCode and equals

There are several places in the Go code where items are retrieved by key, or checked for equality semantics.  There may be cases in the Java code where we can lean on equals and hashCode implementations for this sort of thing.

Simplification and Composition

Finally, we’ll be looking hard at the compositional nature of this whole project: while it might be “more correct” to have lots of tiny functional Lego bricks that can be composed into different pieces to form One Of Many True Controller Frameworks™, it may be simpler to start with fewer concepts, or at least present a simplified façade to the end user.  I know from plowing through this codebase and reading comments in the Kubernetes Slack channel for months if not years now that understanding this package puts a serious cognitive load on most developers.  It would be nice to trim that down somewhat for fellow Java developers if we can.

End Goal

What I’d like to end up with is an event-driven programming model for writing Kubernetes controllers in Java, where a developer simply declares interest in Kubernetes resource notifications—events describing lifecycle changes in Kubernetes objects—and can then react to them without having to explicitly “get” or construct anything, or worry about what thread they’re on, and so on.  I envision we can take the whole tools/cache package machinery we’ve just explored over these last several blog posts and put most of it behind a standard CDI portable extension or the equivalent.

In the next post, I hope to start on the Java portion of this journey with translating the Reflector concept using the guideposts I’ve laid out here.  Thanks for reading so far!

Understanding Kubernetes’ tools/cache package: part 7

In part 6 of this series (you can start at the beginning if you want) we arrived at a reasonably complete structural view of sharedIndexInformer and all the concepts it leads to.

Now it’s time to look at the behavioral aspects of all this.

WordPress appears to scale all image uploads, so you’re going to have to squint a lot.  I’ve also created a Dropbox link to the full-size image.

In the diagram below, I’ve used UML 2.0 constructs.  Specifically, filled arrowheads represent synchronous calls, open arrowheads on solid lines represent asynchronous calls, and open arrowheads on dotted lines represent return values.  Frames labeled alt are, following UML 2.0, conditional branch points.  Lastly I’ve simplified a few things with stereotypes that I hope are reasonably self-explanatory.

[Figure: UML 2.0 sequence diagram of the sharedIndexInformer machinery]

This diagram starts with someone or something calling the Run function on a sharedIndexInformer.  The Run function creates a new DeltaFIFO, passing it the MetaNamespaceKeyFunc as its KeyFunc, and the sharedIndexInformer‘s Indexer (which is also a KeyLister and a KeyGetter, but you can’t tell from just looking at the code).

Then, the Run function creates a new Controller, which I’ve looked at in some detail in part 2, and calls its run function asynchronously.  The sharedIndexInformer‘s Run function now blocks until explicitly closed.

The new Controller‘s run function creates a new Reflector which for the purposes of this series you can just handwave over: trust that by using its embedded ListerWatcher it accurately puts Kubernetes resources into its store, which happens to be the DeltaFIFO created earlier.

At this point, we have, at a high level, a Rube Goldberg machine that replicates Kubernetes resources into a DeltaFIFO.  If a new Pod shows up in Kubernetes, for example, then it shows up in the DeltaFIFO.

Now the Controller‘s run enters an endless-until-explicitly-stopped loop where it calls the Controller‘s processLoop function every second.

The processLoop function drains the DeltaFIFO of the items placed in it on a separate thread by the Reflector.  The DeltaFIFO, in other words, is (as the name states) a buffered queue where the producer is effectively Kubernetes itself by way of a Reflector, and the consumer is (effectively) whatever function was supplied to the Controller when it was built.

So what function was supplied to this particular Controller when it was built?  The sharedIndexInformer‘s handleDeltas function.

The handleDeltas function is thus a queue drainer, and for many behavioral analysis purposes, we can conveniently ignore all that came before.  We know that when this function is called, it has received a set of additions, changes, full replacements or deletions from (effectively) Kubernetes. Here’s what it looks like:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

Based on the kind of Delta it’s working on, it adds, updates or deletes the corresponding object in yet another store, this time the Indexer that was created by the sharedIndexInformer driving this whole train.  Once that Add, Update or Delete call returns, it then calls the distribute function on the sharedIndexInformer‘s associated sharedProcessor.

The sharedProcessor‘s distribute function forwards on the notification to its appropriate processorListeners, effectively multiplexing the notification out.  So if a given Delta object represents an addition, then the add function of a processorListener will be called.

The processorListener add function simply places the incoming notification onto a synchronous queue (a Go channel) named addCh.  At this point our notification’s journey is at a temporary end.

Meanwhile, back at the sharedIndexInformer ranch, recall that its Run method is still in play.  The third meaningful thing that it does after creating a DeltaFIFO (that’s being filled and drained now) and a Controller (that’s indirectly filling it and draining it) is to call the run function on its sharedProcessor on a separate thread.

The sharedProcessor run function spawns two more threads and then hangs around until told to shut down.  Thread number one calls the run function on each processorListener the sharedProcessor has.  Thread number two calls the pop method on each processorListener.  We’ll look at the processorListener run function first.

The processorListener run function at a very high level simply pulls any notification off of its nextCh synchronous queue (Go channel) and, depending on what kind it is, finally calls either OnUpdate, OnAdd or OnDelete on the user-supplied ResourceEventHandler.  It’s simple enough I can reproduce it here:

func (p *processorListener) run() {
	defer utilruntime.HandleCrash()

	for next := range p.nextCh {
		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}

So what puts things on the nextCh channel?  The pop function.  This function runs until told to shut down explicitly, and pulls incoming notifications off its addCh synchronous queue (Go channel) and puts them on the nextCh channel.

So what puts things on the addCh channel?  Look up a few paragraphs and recall that this was the logical end to our Kubernetes event’s journey: a notification representing the event is placed on addCh by the processorListener add function, invoked by the sharedProcessor‘s distribute function.

This seems like a good place to stop this post.  I encourage you to check out the full size version of the sequence diagram and perhaps print it out or keep it next to you as you reread the series and a lot of the constructs in this package will make more sense.

In the next post, I hope to find some places for simplification as we begin the laborious process of translating this to Java.

Understanding Kubernetes’ tools/cache package: part 6

In part 5 of this series (you can start from the beginning if you like) we put all the structural pieces of the tools/cache package together.  However, I realize I made a mistake and did not cover the sharedProcessor and processorListener structs!  I’ll do that here before moving on to looking at the behavioral aspects of the package.

Let’s look at processorListener first.

To begin with, let’s agree that processorListener is an awful name for anything in software.  Agreed?  OK, good; let’s move on.

A processorListener is an implementation construct in the tools/cache package that buffers up a set of notifications and distributes them to a ResourceEventHandler (covered in part 3).  If you add a notification, eventually a ResourceEventHandler‘s OnAdd, OnUpdate or OnDelete function will get called on a separate thread.  Its structural code is quite simple:

type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
	requestedResyncPeriod time.Duration
	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
	// informer's overall resync check period.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

A processorListener has a never-ending run function that pulls notifications off of its nextCh Go channel (basically a synchronous blocking queue) and forwards them to its ResourceEventHandler:

func (p *processorListener) run() {
	defer utilruntime.HandleCrash()

	for next := range p.nextCh {
		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}

So how do notifications get put on this nextCh?  A processorListener has a pop function that is also never-ending (somewhat surprisingly).  The code is not intuitive to me at all, but if you squint you can see that basically it’s pulling items off of its pendingNotifications ring buffer and putting them on the nextCh Go channel:

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

So something has to kick off the run and pop functions.  That job falls to the sharedProcessor.  A sharedProcessor is really simple:

type sharedProcessor struct {
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

It too has a never-ending run function.  The first thing it does is to kick off its processorListeners’ run and pop functions on separate threads.  Then it blocks and waits for a signal to close:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

OK, so who tells the sharedProcessor‘s run method to do its thing?  The sharedIndexInformer‘s run method.  In there, you’ll find this line:

wg.StartWithChannel(processorStopCh, s.processor.run)

That spawns the sharedIndexInformer‘s sharedProcessor‘s run function in a new thread (and if a signal is sent down the processorStopCh channel then it will stop).

Stepping back for a moment, why all these threads? Why all this complexity?

The best I can do is this:

A processorListener is effectively the guts of a thread that could be blocked for a while by a badly-behaved ResourceEventHandler, which is under the end-user’s control.  So you want its “dequeuing” behavior to be on its own thread so that a badly-behaved ResourceEventHandler doesn’t accidentally cause the whole pinball machine to stop working while Kubernetes continues to deliver events at a frantic pace.

A sharedProcessor is really a kind of façade that bundles a bunch of processorListeners together and can spray a single notification across all of them, in addition to managing their threading concerns.  In Java, for example, where we have things like the ability to interrupt a thread built in, we could probably blend these two concerns together.  A better name for this thing might be something more like EventDistributor.
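
To make that concrete, a Java analog that blends pop and run into one construct might look like this sketch (the class name follows the suggestion above; everything else is illustrative):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class EventDistributor<T> implements Runnable {

  // An unbounded queue stands in for addCh plus the pendingNotifications ring
  // buffer: producers never block (and, just as in the Go code, a stalled
  // consumer means unbounded growth).
  private final BlockingQueue<T> notifications = new LinkedBlockingQueue<>();

  private final Consumer<? super T> handler;

  EventDistributor(final Consumer<? super T> handler) {
    super();
    this.handler = handler;
  }

  void add(final T notification) {
    this.notifications.add(notification);
  }

  @Override
  public final void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        this.handler.accept(this.notifications.take()); // blocks like a channel receive
      }
    } catch (final InterruptedException stop) {
      Thread.currentThread().interrupt(); // interruption replaces the stop channel
    }
  }

}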

A sharedIndexInformer, as previously discussed, has its own threading concerns so as not to slow down the reception of Kubernetes events.

So now that we’ve added the processorListener and sharedProcessor types into the mix, let’s amend our overall structural diagram to include them:

[Figure: overall structural diagram, amended to include processorListener and sharedProcessor]

In the next post, we’ll cover the behavioral aspects of all this.

Understanding Kubernetes’ tools/cache package: part 5

In part 4, we looked in some detail at the DeltaFIFO struct, in isolation, which turns logical “add this object, please” requests into “add an event representing an object addition” operations.  That concluded our structural walkthrough of the tools/cache package.  There’s obviously a lot more to look at in terms of behavior, but it’s worth pausing at this point to show the big picture.

In the following diagram, you’ll see an unapologetically rough-and-ready mix of plain-Jane, vanilla UML, some Java concepts, some Go type names—in short, a hodgepodge of notation that should prove helpful in displaying the overall package at a high level.  Again, this is not a one-for-one representation of Go structures, or Java classes, or anything else, but a mix of helpful nouns and boxes and lines that sketches out the general shape of the tools/cache package and hopefully gives a serviceable structural overview of the whole thing:

[Figure: rough structural overview of the tools/cache package (image: KubernetesControllerBlogPart5.png)]

In general, and I’ve probably been inconsistent, orange boxes represent concepts or structures that comprise the tools/cache framework which users are expected to understand or use directly.  Pink boxes represent implementation concerns.  Light blue boxes represent things directly related to Kubernetes.  I’ve tried to use UML properly for everything else.  Most, but not all, names are close enough to the Go code that searching for them in one fashion or another should help you navigate all the .go files.

To process this big picture, you can print it out and read the previous posts in this series starting with part 0.  Start pictorially with the SharedIndexInformer concept/class and follow the arrows.

Textually, a SharedIndexInformer “is a” SharedInformer, which manages a Controller and, indirectly, a Reflector, which uses a Kubernetes client and a ListerWatcher implementation to update a particular kind of Store, namely a DeltaFIFO, with logical events representing modifications, additions and deletions made to listed and watched Kubernetes resources.  The Store it returns from its getStore() method is actually an Indexer, implemented via the cache type in the Go code.  (Note in particular that its getStore() method returns the return value of its getIndexer() method, which means notably that the Store the caller has access to is not the DeltaFIFO that it updates internally as an implementation concern.  Also remember that at any given point, if you have a Store of any kind in your hands, there may be only certain functions you are allowed to call.)

In the next part in this series, we’ll look at the threading and behavioral concerns of the overall tools/cache package.


Understanding Kubernetes’ tools/cache package: part 4

In part 3, we dug into the real contract that Controller implementations must obey in order to be useful, and looked into what informers and SharedIndexInformers are.

In this post, we’ll look at DeltaFIFO, since it is at the heart of a lot of this.  We’ll do this mostly in isolation, and then will attempt, probably in a later post, to “plug it back in” to the overall pinball machine so we can make sense of it in a larger context.

From the name DeltaFIFO, we can surmise we’ll be talking about diffs in some capacity, and in the context of queuing.  Let’s look at the contract:

// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update).
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// deltaCompressor tells us how to combine two or more
	// deltas. It may be nil.
	deltaCompressor DeltaCompressor

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}

var (
	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

To boil this down further, let’s conceptually ignore, for now, the threading concerns, the “delta compression” represented by the deltaCompressor field, and the “queue is closed” concerns.

What we’re left with is a type about which we can say:

  • A DeltaFIFO is a Queue of Deltas instances.  It turns out a Deltas type is just a collection of Delta instances.  We’ll see what those are in a moment.
  • The Deltas instances a DeltaFIFO holds are “keyable”.  A string key can be extracted from a given Deltas instance by way of a KeyFunc (sketched just after this list).
  • Certain Deltas instances inside the DeltaFIFO can be “known”.
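For reference, the KeyFunc contract is tiny.  From store.go:

// KeyFunc knows how to make a key out of an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (string, error)

The stock implementation, MetaNamespaceKeyFunc, produces keys of the form namespace/name (or just name for unnamespaced objects).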

Let’s look at what a Delta type is:

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

A Delta, in other words, is an event! It is the combination of a verb (the DeltaType) and the payload (the object stored in the Object field).

So a DeltaFIFO is a thread-safe Queue of collections (Deltas) of events (Delta), indexed by some sort of key, where each event (Delta) in the collection represents the addition, modification or removal of a Kubernetes resource.

Or, to put it another way, it’s a queue that turns invocations of a function of the form Add(someObject) into effective invocations of a hypothetical AddEvent(ObjectAdded, someObject) function, and invocations of a function of the form Update(someObject) into effective invocations of a hypothetical AddEvent(ObjectUpdated, someObject) function, and so on.
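Here is a purely illustrative Go sketch of that translation; none of these names exist in client-go:

package example

type eventVerb string

const (
	objectAdded   eventVerb = "Added"
	objectUpdated eventVerb = "Updated"
)

type event struct {
	verb eventVerb
	obj  interface{}
}

// eventQueue is a toy stand-in for DeltaFIFO.
type eventQueue struct {
	events []event
}

// Add mimics DeltaFIFO.Add: the caller says “add this object”, and the
// queue records “an addition happened to this object”.
func (q *eventQueue) Add(obj interface{}) {
	q.events = append(q.events, event{objectAdded, obj})
}

// Update does the same for modifications.
func (q *eventQueue) Update(obj interface{}) {
	q.events = append(q.events, event{objectUpdated, obj})
}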

It’s worth noting that in some sense we’ve kind of come full circle.  Recall that it’s possible to establish Kubernetes watches on lists of resources.  When you do this, you get a stream of WatchEvents, which represent additions, modifications or removals of Kubernetes resources.  So what are we doing now with a DeltaFIFO and Deltas and so on?  Why not just work off WatchEvents directly?

Well, also recall that you need to set up watches in conjunction with an initial listing of Kubernetes resource instances, and that listing does not consist of WatchEvents.  So from a very high level, we’ve coalesced the concept of a list operation and the concept of a watch operation and expressed them both in terms of Delta instances, which all end up in this DeltaFIFO construct.  This can then be used to distribute events, JavaBean-style, to event handlers.

Let’s see if we can put this back in a limited amount of context.  We’re talking about a queue, so you should be able to add things to it.  It is fundamentally a queue of Delta instances (by way of Deltas instances) internally.  So how do you build a Delta?

As it turns out, a DeltaFIFO builds one for you:

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

[snip]
// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed. Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// If object is supposed to be deleted (last event is Deleted),
	// then we should ignore Sync events, because it would result in
	// recreation of this object.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
	if f.deltaCompressor != nil {
		newDeltas = f.deltaCompressor.Compress(newDeltas)
	}

	_, exists := f.items[id]
	if len(newDeltas) > 0 {
		if !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else if exists {
		// The compression step removed all deltas, so
		// we need to remove this from our map (extra items
		// in the queue are ignored if they are not in the
		// map).
		delete(f.items, id)
	}
	return nil
}

So from a Java modeling standpoint, we have to realize that any generic type that we use in modeling a DeltaFIFO actually has to be two generic types: the first would be T, the actual type of the Kubernetes resource being affected, and the second would be something like Delta<T>, which would be the “event” type actually stored internally by the queue.

The innards of DeltaFIFO model a set by storing a map from keys to Deltas (the items field) alongside an internal slice of those same keys (the queue field).  From this we can deduce that duplicates are not permitted in queues of this kind, so it has set semantics.  From a Java perspective, this is an important insight, as underneath the covers we’ll likely use some kind of Set implementation.  Also, in Java, Objects have an equals(Object) method, which might allow us to simplify the KeyFunc semantics as well.
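To see the whole thing in motion, here is a hedged usage sketch against the era of client-go discussed in this series (note the three-argument NewDeltaFIFO; later versions drop the compressor parameter).  The pod and the print statement are illustrative, and error handling is elided:

package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func deltaFIFODemo() {
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil /* no compressor */, nil /* no knownObjects */)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "p"}}
	_ = fifo.Add(pod)    // recorded as Delta{Added, pod} under key "default/p"
	_ = fifo.Update(pod) // appended as Delta{Updated, pod} under the same key

	// Pop hands the whole accumulated Deltas slice for one key to the
	// processing function, oldest delta first.
	_, _ = fifo.Pop(cache.PopProcessFunc(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type, d.Object)
		}
		return nil
	}))
}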

Visually, here is a simplistic model of the overall structure:

[Figure: simplistic structural model of DeltaFIFO (image: KubernetesControllerBlogPart4.png)]

As always, we’ll refine and refactor this model as we go, but it’s useful to sketch out some of the concepts.

In the next post, we’ll try to put some of these pieces back together again.

Understanding Kubernetes’ tools/cache package: part 3

In part 2 we encountered the Controller concept and explored how its lenient contract nevertheless “expects” that processing ability is added to the capabilities offered by its associated Reflector.  You may want to start with part 0 if you haven’t been following along in this series to understand what it’s all about.

In this post, we’ll look in a little more detail at two things:

  1. the de facto standard implementation of the Controller type (which, strictly speaking, is only one of many possibilities, but which colors the Controller concept’s expectations, unfortunately)
  2. the notional idea of an informer, and the concrete concept of a SharedInformer, specifically a SharedIndexInformer

controller struct-Backed Controller Implementation

We’ve seen that the Controller type is extraordinarily underspecified.  All you need to do to have a Controller is to implement three undocumented functions: Run, HasSynced and LastSyncResourceVersion.  Technically speaking, you could implement them any way you want.  Pragmatically speaking, though, the controller struct-backed implementation of this type, its de facto standard reference implementation, also shows that there is an implicit requirement that any Controller implementation will, as part of its Run function implementation, process a Queue which it is also assumed to have, which is additionally assumed to be the “target” of a Reflector it is assumed to have.  That’s a lot of assumptions; as we model this in Java we’ll formalize some of them.

Let’s look into the details of queue processing.  First, recall that a Queue is just a Store with the ability to Pop.  See part 2 for more details.

In controller.go, to keep our mental stack clean, let’s just look at the processLoop function in isolation:

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

Since this is Go and the function starts with a lowercase p, we know it is private to the controller struct-backed implementation.  For now, we’ll trust that this is called from somewhere useful within that implementation.

In this loop, we can sort of see intuitively that an object is popped off the Queue supplied at creation time to this Controller implementation, and is handed to some sort of processor function which was also supplied at creation time to this Controller implementation.  Then, if there was an error, and we’re supposed to retry, we re-enqueue the object.

More specifically, PopProcessFunc(c.config.Process) turns out to be a Go type conversion that, in this case, converts the ProcessFunc stored in the Config supplied to this Controller implementation at creation time to a PopProcessFunc defined by the fifo.go file.  Recall that c.config.Process is of type ProcessFunc.  (The two named types share the same underlying function type, which is what makes the conversion legal.)
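Here is a self-contained sketch of that mechanic; the two type names mirror the real ones, but everything else is illustrative:

package main

import "fmt"

// Two named function types with identical underlying types, mirroring
// ProcessFunc (controller.go) and PopProcessFunc (fifo.go).
type ProcessFunc func(obj interface{}) error
type PopProcessFunc func(obj interface{}) error

func main() {
	var process ProcessFunc = func(obj interface{}) error {
		fmt.Println("processing:", obj)
		return nil
	}
	// The conversion is legal because the underlying types are identical.
	pop := PopProcessFunc(process)
	_ = pop("some popped object")
}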

From all this, and looking at it from a Java programmer’s perspective, we’re starting to see an abstract class—not an interface—emerge.  There are certain things that a Controller implementation is really expected to do, in nearly all cases, whether they’re called out in the contract or not (they’re not).  Specifically, its mandated Run function is, clearly, “supposed to” invoke logic one way or another equivalent to that outlined in the processLoop above.  And it’s expected in one fashion or another that the Controller implementation will be working with a Queue that can be drained.  And there’s an expectation that Run will spawn parallel computation to accomplish all this.  We’ll file this away for now, but now we know in quite some detail exactly what a Controller implementation is expected to do, even if those things aren’t really called out in its contract.

One question that might occur to you at this point is: aren’t we done?  Don’t we have a mechanism to reflect the Kubernetes API server via lists and watches into a cache, and, with what we now know about Controller implementations, the ability to process that cache?  Doesn’t that give us a framework to receive and act on Kubernetes resource specification messages, as discussed in part 0?

For now, accept that the answer is no, and so push everything you’ve learned so far onto your mental stack under the umbrella term of Controller.  Recall briefly that lurking underneath it are concepts like Reflectors, Stores, Queues, ListerWatchers, ProcessFuncs and the like, but for now you can wrap them all up into Controller and free up some space for what comes next: informers.

Informers

You might have picked up on a filename naming pattern at this point: Controller is defined in controller.go; ListerWatcher is defined in listwatch.go and so on.  But you won’t find an informer.go file.  And you won’t find an Informer type.  Nevertheless, there is an informer concept, which you can piece together from logical, but not concrete, extensions of it.

First, let’s look at the NewInformer function, defined in controller.go:

// NewInformer returns a Store and a controller for populating the store
// while also providing event notifications. You should only used the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//
func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}

As you can see, there’s no such thing as an informer.  But you can also see that there’s an implicit construct here: a particular kind of Controller, really, whose associated Queue is something called a DeltaFIFO (which we’ll get to later), that therefore works on deltas, and that has some sort of notion of event handlers that are notified after a given delta is “converted” back into its respective object-verb combination.  We see our old friends ListerWatcher and Config in there as well: squinting a bit, you can see that you are supplying the raw materials for a Reflector and its containing Controller as well as for a custom ProcessFunc that delegates to the event handler stuff by way of an additional transient Store representing the client’s state and permitting this pinball machine to determine whether a particular element being processed was an addition, modification or a deletion.

Physically, the NewInformer function simply leverages Go’s ability to return multiple values and returns a Controller and its associated Store (not an Informer, since there is no such thing), and adds some unenforced requirements about the usage of the Store it returns (i.e. don’t modify it).  A client could “throw away” the Controller and use just the returned Store, maybe in conjunction with the event handler it supplied, or maybe not, and that Store would serve as a convenient cache of the Kubernetes API server.

Anyway, from all this we can posit that conceptually an informer “is a” Controller together with the ability to distribute its Queue-related operations to an appropriate event handler.  This will help us when we model this in Java, and, to Java programmers, should start to look a little bit like good old JavaBeans event listeners.  This is a particularly good insight as ultimately where we’d like to get to is for some Java programmer to write some sort of method that is called when an addition, modification or deletion is found, without that programmer having to worry about all the multithreaded queue manipulation we’ve encountered so far.  It also helps because this sort of thing is one of those cases where Java can end up helping us write simpler code.
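Before moving on, here is a hedged sketch of what driving NewInformer directly looks like in Go; the clientset parameter, the "default" namespace and the print statements are illustrative assumptions, not anything the package mandates:

package example

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// watchPods wires up NewInformer for pods in the "default" namespace.
func watchPods(clientset *kubernetes.Clientset, stopCh <-chan struct{}) cache.Store {
	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods", "default", fields.Everything())

	store, controller := cache.NewInformer(lw, &v1.Pod{}, 30*time.Second,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { fmt.Println("added:", obj) },
			UpdateFunc: func(old, new interface{}) { fmt.Println("updated:", new) },
			DeleteFunc: func(obj interface{}) { fmt.Println("deleted:", obj) },
		})

	go controller.Run(stopCh) // spawns the Reflector and the processing loop
	return store              // per the documentation above: Get/List only!
}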

As you might expect, there are different concrete kinds of (definitionally notional) informers.  We’ll look at one in particular, but know that there are others.  The one that we’ll look at is called a SharedIndexInformer, and can be found in the shared_informer.go file.

SharedIndexInformer

A SharedIndexInformer is, itself, a SharedInformer implementation that adds the ability to index its contents.  I mention this with all of its forward references just to set the stage: you should be asking questions like: what’s shared?  What’s being indexed?  Why do we need to share things? and so on.

Let’s look first at the SharedInformer contract:

// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer.  When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh.  You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions.  If there was a create, followed by a delete, the cache may NOT have your item.  This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period.  Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

This tells us what any SharedInformer must do. Recall that conceptually an informer, which has no Go construct to represent it, despite the documentation’s mention of a “standard Informer“, which does not exist, is a combination of a Controller and an event distribution mechanism. A SharedInformer is a kind of informer that can support many event handlers. You may also notice that it happens to have the same function signatures as Controller, which in Go terms means that it behaves like a Controller, or, for all intents and purposes, effectively is one.  Lastly, the Shared part of the term SharedInformer somewhat clumsily refers to the fact that since this particular informer construct can have many event handlers, then the single cache that it is built with—the Queue housed by the Controller it was built with, in most cases—becomes “shared” between those handlers as a result and by default.

All that SharedIndexInformer adds to the picture is the ability to locate items in its cache by various keys:

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

Here you need to pay very close attention to singular and plural nouns.  Note that what you add is an Indexers, plural, and what you get is an Indexer, singular.  Let’s look at what those are.

We’ll start with Indexers, the plural, since oddly enough it turns out to be the singular item, and is not, most notably, a bunch of Indexer instances!

This is actually simply a kind of map (singular) and can be found in the index.go file:

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

An IndexFunc in turn is just a mapping function:

// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) ([]string, error)

So hand it a Kubernetes resource, for example, and it will give you back a set (I’m assuming) of string values corresponding to it in some way.

An Indexers, a singular object with a plural name, is therefore a simple map of such functions, each of which is, in turn, indexed under its own string key.

So you can add a bunch of these maps to a SharedIndexInformer for its usage.
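Here is a hedged sketch of registering one; the index name “byNamespace” and the informer variable are illustrative assumptions (client-go actually ships this particular function as cache.MetaNamespaceIndexFunc, keyed under cache.NamespaceIndex):

package example

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/tools/cache"
)

// addNamespaceIndex registers a single IndexFunc that indexes each object
// by its namespace.
func addNamespaceIndex(informer cache.SharedIndexInformer) error {
	return informer.AddIndexers(cache.Indexers{
		"byNamespace": func(obj interface{}) ([]string, error) {
			m, err := meta.Accessor(obj)
			if err != nil {
				return nil, err
			}
			return []string{m.GetNamespace()}, nil
		},
	})
}

Once the informer is running, informer.GetIndexer().ByIndex("byNamespace", "default") would hand back every cached object in that namespace, using the Indexer contract we’re about to look at.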

An Indexer, a singular noun describing an aggregate concept (!), is a collection (!) of such Indexers instances with some additional aggregate behavior layered on top:

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
	Store
	// Retrieve list of objects that match on the named indexing function
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the set of keys that match on the named indexing function.
	IndexKeys(indexName, indexKey string) ([]string, error)
	// ListIndexFuncValues returns the list of generated values of an Index func
	ListIndexFuncValues(indexName string) []string
	// ByIndex lists object that match on the named indexing function with the exact key
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

Clear as mud, right?  So from a SharedIndexInformer you can get its one true Indexer that is logically composed of various Indexers instances added by way (hopefully) of that SharedIndexInformer’s AddIndexers function, although it would also be possible to add them from the Indexer directly.

One other extremely important thing to notice here is that an Indexer is also a Store!  But pay very careful attention to how it is used in its capacity as a Store.

Specifically, let’s first recall that a non-shared informer—which has no Go language reification—is conceptually a combination of a Controller and a Store.  We have seen, for example, that the NewInformer function returns both a Controller and a Store that is attached to that Controller; the combination gives you an automatically populating cache.

In SharedIndexInformer, the Store into which Kubernetes API server events are reflected is a DeltaFIFO, not an Indexer.  But an Indexer is supplied to that DeltaFIFO, and is the return value of the SharedIndexInformer’s GetStore function!  This tells us more about the implied contract of GetStore: clearly the Store it returns must not be used for modification!

Another extremely important thing to note is that any Store is also a KeyListerGetter, a KeyLister and a KeyGetter.   A KeyListerGetter is a combination of the KeyLister and KeyGetter types.  A KeyLister is anything that can list its keys, and a KeyGetter is not something that can get a key, but something that houses other things that can be retrieved by key (like some sort of map).
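Those contracts are tiny; from fifo.go (give or take version drift), they amount to this:

// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
	KeyLister
	KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
	ListKeys() []string
}

// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
	GetByKey(key string) (value interface{}, exists bool, err error)
}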

So rewinding a bit, when SharedIndexInformer creates a new DeltaFIFO to use as its Store, and, in turn, supplies an Indexer to that DeltaFIFO, it is supplying that Indexer only in its capacity as a KeyListerGetter.  Similarly, its GetStore method probably should really return something closer to a KeyListerGetter than an actual Store, since calling its modification methods is prohibited.

So clearly to understand more here, we’re going to have to dive in to DeltaFIFO, which we’ve now seen is at the heart of all of this, and will be the subject of the next post.

Let’s review what we’ve come up with here so far:

  • A Controller really kind of has to behave like the reference controller struct-backed Controller implementation.  Specifically, it needs to manage a Queue, which is often a DeltaFIFO, and both populate and drain it, and incorporate an event listener mechanism.
  • The idea of an informer exists, which is a combination of a Controller and the Store that it’s hooked up to, but does not physically exist except in “shared” form.
  • A SharedIndexInformer instantiates the concept of an informer with multiple event handlers and, by definition, a single Queue, a DeltaFIFO, that they share.

We might model this visually as follows:

[Figure: structural model of informers, SharedIndexInformer and related types (image: KubernetesControllerBlogPart3.png)]

As always, we’ll refine and refactor this model as we go along.

Onwards to deltas and DeltaFIFO.

Understanding Kubernetes’ tools/cache package: part 2

In the previous post in this long-running series, we looked at some of the foundations underlying Kubernetes controllers, and started looking into the concepts behind the tools/cache package, specifically ListerWatcher, Store and Reflector.  In this post we’ll look at the actual concept of a Controller.

So far, I’ve been careful about capitalization.  I’ve written “Kubernetes controller”, not “Kubernetes Controller” (capitalized) or “Kubernetes Controller” (the code type).  That’s been on purpose.  That’s in part because the tools/cache package has a Controller type, which logically sits in front of Reflector.  You can see for yourself:

// Controller is a generic controller framework.
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

Note, interestingly, that strictly speaking all you have to do to implement the Controller type is to supply a Run() function (for other Java programmers, the stopCh “stop channel” is Go’s way of (essentially) allowing interruption), a HasSynced() function that returns true if, er, synchronization has been accomplished (we’ll look into what that means later), and a LastSyncResourceVersion() function, which returns the resourceVersion of the Kubernetes list of resources being watched and reflected.

An interesting point here is that this means that whereas Reflector was a generic construct completely decoupled from Kubernetes, this interface is conceptually coupled to Kubernetes (note the inclusion of the term Resource and the concept of a resource version, both of which come from the Kubernetes ontology).  This observation can help us later on with refining our Java model.

Next, look at the controller struct, which is the state-bearing portion of a Controller implementation.  It takes in a Config representing its configuration, and has a “slot” for a Reflector, along with unimportant-to-the-end-user bits related to testing and thread safety.

So what does a Controller do, exactly, that a Reflector doesn’t already do?

A clue to the answer lies in the Config type, which is used as an implementation detail by only one particular implementation of the Controller type:

// Config contains all the settings for a Controller.
type Config struct {
	// The queue for your objects; either a FIFO or
	// a DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	Queue

	// Something that can list and watch your objects.
	ListerWatcher

	// Something that can process your objects.
	Process ProcessFunc

	// The type of your objects.
	ObjectType runtime.Object

	// Reprocess everything at least this often.
	// Note that if it takes longer for you to clear the queue than this
	// period, you will end up processing items in the order determined
	// by FIFO.Replace(). Currently, this is random. If this is a
	// problem, we can change that replacement policy to append new
	// things to the end of the queue instead of replacing the entire
	// queue.
	FullResyncPeriod time.Duration

	// ShouldResync, if specified, is invoked when the controller's reflector determines the next
	// periodic sync should occur. If this returns true, it means the reflector should proceed with
	// the resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.
	RetryOnError bool
}

// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods.
type ShouldResyncFunc func() bool

// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error

(As you read the excerpt above, bear in mind that it is in the controller.go file, but nevertheless contains lots of documentation forward references to types and concepts from other files we haven’t encountered yet.)

So not all Controller implementations have to use this.  In fact, Controller is completely undocumented!  But realistically, the only Controller implementation that matters, the one returned by the tersely-named New() function and backed by a controller struct, does use it, so we’d better understand it thoroughly.

The first thing to notice is that a Config struct contains a Queue.  We can track down the definition for Queue in fifo.go:

// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
	Store

	// Pop blocks until it has something to process.
	// It returns the object that was process and the result of processing.
	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
	// should be requeued before releasing the lock on the queue.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent adds a value previously
	// returned by Pop back into the queue as long
	// as nothing else (presumably more recent)
	// has since been added.
	AddIfNotPresent(interface{}) error

	// Return true if the first batch of items has been popped
	HasSynced() bool

	// Close queue
	Close()
}

So loosely speaking any Queue implementation must also satisfy the Store contract.  In Java terms, this means a hypothetical Queue interface would extend Store.  Let’s file that away for later.

We can also see some concept leakage here: recall that Controller implementations must have a HasSynced function, but its purpose and reason for being are undocumented.  When we look back at the controller struct-backed implementation of Controller, one possible implementation of the Controller type, we can see that the implementation of its HasSynced function merely delegates to that of the Queue contained by its Config.  So there is a tacit assumption that a Controller implementation will most likely be backed by a Queue, since that would be the easiest way to implement the HasSynced function, though this is not, strictly speaking, required.  This also serves as the only documentation we’re going to get about what that function is supposed to do: Return true if the first batch of items has been popped.
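The delegation itself, from controller.go, is essentially just this:

func (c *controller) HasSynced() bool {
	return c.config.Queue.HasSynced()
}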

Back to the Config.  It also contains a ListerWatcher.  Hey!  We’ve seen one of these before.  We had realized that it is a core component of a Reflector.  So why doesn’t a Config merely have a Reflector?  Why is encapsulation broken here—isn’t ListerWatcher basically an implementation detail of a Reflector?  Yes, and there doesn’t seem to be a good reason.  We can tell from source code we’ll encounter later that when the Run function of the controller struct-backed implementation of Controller is called (just one possible implementation of such a function, remember), it creates a Reflector just in time using the ListerWatcher housed by the Config.  Why the Reflector isn’t passed in as part of the Config is an open question.  At any rate, logically speaking, part of a controller struct-backed implementation of the Controller interface is a Reflector.

Next up is the first actually interesting part that we really haven’t encountered before, which gives us a better idea of what a Controller is supposed to do: the ProcessFunc.  A ProcessFunc appears, from its documentation, to do something to a Kubernetes resource:

ProcessFunc processes a single object.

So even from this little bit of documentation we can see that ultimately a Controller implementation that happens to use a Config (remember, it’s not required to) will not only cause caching of Kubernetes resources into a Store (remember, it’s not required to), but will presumably work on objects found in that Store as well.  These are shaky assumptions, and not enforced by any contract, but turn out to be quite important, not just to the de facto standard implementation of the Controller type (the controller struct-backed one that uses a Config), but to the implied, but not technically specified, contract of the Controller type itself.

Summed up, most Controller implementations probably should use a Reflector to populate a Queue (a particular kind of Store) with Kubernetes resources of a particular kind, and then also process the contents of that Queue, presumably by Popping objects off of it.

Again, it is worth noting that this summary is based on one particular implementation of the type (the de facto standard controller struct-backed one), and not on the type’s contract itself, but elsewhere in the package you will be able to see that this is expected, if not enforced, behavior of any Controller implementation.

We might visually model all this like so:

[Figure: structural model of Controller, Reflector and Queue (image: KubernetesControllerBlogPart2.png)]

Here, a Controller uses a Reflector to populate a Queue, and has a protected process method that knows how to do something with a given object.  It also has a protected shouldResync() method, a public hasSynced() method, and the ability to report what the last Kubernetes resource version was after a synchronization.  We’ll greatly refine and refactor this model over time.

In the next post, we’ll look at informers and shared informers which build on top of these foundations, again with an eye towards modeling all this idiomatically in Java.