Understanding Kubernetes’ tools/cache package: part 11—Towards a Kubernetes controller CDI 2.0 portable extension

In part 10, we laid out what we need to do to make a simple programming model available to an end user who wants to be notified of Kubernetes events in the same style as Go programmers using the tools/cache package.  This series starts with part 0.

Here we’ll look at implementing a CDI 2.0 portable extension that enables this lean programming model.

In part 10, we sketched out the idea of pairing an observer method with a producer method, using the same qualifiers on both the producer method’s return type and the observer method’s event parameter.  Linked in this way, the two give us room to build boilerplate machinery in between the Listable-and-VersionWatchable made by the producer method and the event observed by the observer method, and the end user doesn’t have to know about or invoke any of it: her events “just work”, assuming this extension is present, opaquely, at runtime.

This post will make use of the microbean-kubernetes-controller framework, the Java Kubernetes controller framework built up over the course of this series (you can start at the beginning if you like).  This framework is CDI-independent.  We’ll adapt it to CDI 2.0 so end users don’t have to deal with any boilerplate.

As discussed, our portable extension will have to:

  • find beans that have Listable and VersionWatchable among their bean types
  • find observer methods observing Event objects that contain Kubernetes resources in them
  • ensure that for any given observer method whose event parameter is qualified with a set of qualifier annotations, there exists a producer method (or any other kind of CDI bean, actually) with the right kind of type that is also qualified by those same qualifier annotations (or no event delivery will take place)
  • have some way of recognizing that arbitrary qualifier annotations defined by the user are those that should be used to identify Kubernetes events—i.e. some way of meta-qualifying a qualifier annotation that appears on relevant producer methods and relevant event parameters

We can do all these things thanks to the well-defined lifecycle mandated by CDI.

First, we’ll sketch out a meta-qualifier annotation named KubernetesEventSelector.  The intent of this meta-annotation will be that if you put it on a qualifier annotation that you create, then your qualifier annotation will be treated as a qualifier that is trying to say something specifically about sets of Kubernetes events.

Here’s what that might look like:


import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.inject.Qualifier;
@Documented
@Retention(value = RetentionPolicy.RUNTIME)
@Target({ ElementType.ANNOTATION_TYPE })
public @interface KubernetesEventSelector {
}

Now as an end user if I define my own qualifier annotation and annotate it with KubernetesEventSelector:


@Documented
@KubernetesEventSelector
@Qualifier
@Retention(value = RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.PARAMETER })
public @interface AllConfigMapEvents {
}

…I will have expressed my intent that when we “see” this annotation:

@AllConfigMapEvents

…we are talking about choosing Kubernetes events in some handwavy manner, and not something else.

So now our as-yet-to-be-written portable extension has at least the theoretical means to find event parameters and beans and see if they’re qualified with qualifiers that, in turn, are annotated with @KubernetesEventSelector.  Those will be the only things of interest to this extension.

Next, we’ll take advantage of the CDI lifecycle and the prescribed order of things and recognize that bean processing happens before observer method processing.  So let’s start with looking for those beans that have Listable and VersionWatchable among their bean types.

We’re going to run into a problem right away.  We would like to look for beans whose bean types are of type X where X is some type that extends Listable and VersionWatchable, but otherwise we don’t really care what X actually is.  But producer methods must return concrete types without type variables in them.

What we’ll do instead is look for the most general type in the fabric8 Kubernetes model hierarchy that implements or extends both Listable and VersionWatchable and deal with that.  That type happens to be Operation.  We’ll keep track during bean processing of all Beans that match these types.  That means it doesn’t matter if the bean in question is a producer method, a producer field, a managed bean, a synthetic bean or any of the other kinds of beans you can cause to come into existence in a CDI world: they’ll all be accounted for.

Now, obviously we’re not actually interested in all beans whose bean types include Operation.  We’re interested in those that have been explicitly “tagged” with the KubernetesEventSelector annotation.

It would be nice if there were a convenience method of sorts in CDI that would let you ask a bean for its qualifiers but would restrict the returned set to only those qualifiers that, in turn, are meta-qualified with something else (like KubernetesEventSelector).  Unfortunately, there is not, so we’ll have to do this ourselves.
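
Here’s a minimal sketch of that kind of filtering using nothing but plain Java reflection.  Everything in it is hypothetical: Selector stands in for KubernetesEventSelector, retainAnnotatedWith is a made-up helper, and it ignores the BeanManager entirely, so it is not how the real Annotations utility works.  It only shows the core idea: keep an annotation if its annotation type is itself annotated with the meta-annotation.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class MetaQualifierSketch {

  // Hypothetical stand-in for KubernetesEventSelector.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.ANNOTATION_TYPE)
  @interface Selector {}

  // An annotation that is meta-annotated with Selector...
  @Selector
  @Retention(RetentionPolicy.RUNTIME)
  @interface AllConfigMapEvents {}

  // ...and one that is not.
  @Retention(RetentionPolicy.RUNTIME)
  @interface Unrelated {}

  // Something carrying both annotations, so we have instances to filter.
  @AllConfigMapEvents
  @Unrelated
  static final class Annotated {}

  /** Returns only those annotations whose annotation type carries the supplied meta-annotation. */
  static Set<Annotation> retainAnnotatedWith(final Set<? extends Annotation> annotations,
                                             final Class<? extends Annotation> metaAnnotation) {
    final Set<Annotation> result = new LinkedHashSet<>();
    for (final Annotation annotation : annotations) {
      if (annotation.annotationType().isAnnotationPresent(metaAnnotation)) {
        result.add(annotation);
      }
    }
    return result;
  }

  /** Collects the runtime-visible annotations on a class into a Set. */
  static Set<Annotation> annotationsOf(final Class<?> c) {
    return new LinkedHashSet<>(Arrays.asList(c.getAnnotations()));
  }

  public static void main(final String[] args) {
    // Only the AllConfigMapEvents annotation survives the filter.
    System.out.println(retainAnnotatedWith(annotationsOf(Annotated.class), Selector.class));
  }
}
```

Note that both the meta-annotation and the annotations being filtered need runtime retention for this to work.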

So, refined: we’ll keep track of Beans with Operation (and hence by definition Listable and VersionWatchable) among their bean types, and with KubernetesEventSelector-qualified qualifiers.  We’ll keep track of those annotations, too.

Once we do this, we’ll have some machinery that can now pick out sources for raw materials that we’ll need to feed to a Reflector.

But obviously we don’t really want to do any of this if there aren’t any observer methods around.  Fortunately but also unfortunately, bean processing happens before observer method processing, so we will do this work, and then discard it if we don’t find any relevant observer methods.

What’s a relevant observer method?  In this case it’s an observer method whose (a) AbstractEvent-typed event parameter is (b) qualified by one or more qualifier annotations that are themselves (c) meta-qualified by KubernetesEventSelector, and that (d) “match” an equivalent set of qualifiers on an Operation-typed bean.  This means that indirectly the observer method will “watch” for what the producer method is indirectly providing access to.

Here’s an example observer method:


import javax.enterprise.event.Observes;
import io.fabric8.kubernetes.api.model.HasMetadata;
import org.microbean.kubernetes.controller.Event;
private final void onConfigMapEvent(@Observes @AllConfigMapEvents final Event<? extends HasMetadata> event) {
}

Note the usage of the @AllConfigMapEvents qualifier we defined earlier in this blog post, and note that the event parameter so qualified is an Event.  Let’s say we found a “matching” producer method:


import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.DoneableConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.Operation;
import io.fabric8.kubernetes.client.dsl.Resource;

@Produces
@ApplicationScoped
@AllConfigMapEvents
private static final Operation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> selectAllConfigMaps(final KubernetesClient client) {
  return client.configMaps();
}

Note the Operation return type and the matching qualifier-meta-qualified-with-KubernetesEventSelector that decorates it (@AllConfigMapEvents).  Do you see how it “matches” the observer method’s event parameter qualifier (also @AllConfigMapEvents)?  This is the linkage that connects this producer method with the observer method that observes events that will result from the producer method’s existence.

So once we have a mapping from a Bean to a set of relevant qualifier annotations, then we can see if, during observer method processing, we’re talking about the first observer method that should cause our boilerplate machinery to get invoked.  If there are many observer methods that “match” a relevant producer method, that’s fine; that just indicates that the very same Kubernetes event might be processed by many observers.  The point is, before we jump through all the hoops to set up lists and watches and so on, we should make sure that someone is actually going to take advantage of our work.

So once we “get a match”, we can build the boilerplate for all matches of the same type, once.

For each distinct match, we’ll need a new Controller.  The controller will need an instance of whatever the producer method is making (some kind of Operation, which is by definition both a Listable and a VersionWatchable).

The Controller, in turn, will need some kind of Map representing Kubernetes objects we “know about”.  That Map will be empty to start with, since we don’t know about any Kubernetes objects yet.  But if, down the road, a deletion gets skipped (maybe due to a watch timing out or a disconnect), we need to retroactively fire a delete if, say, the Kubernetes list of relevant objects is “less than” our “known objects”, and we’ll need to know what object was actually deleted.
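
Here’s a minimal sketch of that retroactive-delete bookkeeping in plain Java.  It assumes known objects are keyed by a namespace/name string, and the class and method names are made up for illustration; the real framework’s types are not used here.  Anything we know about that is missing from the latest list is treated as deleted and handed back, so a delete event can be synthesized for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class KnownObjectsSketch {

  /**
   * Returns the objects we knew about that are absent from the latest list,
   * removing them from knownObjects as a side effect.
   */
  static <T> List<T> reconcileDeletions(final Map<String, T> knownObjects, final Set<String> listedKeys) {
    final List<T> deleted = new ArrayList<>();
    // Copy the keys so that removing entries does not disturb the iteration.
    for (final String key : new ArrayList<>(knownObjects.keySet())) {
      if (!listedKeys.contains(key)) {
        deleted.add(knownObjects.remove(key));
      }
    }
    return deleted;
  }

  public static void main(final String[] args) {
    final Map<String, String> known = new HashMap<>();
    known.put("default/a", "ConfigMap a");
    known.put("default/b", "ConfigMap b");
    final Set<String> listed = new HashSet<>();
    listed.add("default/a");
    // Each returned object would become the payload of a synthesized delete event.
    System.out.println(reconcileDeletions(known, listed));
  }
}
```

This is why the Map has to hold the objects themselves and not just their keys: the delete event needs to carry what was deleted.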

The Controller will also need some kind of Consumer (a siphon) that will slurp up EventQueues and dispatch their events in order without blocking threads.  That Consumer will also need to update that very same Map of “known objects” as it does so.

Finally, that Consumer of EventQueues will also need a mechanism to link it to the native CDI event broadcast mechanism which will take care of actually dispatching events to observer methods.
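
The siphon idea can be sketched with a single-threaded executor: the caller hands an event off and returns immediately, and one worker thread delivers events downstream in the order they arrived.  This is only an illustration under simplifying assumptions: OrderedDispatcherSketch is a made-up name, it works on single events rather than EventQueues, and the downstream Consumer stands in for whatever eventually fires CDI events.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class OrderedDispatcherSketch<E> implements Consumer<E>, AutoCloseable {

  // A single worker thread preserves submission order.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  private final Consumer<? super E> downstream;

  OrderedDispatcherSketch(final Consumer<? super E> downstream) {
    this.downstream = downstream;
  }

  /** Hands the event off and returns immediately; delivery happens on the worker thread. */
  @Override
  public void accept(final E event) {
    this.executor.execute(() -> this.downstream.accept(event));
  }

  /** Stops accepting events and waits briefly for queued ones to be delivered. */
  @Override
  public void close() {
    this.executor.shutdown();
    try {
      this.executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (final InterruptedException interruptedException) {
      // Restore the interrupt flag so callers can react to it.
      Thread.currentThread().interrupt();
    }
  }

  public static void main(final String[] args) {
    try (OrderedDispatcherSketch<String> dispatcher = new OrderedDispatcherSketch<String>(System.out::println)) {
      dispatcher.accept("ADDITION default/a");
      dispatcher.accept("MODIFICATION default/a");
    }
  }
}
```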

Whew!  Let’s do the easy boring stuff first.

The first lifecycle event that looks good is ProcessBean.  If we could process all beans whose types are Operation here, in one place, regardless of whether the bean in question is a producer method, managed bean, etc. that would be great.  As it turns out for various obscure reasons, we can’t.  So we’ll implement a lifecycle event observer method for each type of bean, and route all their stuff to a common private method for processing:


private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerMethod(@Observes final ProcessProducerMethod<X, ?> event, final BeanManager beanManager) {
  if (event != null) {
    // (We'll define this in a second.)
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerField(@Observes final ProcessProducerField<X, ?> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processManagedBean(@Observes final ProcessManagedBean<X> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processSyntheticBean(@Observes final ProcessSyntheticBean<X> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorBean(event.getBean(), beanManager);
  }
}

And the processPotentialEventSelectorBean method that these all effectively delegate to might look like this:


private final void processPotentialEventSelectorBean(final Bean<?> bean, final BeanManager beanManager) {
  if (bean != null) {
    final Type operationType = getOperationType(bean);
    if (operationType != null) {
      final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(bean.getQualifiers(), KubernetesEventSelector.class, beanManager);
      if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
        this.eventSelectorBeans.put(kubernetesEventSelectors, bean);
      }
    }
  }
}

…where the Annotations utility method can be found elsewhere, and where the getOperationType() method might look like this:


private static final Type getOperationType(final Bean<?> bean) {
  final Type returnValue;
  if (bean == null) {
    returnValue = null;
  } else {
    final Set<Type> beanTypes = bean.getTypes();
    assert beanTypes != null;
    assert !beanTypes.isEmpty();
    Type candidate = null;
    for (final Type beanType : beanTypes) {
      if (beanType instanceof ParameterizedType) {
        final Type rawType = ((ParameterizedType)beanType).getRawType();
        if (rawType instanceof Class && Operation.class.equals((Class<?>)rawType)) {
          candidate = beanType;
          break;
        }
      }
    }
    returnValue = candidate;
  }
  return returnValue;
}
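
If the raw-type check in getOperationType() looks opaque, here’s the same technique in isolation, using only JDK types.  The names are hypothetical, and List stands in for Operation: we walk a collection of Types and return the first ParameterizedType whose raw type is the class we’re after.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Set;

final class RawTypeSearchSketch {

  // These fields exist only so we can obtain parameterized Type instances reflectively.
  static List<String> strings;
  static Set<Integer> integers;

  /** Returns the first parameterized type whose raw type is rawType, or null if there is none. */
  static Type findByRawType(final Collection<? extends Type> types, final Class<?> rawType) {
    for (final Type type : types) {
      if (type instanceof ParameterizedType && rawType.equals(((ParameterizedType) type).getRawType())) {
        return type;
      }
    }
    return null;
  }

  /** Returns the generic type of one of this class's own fields. */
  static Type genericTypeOfField(final String name) {
    try {
      return RawTypeSearchSketch.class.getDeclaredField(name).getGenericType();
    } catch (final NoSuchFieldException noSuchFieldException) {
      throw new IllegalArgumentException(name, noSuchFieldException);
    }
  }
}
```

The returned Type keeps its type arguments, which is what lets the extension later ask for a contextual reference of exactly that parameterized type.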

So at the end of all this we’ll have a Map, eventSelectorBeans, that has as its keys Sets of Annotation instances that have KubernetesEventSelector somewhere “on” them, and has as its values Beans that are qualified with those annotations.
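
Keying a Map by a Set works because two Sets are equal when they contain the same elements, regardless of order.  Here’s a small sketch of the bookkeeping, with plain Strings standing in for both the qualifier annotations and the Beans.  All the names are hypothetical, and unlike the extension code it looks the bean up without removing it from the map.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class SelectorMatchingSketch {

  // Selector sets mapped to the name of the bean qualified with them (a stand-in for Bean<?>).
  private final Map<Set<String>, String> eventSelectorBeans = new HashMap<>();

  // Beans that at least one observer method has asked for.
  private final Set<String> beans = new LinkedHashSet<>();

  /** Records a bean together with the selectors found on it. */
  void beanFound(final String beanName, final String... selectors) {
    this.eventSelectorBeans.put(new HashSet<>(Arrays.asList(selectors)), beanName);
  }

  /** Records an observer; if its selectors match a bean's, that bean will need a controller. */
  void observerFound(final String... observedSelectors) {
    final String beanName = this.eventSelectorBeans.get(new HashSet<>(Arrays.asList(observedSelectors)));
    if (beanName != null) {
      this.beans.add(beanName);
    }
  }

  Set<String> beansNeedingControllers() {
    return this.beans;
  }
}
```

Because beans is a Set, several observers matching the same bean still yield only one entry, and hence only one controller.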

Now we’ll need to route all observer method processing to something similar (fortunately, this is a great deal simpler).  Recall that we are guaranteed by contract that observer method processing happens after discovering beans, so by this point we already have knowledge of any producer methods or managed beans “making” Operation instances of various kinds:


private final void processObserverMethod(@Observes final ProcessObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>, ?> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorObserverMethod(event.getObserverMethod(), beanManager);
  }
}

private final void processSyntheticObserverMethod(@Observes final ProcessSyntheticObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>, ?> event, final BeanManager beanManager) {
  if (event != null) {
    this.processPotentialEventSelectorObserverMethod(event.getObserverMethod(), beanManager);
  }
}

private final void processPotentialEventSelectorObserverMethod(final ObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>> observerMethod, final BeanManager beanManager) {
  if (observerMethod != null) {
    final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(observerMethod.getObservedQualifiers(), KubernetesEventSelector.class, beanManager);
    if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
      if (observerMethod.isAsync()) {
        if (!this.asyncNeeded) {
          this.asyncNeeded = true;
        }
      } else if (!this.syncNeeded) {
        this.syncNeeded = true;
      }
      // remove() returns null when no bean matches, or when an earlier
      // observer method already claimed the bean; don't store a null.
      final Bean<?> bean = this.eventSelectorBeans.remove(kubernetesEventSelectors);
      if (bean != null) {
        this.beans.add(bean);
      }
    }
  }
}

(Above, we also keep track of whether the observer method in question is synchronous or asynchronous.)

Finally, we can clean up a little bit after bean discovery is all done:


private final void processAfterBeanDiscovery(@Observes final AfterBeanDiscovery event) {
  if (event != null) {
    this.eventSelectorBeans.clear();
  }
}

At the end of all of this bean discovery, we have a Set of Bean instances stored in a beans instance variable that (a) “make” Operation instances, (b) are appropriately meta-qualified to declare their interest in taking part in Kubernetes event production, and (c) have at least one observer method interested in their effects.

Let’s build some controllers and start them when the user’s program comes up!

We’ll begin by observing the initialization of the application scope, which effectively marks the start of a CDI application.

When this happens, our beans instance variable will contain some of the materials to build a Controller.  For each Bean in there, we’ll “instantiate” it (by way of a beanManager.getReference() call), create a new Controller, hook that Controller up to the right kind of EventDistributor, start the Controller, and keep track of it for later cleanup:


private final <T extends HasMetadata, X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> void startControllers(@Observes @Initialized(ApplicationScoped.class) @Priority(LIBRARY_AFTER) final Object ignored, final BeanManager beanManager) {
  if (beanManager != null) {
    for (final Bean<?> bean : this.beans) {
      assert bean != null;
      final Set<Annotation> qualifiers = bean.getQualifiers();
      final Map<Object, T> knownObjects = new HashMap<>();
      final EventDistributor<T> eventDistributor = new EventDistributor<>(knownObjects);
      eventDistributor.addConsumer(new CDIEventDistributor<T>(qualifiers, null /* no NotificationOptions; TODO */, this.syncNeeded, this.asyncNeeded));
      @SuppressWarnings("unchecked") // we know an Operation is of type X
      final X contextualReference = (X)beanManager.getReference(bean, getOperationType(bean), beanManager.createCreationalContext(bean));
      final Controller<T> controller = new Controller<>(contextualReference, knownObjects, eventDistributor);
      controller.start();
      this.controllers.add(controller);
    }
  }
}

In the above code example, you’ll see a reference to a CDIEventDistributor.  That private class is a Consumer of events that simply fires them using the existing CDI broadcasting mechanism:


private static final class CDIEventDistributor<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>> {

  private static final Annotation[] EMPTY_ANNOTATION_ARRAY = new Annotation[0];

  private final Annotation[] qualifiers;

  private final NotificationOptions notificationOptions;

  private final boolean syncNeeded;

  private final boolean asyncNeeded;

  private CDIEventDistributor(final Set<Annotation> qualifiers, final NotificationOptions notificationOptions, final boolean syncNeeded, final boolean asyncNeeded) {
    super();
    if (qualifiers == null) {
      this.qualifiers = EMPTY_ANNOTATION_ARRAY;
    } else {
      this.qualifiers = qualifiers.toArray(new Annotation[qualifiers.size()]);
    }
    this.notificationOptions = notificationOptions;
    this.syncNeeded = syncNeeded;
    this.asyncNeeded = asyncNeeded;
  }

  @Override
  public final void accept(final AbstractEvent<? extends T> controllerEvent) {
    if (controllerEvent != null && (this.syncNeeded || this.asyncNeeded)) {
      final BeanManager beanManager = CDI.current().getBeanManager();
      assert beanManager != null;
      final javax.enterprise.event.Event<Object> cdiEventMachinery = beanManager.getEvent();
      assert cdiEventMachinery != null;
      final TypeLiteral<AbstractEvent<? extends T>> eventTypeLiteral = new TypeLiteral<AbstractEvent<? extends T>>() {
        private static final long serialVersionUID = 1L;
      };
      final javax.enterprise.event.Event<AbstractEvent<? extends T>> broadcaster = cdiEventMachinery.select(eventTypeLiteral, this.qualifiers);
      assert broadcaster != null;
      if (this.asyncNeeded) {
        if (this.notificationOptions == null) {
          broadcaster.fireAsync(controllerEvent);
        } else {
          broadcaster.fireAsync(controllerEvent, this.notificationOptions);
        }
      }
      if (this.syncNeeded) {
        broadcaster.fire(controllerEvent);
      }
    }
  }

}

Finally, when the user’s application stops, we should clean up after ourselves.  We observe the imminent destruction of the application scope and close everything we started up:


private final void stopControllers(@Observes @BeforeDestroyed(ApplicationScoped.class) @Priority(LIBRARY_BEFORE) final Object ignored) throws IOException {
  Exception exception = null;
  for (final Controller<?> controller : this.controllers) {
    assert controller != null;
    try {
      controller.close();
    } catch (final IOException | RuntimeException closeException) {
      if (exception == null) {
        exception = closeException;
      } else {
        exception.addSuppressed(closeException);
      }
    }
  }
  if (exception instanceof IOException) {
    throw (IOException)exception;
  } else if (exception instanceof RuntimeException) {
    throw (RuntimeException)exception;
  } else if (exception != null) {
    throw new IllegalStateException(exception.getMessage(), exception);
  }
}

The net effect is that if you put this portable extension and its dependencies on the classpath of your CDI 2.0 application and author a producer method and observer method pair (and nothing else), your CDI application will receive Kubernetes events and should behave semantically like any other kind of Kubernetes controller.

The fledgling project housing all this code is microbean-kubernetes-controller-cdi and is available on GitHub under the Apache 2.0 license.

Understanding Kubernetes’ tools/cache package: part 10—Designing Kubernetes controllers in CDI 2.0

In part 9 of this series (you may want to start at the beginning), we put together a sketch of what has become a Kubernetes controller framework in Java.  The framework represents most of the concerns of a subset of the Kubernetes tools/cache package, including the SharedIndexInformer, Reflector, Controller, Indexer, DeltaFIFO, Store and ListerWatcher concepts.

But if you want to write a controller, you don’t (I’m presuming) want to deal with any of that boilerplate.  You want to express that you:

  • are interested in receiving notifications of additions, modifications and deletions of certain Kubernetes resources
  • know what to do when you receive such notifications

The other stuff—all the reflectors and controllers and informers and caches and whatnot—is simply a means to an end, so we should attempt to make it disappear.

CDI 2.0 gives us the means to do this.  Let’s lay out what we’ll need to do.  We’ll work backwards.

First, we’ll make use of the standard observer method mechanism to know what to do when we receive event notifications.  We’ll sketch our as-of-now incomplete observer method like this:

private final void onConfigMapEvent(@Observes final SomeKindOfEventNotYetDefined configMapEvent) {
  // configMapEvent should tell us: was it an addition? a modification? a removal?
  // Which ConfigMap?  What did it look like before it was modified? etc.
}

Let’s pretend that something somewhere is firing these events.  Then we could receive one here, open it up, and see what happened.  Sounds good.

Missing from our sketch is some indication as to which ConfigMaps we’d like to see events for.  Maybe we want to see all of them.  Maybe we only are interested in some meeting certain criteria.

Well, that’s what qualifiers are made for.  We’ll refine our hazy little sketch to look like this:

private final void onConfigMapEvent(@Observes @CertainConfigMaps final SomeKindOfEventNotYetDefined configMapEvent) {
  // configMapEvent should tell us: was it an addition? a modification? a removal?
  // Which ConfigMap?  What did it look like before it was modified? etc.
}

Note the addition of the @CertainConfigMaps qualifier annotation that I just made up.

OK, if we stare at that for a while as good CDI citizens, this looks nice and complete and idiomatic: this observer method looks for SomeKindOfEventNotYetDefined events, whatever they are, qualified with the @CertainConfigMaps qualifier and will be notified if ever they should be fired.  Presumably those events, whatever they are, will contain a ConfigMap or an old ConfigMap and a new one along with some indication of what happened to it or them.  Sounds good.  This is stuff the end user obviously has to write, and it’s stuff that she should write, and it’s probably about as concise as we’re going to get.  It’s nice and declarative and simple and straightforward.

So who’s going to fire them?  The person writing this method obviously shouldn’t have to care.

As we’ve seen in the previous parts of this series, if you look at things a certain way, Kubernetes itself is a distributed message bus.  So the short answer is: Kubernetes will fire them.

That’s glib, of course.  Kubernetes will fire them—and our Reflector will do what is necessary to reflect them speedily and robustly into an EventCache.  Then something else will have to harvest events from the cache.  That something had also better be able to distribute those events to interested listeners, like our observer method above.

Finally, all of this will have to have queues of various kinds in the mix to make sure that no given harvesting thread or distributing thread or caching thread is blocked while it goes about its business (or that if absolutely necessary it is blocked for the shortest possible time).  That, after all, is what the Kubernetes client-go tools/cache package is all about, so it’s what microbean-kubernetes-controller is all about.

OK, that’s all fine, but, if you notice, this subassembly—with Kubernetes and a Reflector at its “left” end and our observer method at its “right” end—starts with a specification of things to list and watch.  If we can provide just that part, thus defining the inputs for the “left” end, then all the other stuff in the middle is boilerplate, and all we have to do is attach our observer method to the “right” end with a little more boilerplate, and the end user might not ever even have to see the boilerplate!

So let’s look at the “left” end: we know we’re going to have to talk to Kubernetes using the fabric8 Kubernetes client, so that’s where we’ll start.

The good news about picking the fabric8 Kubernetes client is that it is built around a domain-specific language that makes expressing things relatively easy.  For example, you can tell a KubernetesClient to get you a Thing™ that can then get you a list of ConfigMap resources from Kubernetes like this:

client.configMaps();

That returns some sort of massively parameterized MixedOperation, but normally when you program against a model like this you worry very little about these intermediate types in the chain of builders and whatnot that result from this style of fluent programming.

However, in this case, do take note of the fact that a MixedOperation implements both Listable and VersionWatchable.  Note further that a MixedOperation is just a special kind of Operation.  Finally, recall that our Reflector needs some kind of thing that is both a Listable and a VersionWatchable to get started: in part 0, we learned about why this is.

So, in other words, you can create a new Reflector that will reflect Kubernetes events concerning ConfigMaps by supplying client.configMaps() as the first parameter of its constructor.

That’s our “left” end of this subassembly.  If you can get your hands on a Thing™ that is both a Listable and a VersionWatchable then you can create the whole rest of the subassembly and route reflected events to appropriate observer methods.

Now, in CDI, any time you need a Thing™, you don’t make it yourself.  You look around for it, or you declare that someone else needs to provide it.  We will treat, in other words, an Operation as a CDI bean.

An Operation implementation is extremely unlikely to be a managed bean—a plain-old-Java-object (POJO) just lying around somewhere with some bean-defining annotations on it.  Instead, as we’ve seen, you make an Operation by way of a method invocation on a KubernetesClient (like configMaps()).

CDI producer methods are one kind of CDI bean that fit this bill perfectly!  Suppose we have a producer method somewhere sketched out like this:

@Produces
@ApplicationScoped
@CertainConfigMaps
private static final Operation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> selectAllConfigMaps(final KubernetesClient client) {
  return client.configMaps();
}

Then so long as someone somewhere has made a KubernetesClient for us, it will be injected as this producer method’s sole parameter.  That means we can use it to return an Operation.  That means we have the “left” end of our subassembly.

(As it happens, there’s a handy little project that does expose a KubernetesClient as a CDI bean; it’s named microbean-kubernetes-client-cdi.  If that’s on your CDI 2.0 classpath, you can inject KubernetesClient wherever you want it.)

Note as well here that the @CertainConfigMaps annotation that was part of our (still unfinished) observer method sketch is qualifying the return type.

From here, you can see, I hope, that if we have a machine that can

  • gather up all the CDI beans that are qualified with a given qualifier (like @CertainConfigMaps) or set of qualifiers, regardless of how those beans are made or who makes them, and
  • can also gather up all the observer methods that observe events qualified with that same qualifier or those same qualifiers, then
  • we have the means to unite a producer of raw materials needed to pump Kubernetes events into the system, and a consumer of those events to get them out of the system!

We’ll build this machine in the next post.

 

Understanding Kubernetes’ tools/cache package: part 9—Kubernetes controllers in Java!

In part 8 of this series (you can start at the beginning if you want), we looked at some of the foundational principles we’re going to establish, and the stakes we’re going to plant in the ground, as part of our journey towards making an idiomatic Kubernetes controller framework in Java and CDI 2.0.

Let’s look at an idiomatic implementation of these concepts centered around the fabric8 Kubernetes client implementation of Kubernetes watch functionality, and (eventually) targeted at a CDI 2.0 environment.

A Reflector’s main purpose is to reflect a certain portion of the state of the Kubernetes universe into a cache of some kind.  It needs to do this in a fault-tolerant and thread-safe way, taking care not to block the thread it’s using to talk to Kubernetes.  As we’ve seen, this involves starting by listing objects meeting certain criteria and then setting up a watch for those objects and reacting to the logical events that occur to them by offloading them quickly into some kind of cache and scampering back to Kubernetes for more.  Then periodically a synchronization of downstream consumers with known state is performed, making sure that downstream consumers never miss any events concerning Kubernetes resources they care about, even if the listing and watching machinery suffers a hiccup.

The Go code around all this is relatively complicated.  Fortunately, though, since we’re using the excellent fabric8 Kubernetes client, we can take advantage of the fact that it already has sophisticated watch functionality that, among other things, handles reconnections for us.  So we don’t have to write code around all that: we just need to set up the listing code and provide a Watcher to be notified of watch events.  That will take care of listing and watching behavior and reduce a lot of code we have to write.  Hooray for laziness.

We can also take advantage of Java’s generics to ensure that we strongly type what objects we support (Pods, Deployments, ConfigMaps, and so on).  We can ensure that type information makes it all the way into the internals of our implementation.  The Go code needs to check this at runtime, since it was written when Go had no support for generics (they arrived later, in Go 1.18).  That should simplify our code a little bit.  Hooray for more laziness and type safety.
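
To make the contrast concrete, here’s a tiny sketch of the two styles.  The ConfigMap and Pod classes are hypothetical stand-ins for the fabric8 model classes, and nothing here comes from the actual framework.  The untyped method has to inspect its payload at runtime, while the typed queue lets the compiler rule out the wrong kind of object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class TypedHandlerSketch {

  // Hypothetical resource types standing in for the fabric8 model classes.
  static final class ConfigMap {
    final String name;
    ConfigMap(final String name) {
      this.name = name;
    }
  }

  static final class Pod {
    final String name;
    Pod(final String name) {
      this.name = name;
    }
  }

  // Untyped style: the handler must check the payload's type at runtime.
  static String describeUntyped(final Object resource) {
    if (resource instanceof ConfigMap) {
      return "ConfigMap " + ((ConfigMap) resource).name;
    }
    throw new IllegalArgumentException("unexpected type: " + resource.getClass());
  }

  // Typed style: the type parameter carries the resource type, so no check is needed.
  static final class TypedQueue<T> {

    private final List<T> items = new ArrayList<>();

    void add(final T item) {
      this.items.add(item);
    }

    void drainTo(final Consumer<? super T> handler) {
      this.items.forEach(handler);
      this.items.clear();
    }
  }
}
```

With a TypedQueue of ConfigMap, trying to add a Pod simply fails to compile, so the handler never needs a cast.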

A Reflector, as we said, needs to update a cache in a very specific way.  The Go code models the cache as a store of objects.  If we dig in a little more, though, it turns out that even in the Go code the Reflector only needs the mutating operations to be exposed, not the getters and listers.  Specifically, a reflector will look to add, update and delete objects in the store, and will also look to resynchronize state and to do a full replace of objects in one atomic shot.  But that’s it.

We can reduce this surface area in our idiomatic Java Reflector implementation even more by recognizing that if we start out by modeling the cache as a cache of logical events rather than a cache of arbitrary objects we only need to support one addition method that takes an event as a payload.  Then we have to support a total of three operations:

  • // Make an event from the supplied raw materials; store it
    public void add(final Object source, final Event.Type eventType, final T resource)
  • // Do a full replace of our state
    public void replace(final Collection<? extends T> objects, final Object resourceVersion)
  • // When told to synchronize, look at our known objects and
    // fire synchronization events downstream for each of them
    public void synchronize()

So from the standpoint of what a Reflector needs to do its job, we know what an event cache must look like at a minimum.  In fact, it might look like this EventCache interface from my microbean-kubernetes-controller project on GitHub! 😀
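To make the shape of that write-only surface concrete, here is a minimal sketch.  Every name in it (SketchEventType, SketchEvent, WritableEventCache, RecordingEventCache) is made up for illustration and is not the actual microbean-kubernetes-controller API; it also assumes that a full replace simply records one synchronization event per supplied object, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical event kinds; the real project defines its own Event.Type.
enum SketchEventType { ADDITION, MODIFICATION, DELETION, SYNCHRONIZATION }

// A logical event: who produced it, what kind it is, and the resource it concerns.
final class SketchEvent<T> {
  final Object source;
  final SketchEventType type;
  final T resource;

  SketchEvent(final Object source, final SketchEventType type, final T resource) {
    this.source = source;
    this.type = type;
    this.resource = resource;
  }
}

// The only three operations a reflector needs: no getters, no listers.
interface WritableEventCache<T> {
  void add(Object source, SketchEventType eventType, T resource);
  void replace(Collection<? extends T> objects, Object resourceVersion);
  void synchronize();
}

// A deliberately naive implementation that records events in arrival order.
final class RecordingEventCache<T> implements WritableEventCache<T> {
  // Assumption: this map is maintained by some downstream consumer.
  private final Map<String, T> knownObjects;
  private final List<SketchEvent<T>> events = new ArrayList<>();

  RecordingEventCache(final Map<String, T> knownObjects) {
    this.knownObjects = knownObjects;
  }

  @Override
  public synchronized void add(final Object source, final SketchEventType eventType, final T resource) {
    events.add(new SketchEvent<>(source, eventType, resource));
  }

  // Simplification: ignores resourceVersion and does not detect deletions.
  @Override
  public synchronized void replace(final Collection<? extends T> objects, final Object resourceVersion) {
    for (final T object : objects) {
      events.add(new SketchEvent<>(this, SketchEventType.SYNCHRONIZATION, object));
    }
  }

  // Records one synchronization event per currently known object.
  @Override
  public synchronized void synchronize() {
    synchronized (knownObjects) {
      for (final T object : knownObjects.values()) {
        events.add(new SketchEvent<>(this, SketchEventType.SYNCHRONIZATION, object));
      }
    }
  }

  // Returns a defensive copy of everything recorded so far.
  synchronized List<SketchEvent<T>> events() {
    return new ArrayList<>(events);
  }
}
```

A reflector written against WritableEventCache never sees events() at all, which is the point: the reading side is somebody else’s business.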

Then our Reflector might look like my Reflector class in the same project.

We can see that it takes in something that is both Listable (returning a KubernetesResourceList descendant when listed) and VersionWatchable; such an object can be produced from a KubernetesClient.  Objects satisfying these requirements are roughly equivalent to the Go code’s ListerWatcher with, as we’ve noted, the added benefit that reconnections are automatically handled, and, if you can express your filtering criteria through the client, filtering is already handled too.  Hooray for even more laziness.

The Reflector constructor also takes in the writable “side” of a cache of Events.  This is where it will “mirror” the events it receives from watching the Kubernetes API server.  This corresponds roughly with the Go code’s notion of a Store, and more specifically with the notion of a DeltaFIFO, but exposes only the mutating methods actually needed by a writable store of events, not any extras related to reading or listing.  Also, where a DeltaFIFO had to turn a generic “add an arbitrary object” function into an “add an event recording an addition” invocation, we start with the notion that everything being stored is an event, so there is one fewer step to take and one fewer transformation to make.

So, implement an EventCache, call kubernetesClient.configMaps() or something equivalent, pass both into a new Reflector, start it, and you can begin filling your cache with events!

Of course, you’ll want to drain that cache, and also allow it to periodically inform its downstream customers of its state for synchronization purposes.  And a good implementation will sort incoming events into queues, where each queue is effectively the events that have occurred to a particular keyed Kubernetes resource over time.

Fortunately, there is a good implementation.  The stock implementation of this kind of EventCache is EventQueueCollection, which models all this and another part of the Go Store type.  EventQueueCollection lets you pass in a Consumer and start it siphoning off events in a careful, thread-safe manner, much like the Pop() function in the Go code.  Events are stored in queues per object, keyed by an overridable method that determines how keys for a given Kubernetes resource are composed.
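The per-object queueing idea can be sketched in a few lines.  This is only an illustration under stated assumptions: the class name PerKeyEventQueues and the method drainOldest are hypothetical, the key function is supplied by the caller, and unlike a blocking Pop() this version simply returns false when nothing is pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: one FIFO queue of pending events per resource key.
final class PerKeyEventQueues<E> {

  // Computes the key (for example "namespace/name") that an event belongs to.
  private final Function<? super E, String> keyFunction;

  // LinkedHashMap keeps keys in the order in which their first pending event arrived.
  private final Map<String, Deque<E>> queues = new LinkedHashMap<>();

  PerKeyEventQueues(final Function<? super E, String> keyFunction) {
    this.keyFunction = keyFunction;
  }

  // Appends the event to the queue for its key, creating that queue if needed.
  synchronized void add(final E event) {
    queues.computeIfAbsent(keyFunction.apply(event), k -> new ArrayDeque<>()).add(event);
  }

  // Removes the oldest key's whole queue and hands it to the consumer.
  // Returns false, without calling the consumer, when nothing is pending.
  boolean drainOldest(final Consumer<? super List<E>> consumer) {
    final List<E> batch;
    synchronized (this) {
      final Iterator<Deque<E>> iterator = queues.values().iterator();
      if (!iterator.hasNext()) {
        return false;
      }
      batch = new ArrayList<>(iterator.next());
      iterator.remove();
    }
    // The consumer runs outside the lock so a slow consumer does not block add().
    consumer.accept(batch);
    return true;
  }
}
```

Handing the consumer everything that happened to one resource in one batch means it sees the events for that resource in order, without interleaved events for unrelated resources.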

The stock Consumer normally used here, an implementation of ResourceTrackingEventQueueConsumer, is a simple Consumer that takes a Map of objects indexed by keys at construction time.  These “known objects” are updated as events are siphoned off the upstream EventQueueCollection, and then those individual events are passed on to the abstract accept() method.  Once an event arrives here, it can be processed directly, or rebroadcast, or whatever; all housekeeping has been taken care of.  We’ll come back to this.

Back to those known objects.  EventQueueCollection takes in the same kind of Map, and this is no coincidence.  Your consumer of events and your EventQueueCollection should share this cache: when your EventQueueCollection gets instructed to synchronize(), it will send synchronization events to any downstream consumers…

…like the ResourceTrackingEventQueueConsumer we were just talking about, who will update that very same Map of known objects appropriately.
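Here is a rough sketch of that shared-map arrangement.  The names (ChangeType, Change, KnownObjectsTrackingConsumer, process, SketchSynchronizer) are invented for this example and do not come from the project; in particular, the hook that subclasses implement is called process() here purely to keep it visually distinct from Consumer’s own accept().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical event kinds and event type for this sketch only.
enum ChangeType { ADDITION, MODIFICATION, DELETION, SYNCHRONIZATION }

final class Change<T> {
  final ChangeType type;
  final String key;
  final T resource;

  Change(final ChangeType type, final String key, final T resource) {
    this.type = type;
    this.key = key;
    this.resource = resource;
  }
}

// Updates the shared map of known objects first, then hands the event to a subclass.
abstract class KnownObjectsTrackingConsumer<T> implements Consumer<Change<T>> {
  private final Map<String, T> knownObjects;

  protected KnownObjectsTrackingConsumer(final Map<String, T> knownObjects) {
    this.knownObjects = Objects.requireNonNull(knownObjects);
  }

  @Override
  public final void accept(final Change<T> change) {
    synchronized (knownObjects) {
      if (change.type == ChangeType.DELETION) {
        knownObjects.remove(change.key);
      } else {
        knownObjects.put(change.key, change.resource);
      }
    }
    process(change); // housekeeping is done; business logic goes here
  }

  protected abstract void process(Change<T> change);
}

// Stands in for the upstream side: fires one synchronization event per known object.
final class SketchSynchronizer {
  private SketchSynchronizer() {}

  static <T> void synchronize(final Map<String, T> knownObjects,
                              final Consumer<? super Change<T>> downstream) {
    // Snapshot under the lock, because the downstream consumer writes to the same map.
    final List<Change<T>> snapshot = new ArrayList<>();
    synchronized (knownObjects) {
      for (final Map.Entry<String, T> entry : knownObjects.entrySet()) {
        snapshot.add(new Change<>(ChangeType.SYNCHRONIZATION, entry.getKey(), entry.getValue()));
      }
    }
    for (final Change<T> change : snapshot) {
      downstream.accept(change);
    }
  }
}
```

Because both sides hold the same Map instance, a synchronization pass replays exactly what the consumer last recorded, and the consumer’s update on receipt leaves the map unchanged.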

This is all tedious boilerplate to set up, so just like the Go code, we put all of this behind a Controller façade.  A Controller marries a Reflector and an EventQueueCollection together with a simple Consumer of events, and arranges for them all to dance together.  If you start the Controller, it will start the Consumer, then start the Reflector, and you will start seeing events flow through the pinball machine.  If you close the Controller, it will close the Reflector, then close the Consumer, and you will stop seeing events.  All overridable methods in Reflector and ResourceTrackingEventQueueConsumer are “forwarded” into the Controller class, so you can customize it in one place.
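The start and close ordering is the part worth pinning down, so here is a small sketch of just that.  Lifecycle and SketchController are hypothetical names, and the two collaborators are reduced to things that can be started and closed; the real Controller does considerably more.

```java
import java.io.Closeable;

// Hypothetical stand-in for anything that can be started and closed.
interface Lifecycle {
  void start();
  void close();
}

// A façade that starts and stops its two collaborators in the order described above.
final class SketchController implements Closeable {
  private final Lifecycle reflector;
  private final Lifecycle consumer;
  private boolean started;

  SketchController(final Lifecycle reflector, final Lifecycle consumer) {
    this.reflector = reflector;
    this.consumer = consumer;
  }

  // Consumer first, so something is draining before events start arriving.
  public synchronized void start() {
    if (started) {
      return;
    }
    consumer.start();
    try {
      reflector.start();
    } catch (final RuntimeException e) {
      consumer.close(); // do not leave a half-started controller behind
      throw e;
    }
    started = true;
  }

  // Reflector first, so the source of events stops before the consumer does.
  @Override
  public synchronized void close() {
    if (!started) {
      return;
    }
    try {
      reflector.close();
    } finally {
      consumer.close();
      started = false;
    }
  }
}
```

Since SketchController implements Closeable, it can sit in a try-with-resources block, which closes it in the right order even if the body throws.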

So: create a Controller, give it an expression of the “real” Kubernetes resources you want to watch and list, hand it a consumer of the mirrored events concerning those resources, along with a Map representing the known state of the world, and you can now process Kubernetes resources just like Kubernetes controllers written in Go.

This is, of course, still too much boilerplate, since in all of this we still haven’t talked about business logic.

Therefore, in a CDI world, this is all a candidate for parking behind a portable extension.  That will be the topic of the next post.  Till then, have fun looking at the microbean-kubernetes-controller project on GitHub as it develops further.