In part 10, we laid out what we need to do to make a simple programming model available to an end user who wants to be notified of Kubernetes events in the same style as Go programmers using the tools/cache package. This series starts with part 0.
Here we’ll look at implementing a CDI 2.0 portable extension that enables this lean programming model.
In part 10, we sketched out the idea of pairing an observer method with a producer method, using the same qualifiers on both the producer method’s return type and the observer method’s event parameter. Linked in this way, the two give us the ability to build boilerplate machinery in between the Listable-and-VersionWatchable made by the producer method and the event observed by the observer method, and the end user doesn’t have to know about or invoke any of it: her events “just work”, assuming this extension is present, opaquely, at runtime.
This post will make use of the microbean-kubernetes-controller framework, the Java Kubernetes controller framework built up over the course of this series (you can start at the beginning if you like). This framework is CDI-independent. We’ll adapt it to CDI 2.0 so end users don’t have to deal with any boilerplate.
As discussed, our portable extension will have to:
- find beans that have Listable and VersionWatchable among their bean types
- find observer methods observing Event objects that contain Kubernetes resources in them
- ensure that for any given observer method whose event parameter is qualified with a set of qualifier annotations, there exists a producer method (or any other kind of CDI bean, actually) with the right kind of type that is also qualified by those same qualifier annotations (or no event delivery will take place)
- have some way of recognizing that arbitrary qualifier annotations defined by the user are those that should be used to identify Kubernetes events (that is, some way of meta-qualifying a qualifier annotation that appears on relevant producer methods and relevant event parameters)
We can do all these things thanks to the well-defined lifecycle mandated by CDI.
First, we’ll sketch out a meta-qualifier annotation named KubernetesEventSelector. The intent of this meta-annotation will be that if you put it on a qualifier annotation that you create, then your qualifier annotation will be treated as a qualifier that is trying to say something specifically about sets of Kubernetes events.
Here’s what that might look like:
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    import javax.inject.Qualifier;

    @Documented
    @Retention(value = RetentionPolicy.RUNTIME)
    @Target({ ElementType.ANNOTATION_TYPE })
    public @interface KubernetesEventSelector {

    }
Now, as an end user, if I define my own qualifier annotation and annotate it with KubernetesEventSelector:
    @Documented
    @KubernetesEventSelector
    @Qualifier
    @Retention(value = RetentionPolicy.RUNTIME)
    @Target({ ElementType.METHOD, ElementType.PARAMETER })
    public @interface AllConfigMapEvents {

    }
…I will have expressed my intent that when we “see” this annotation:
@AllConfigMapEvents
…we are talking about choosing Kubernetes events in some handwavy manner, and not something else.
So now our as-yet-to-be-written portable extension has at least the theoretical means to find event parameters and beans and see if they’re qualified with qualifiers that, in turn, are annotated with @KubernetesEventSelector. Those will be the only things of interest to this extension.
Next, we’ll take advantage of the CDI lifecycle and the prescribed order of things and recognize that bean processing happens before observer method processing. So let’s start with looking for those beans that have Listable and VersionWatchable among their bean types.
We’re going to run into a problem right away. We would like to look for beans whose bean types are of type X where X is some type that extends Listable and VersionWatchable, but otherwise we don’t really care what X actually is. But producer methods must return concrete types without type variables in them.
What we’ll do instead is look for the most general type in the fabric8 Kubernetes model hierarchy that implements or extends both Listable and VersionWatchable and deal with that. That type happens to be Operation. We’ll keep track during bean processing of all Beans that match these types. That means it doesn’t matter if the bean in question is a producer method, a producer field, a managed bean, a synthetic bean or any of the other kinds of beans you can cause to come into existence in a CDI world: they’ll all be accounted for.
Now, obviously we’re not actually interested in all beans whose bean types include Operation. We’re interested in those that have been explicitly “tagged” with the KubernetesEventSelector annotation.
It would be nice if there were a convenience method of sorts in CDI that would let you ask a bean for its qualifiers but would restrict the returned set to only those qualifiers that, in turn, are meta-qualified with something else (like KubernetesEventSelector). Unfortunately, there is not, so we’ll have to do this ourselves.
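To make the idea concrete, here is a minimal sketch of that kind of filtering using nothing but JDK reflection. The Selector, AllThings, Unrelated and Sample types and the retainAnnotatedWith method are hypothetical stand-ins invented for this illustration. A real CDI extension would more likely ask the BeanManager what counts as a qualifier, so treat this as an approximation of the idea and not as the utility used later in this post.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a meta-annotation such as KubernetesEventSelector.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.ANNOTATION_TYPE)
@interface Selector {}

// A user-defined annotation that carries the meta-annotation.
@Selector
@Retention(RetentionPolicy.RUNTIME)
@interface AllThings {}

// An unrelated annotation that does not carry it.
@Retention(RetentionPolicy.RUNTIME)
@interface Unrelated {}

// Something annotated with both, so we have annotations to filter.
@AllThings
@Unrelated
final class Sample {}

final class MetaQualifierSketch {

  // Keeps only those annotations whose annotation type is itself annotated with metaType.
  static Set<Annotation> retainAnnotatedWith(final Iterable<? extends Annotation> candidates,
                                             final Class<? extends Annotation> metaType) {
    final Set<Annotation> result = new LinkedHashSet<>();
    for (final Annotation candidate : candidates) {
      if (candidate != null && candidate.annotationType().isAnnotationPresent(metaType)) {
        result.add(candidate);
      }
    }
    return result;
  }

  public static void main(final String[] args) {
    final Set<Annotation> kept =
      retainAnnotatedWith(Arrays.asList(Sample.class.getAnnotations()), Selector.class);
    // Only the @AllThings annotation survives the filter.
    System.out.println(kept.size());
  }
}
```

The important detail is that the check is made against the annotation's type, not against the annotated element: Sample knows nothing about Selector directly.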
So, refined: we’ll keep track of Beans with Operation (and hence by definition Listable and VersionWatchable) among their bean types, and with KubernetesEventSelector-qualified qualifiers. We’ll keep track of those annotations, too.
Once we do this, we’ll have some machinery that can now pick out sources for raw materials that we’ll need to feed to a Reflector.
But obviously we don’t really want to do any of this if there aren’t any observer methods around. Unfortunately, bean processing happens before observer method processing, so we will do this work anyway, and then discard it if we don’t find any relevant observer methods.
What’s a relevant observer method? In this case it’s an observer method whose (a) AbstractEvent-typed event parameter is (b) qualified by one or more qualifier annotations that are themselves (c) meta-qualified by KubernetesEventSelector, and that (d) “matches” an equivalent set of qualifiers on an Operation-typed bean type. This means that indirectly the observer method will “watch” for what the producer method is indirectly providing access to.
Here’s an example observer method:
    import javax.enterprise.event.Observes;

    import io.fabric8.kubernetes.api.model.HasMetadata;

    import org.microbean.kubernetes.controller.Event;

    private final void onConfigMapEvent(@Observes @AllConfigMapEvents final Event<? extends HasMetadata> event) {

    }
Note the usage of the @AllConfigMapEvents qualifier we defined earlier in this blog post, and note that the event parameter so qualified is an Event. Let’s say we found a “matching” producer method:
    import javax.enterprise.context.ApplicationScoped;

    import javax.enterprise.inject.Produces;

    import io.fabric8.kubernetes.api.model.ConfigMap;
    import io.fabric8.kubernetes.api.model.ConfigMapList;
    import io.fabric8.kubernetes.api.model.DoneableConfigMap;
    import io.fabric8.kubernetes.api.model.HasMetadata;

    import io.fabric8.kubernetes.client.KubernetesClient;

    import io.fabric8.kubernetes.client.dsl.Operation;
    import io.fabric8.kubernetes.client.dsl.Resource;

    @Produces
    @ApplicationScoped
    @AllConfigMapEvents
    private static final Operation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> selectAllConfigMaps(final KubernetesClient client) {
      return client.configMaps();
    }
Note the Operation return type and the matching qualifier-meta-qualified-with-KubernetesEventSelector that decorates it (@AllConfigMapEvents). Do you see how it “matches” the observer method’s event parameter qualifier (also @AllConfigMapEvents)? This is the linkage that connects this producer method with the observer method that observes events that will result from the producer method’s existence.
So once we have a mapping from a Bean to a set of relevant qualifier annotations, then we can see if, during observer method processing, we’re talking about the first observer method that should cause our boilerplate machinery to get invoked. If there are many observer methods that “match” a relevant producer method, that’s fine; that just indicates that the very same Kubernetes event might be processed by many observers. The point is, before we jump through all the hoops to set up lists and watches and so on, we should make sure that someone is actually going to take advantage of our work.
So once we “get a match”, we can build the boilerplate for all matches of the same type, once.
For each distinct match, we’ll need a new Controller. The controller will need an instance of whatever the producer method is making (some kind of Operation, which is by definition both a Listable and a VersionWatchable).
The Controller, in turn, will need some kind of Map representing Kubernetes objects we “know about”. That Map will be empty to start with, since we don’t know about any Kubernetes objects yet. But if, down the road, a deletion gets skipped (maybe due to a watch timing out or a disconnect), we need to retroactively fire a delete when, say, the Kubernetes list of relevant objects turns out to contain fewer objects than our “known objects”, and for that we’ll need to know which object was actually deleted.
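As a rough illustration of why the “known objects” Map matters, here is a small JDK-only sketch. The KnownObjectsSketch class, its reconcile method and the string keys and values are assumptions made up for this example; they are not part of microbean-kubernetes-controller. The sketch only shows how comparing a fresh list against what we already know tells us which deletions we missed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class KnownObjectsSketch {

  // Removes every known object whose key is absent from the freshly listed keys
  // and returns the removed objects, so a caller could fire a deletion for each.
  static <K, V> List<V> reconcile(final Map<K, V> knownObjects, final Set<K> listedKeys) {
    final List<V> missedDeletions = new ArrayList<>();
    final Iterator<Map.Entry<K, V>> iterator = knownObjects.entrySet().iterator();
    while (iterator.hasNext()) {
      final Map.Entry<K, V> entry = iterator.next();
      if (!listedKeys.contains(entry.getKey())) {
        missedDeletions.add(entry.getValue());
        iterator.remove();
      }
    }
    return missedDeletions;
  }

  public static void main(final String[] args) {
    final Map<String, String> known = new HashMap<>();
    known.put("default/a", "ConfigMap a");
    known.put("default/b", "ConfigMap b");
    // Pretend a fresh list operation returned only "default/a".
    final List<String> deleted = reconcile(known, Collections.singleton("default/a"));
    System.out.println(deleted); // prints [ConfigMap b]
  }
}
```

Because the Map holds the last state we saw for each object, the synthesized deletion can carry the object itself and not merely its key.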
The Controller will also need some kind of Consumer (a siphon) that will slurp up EventQueues and dispatch their events in order without blocking threads. That Consumer will also need to update that very same Map of “known objects” as it does so.
Finally, that Consumer of EventQueues will also need a mechanism to link it to the native CDI event broadcast mechanism which will take care of actually dispatching events to observer methods.
Whew! Let’s do the easy boring stuff first.
The first lifecycle event that looks good is ProcessBean. If we could process all beans whose types are Operation here, in one place, regardless of whether the bean in question is a producer method, managed bean, etc. that would be great. As it turns out for various obscure reasons, we can’t. So we’ll implement a lifecycle event observer method for each type of bean, and route all their stuff to a common private method for processing:
    private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerMethod(@Observes final ProcessProducerMethod<X, ?> event, final BeanManager beanManager) {
      if (event != null) {
        // (We'll define this in a second.)
        this.processPotentialEventSelectorBean(event.getBean(), beanManager);
      }
    }

    private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerField(@Observes final ProcessProducerField<X, ?> event, final BeanManager beanManager) {
      if (event != null) {
        this.processPotentialEventSelectorBean(event.getBean(), beanManager);
      }
    }

    private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processManagedBean(@Observes final ProcessManagedBean<X> event, final BeanManager beanManager) {
      if (event != null) {
        this.processPotentialEventSelectorBean(event.getBean(), beanManager);
      }
    }

    private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processSyntheticBean(@Observes final ProcessSyntheticBean<X> event, final BeanManager beanManager) {
      if (event != null) {
        this.processPotentialEventSelectorBean(event.getBean(), beanManager);
      }
    }
And the processPotentialEventSelectorBean method that these all effectively delegate to might look like this:
    private final void processPotentialEventSelectorBean(final Bean<?> bean, final BeanManager beanManager) {
      if (bean != null) {
        final Type operationType = getOperationType(bean);
        if (operationType != null) {
          final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(bean.getQualifiers(), KubernetesEventSelector.class, beanManager);
          if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
            this.eventSelectorBeans.put(kubernetesEventSelectors, bean);
          }
        }
      }
    }
…where the Annotations utility method can be found elsewhere, and where the getOperationType() method might look like this:
    private static final Type getOperationType(final Bean<?> bean) {
      final Type returnValue;
      if (bean == null) {
        returnValue = null;
      } else {
        final Set<Type> beanTypes = bean.getTypes();
        assert beanTypes != null;
        assert !beanTypes.isEmpty();
        Type candidate = null;
        for (final Type beanType : beanTypes) {
          if (beanType instanceof ParameterizedType) {
            final Type rawType = ((ParameterizedType)beanType).getRawType();
            if (rawType instanceof Class && Operation.class.equals((Class<?>)rawType)) {
              candidate = beanType;
              break;
            }
          }
        }
        returnValue = candidate;
      }
      return returnValue;
    }
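If the raw-type comparison in getOperationType() looks opaque, the following JDK-only sketch may help. It applies the same test (is this a ParameterizedType whose raw type is a particular class?) to ordinary collection types. The RawTypeSketch class and its findParameterized method are hypothetical names chosen for this illustration, and ArrayList merely plays the role that Operation plays above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RawTypeSketch {

  // An anonymous subclass, so that ArrayList<String> is recorded as its generic superclass.
  static final List<String> SAMPLE = new ArrayList<String>() {
    private static final long serialVersionUID = 1L;
  };

  // Returns the first parameterized type whose raw type is exactly rawClass, or null.
  static Type findParameterized(final Iterable<? extends Type> types, final Class<?> rawClass) {
    for (final Type type : types) {
      if (type instanceof ParameterizedType
          && rawClass.equals(((ParameterizedType) type).getRawType())) {
        return type;
      }
    }
    return null;
  }

  public static void main(final String[] args) {
    // This Type represents ArrayList<String>, type argument included.
    final Type superType = SAMPLE.getClass().getGenericSuperclass();
    final List<Type> candidates = Arrays.asList(String.class, superType);
    System.out.println(findParameterized(candidates, ArrayList.class) == superType); // prints true
    System.out.println(findParameterized(candidates, List.class)); // prints null
  }
}
```

Note that the comparison is exact: a type whose raw type is a subtype or supertype of the class being sought is not considered a match, which is why the lookup for List above finds nothing.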
So at the end of all this we’ll have a Map, eventSelectorBeans, that has as its keys Sets of Annotation instances that have KubernetesEventSelector somewhere “on” them, and has as its values Beans that are qualified with those annotations.
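Using a Set of Annotation instances as a Map key works because the Annotation contract defines equals() and hashCode() in terms of the annotation type and its member values. Here is a small JDK-only sketch of that behavior; the Tagged annotation, the three annotated classes and the QualifierKeySketch class are all hypothetical and exist only for this illustration.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical annotation standing in for a user-defined qualifier.
@Retention(RetentionPolicy.RUNTIME)
@interface Tagged {
  String value();
}

// Stands in for the producer side of a pairing.
@Tagged("configmaps")
final class ProducerSide {}

// Stands in for the observer side; same annotation, same member value.
@Tagged("configmaps")
final class ObserverSide {}

// Same annotation type but a different member value.
@Tagged("secrets")
final class OtherSide {}

final class QualifierKeySketch {

  // Collects the runtime-visible annotations on a class into a Set.
  static Set<Annotation> annotationsOf(final Class<?> c) {
    return new HashSet<>(Arrays.asList(c.getAnnotations()));
  }

  public static void main(final String[] args) {
    final Map<Set<Annotation>, String> registry = new HashMap<>();
    registry.put(annotationsOf(ProducerSide.class), "producer bean");
    // Equal annotation sets find the entry even though they came from different elements.
    System.out.println(registry.get(annotationsOf(ObserverSide.class))); // prints producer bean
    System.out.println(registry.get(annotationsOf(OtherSide.class))); // prints null
  }
}
```

This is the same property that lets an observer method's qualifier set be used to look up the bean registered under an equal qualifier set.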
Now we’ll need to route all observer method processing to something similar (fortunately, this is a great deal simpler). Recall that we are guaranteed by contract that observer method processing happens after discovering beans, so by this point we already have knowledge of any producer methods or managed beans “making” Operation instances of various kinds:
    private final void processObserverMethod(@Observes final ProcessObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>, ?> event, final BeanManager beanManager) {
      if (event != null) {
        this.processPotentialEventSelectorObserverMethod(event.getObserverMethod(), beanManager);
      }
    }

    private final void processSyntheticObserverMethod(@Observes final ProcessSyntheticObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>, ?> event, final BeanManager beanManager) {
      if (event != null) {
        this.processPotentialEventSelectorObserverMethod(event.getObserverMethod(), beanManager);
      }
    }

    private final void processPotentialEventSelectorObserverMethod(final ObserverMethod<? extends org.microbean.kubernetes.controller.Event<? extends HasMetadata>> observerMethod, final BeanManager beanManager) {
      if (observerMethod != null) {
        final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(observerMethod.getObservedQualifiers(), KubernetesEventSelector.class, beanManager);
        if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
          if (observerMethod.isAsync()) {
            if (!this.asyncNeeded) {
              this.asyncNeeded = true;
            }
          } else if (!this.syncNeeded) {
            this.syncNeeded = true;
          }
          // remove() returns null if no bean matches these qualifiers, or if an
          // earlier observer method already claimed the bean, so guard the add.
          final Bean<?> bean = this.eventSelectorBeans.remove(kubernetesEventSelectors);
          if (bean != null) {
            this.beans.add(bean);
          }
        }
      }
    }
(Above, we also keep track of whether the observer method in question is synchronous or asynchronous.)
Finally, we can clean up a little bit after bean discovery is all done:
    private final void processAfterBeanDiscovery(@Observes final AfterBeanDiscovery event) {
      if (event != null) {
        this.eventSelectorBeans.clear();
      }
    }
At the end of all of this bean discovery, we have a Set of Bean instances stored in a beans instance variable that (a) “make” Operation instances, (b) are appropriately meta-qualified to declare their interest in taking part in Kubernetes event production, and (c) have at least one observer method interested in their effects.
Let’s build some controllers and start them when the user’s program comes up!
We’ll begin by observing the initialization of the application scope, which effectively marks the start of a CDI application.
When this happens, our beans instance variable will contain some of the materials to build a Controller. For each Bean in there, we’ll “instantiate” it (by way of a beanManager.getReference() call), create a new Controller, hook that Controller up to the right kind of EventDistributor, start the Controller and keep track of it for later cleanup:
    private final <T extends HasMetadata, X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> void startControllers(@Observes @Initialized(ApplicationScoped.class) @Priority(LIBRARY_AFTER) final Object ignored, final BeanManager beanManager) {
      if (beanManager != null) {
        for (final Bean<?> bean : this.beans) {
          assert bean != null;

          final Set<Annotation> qualifiers = bean.getQualifiers();

          final Map<Object, T> knownObjects = new HashMap<>();
          final EventDistributor<T> eventDistributor = new EventDistributor<>(knownObjects);
          eventDistributor.addConsumer(new CDIEventDistributor<T>(qualifiers, null /* no NotificationOptions; TODO */, this.syncNeeded, this.asyncNeeded));

          @SuppressWarnings("unchecked") // we know an Operation is of type X
          final X contextualReference = (X)beanManager.getReference(bean, getOperationType(bean), beanManager.createCreationalContext(bean));

          final Controller<T> controller = new Controller<>(contextualReference, knownObjects, eventDistributor);
          controller.start();
          this.controllers.add(controller);
        }
      }
    }
In the above code example, you’ll see a reference to a CDIEventDistributor. That private class is a Consumer of events that simply fires them using the existing CDI broadcasting mechanism:
    private static final class CDIEventDistributor<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>> {

      private static final Annotation[] EMPTY_ANNOTATION_ARRAY = new Annotation[0];

      private final Annotation[] qualifiers;

      private final NotificationOptions notificationOptions;

      private final boolean syncNeeded;

      private final boolean asyncNeeded;

      private CDIEventDistributor(final Set<Annotation> qualifiers, final NotificationOptions notificationOptions, final boolean syncNeeded, final boolean asyncNeeded) {
        super();
        if (qualifiers == null) {
          this.qualifiers = EMPTY_ANNOTATION_ARRAY;
        } else {
          this.qualifiers = qualifiers.toArray(new Annotation[qualifiers.size()]);
        }
        this.notificationOptions = notificationOptions;
        this.syncNeeded = syncNeeded;
        this.asyncNeeded = asyncNeeded;
      }

      @Override
      public final void accept(final AbstractEvent<? extends T> controllerEvent) {
        if (controllerEvent != null && (this.syncNeeded || this.asyncNeeded)) {
          final BeanManager beanManager = CDI.current().getBeanManager();
          assert beanManager != null;
          final javax.enterprise.event.Event<Object> cdiEventMachinery = beanManager.getEvent();
          assert cdiEventMachinery != null;
          final TypeLiteral<AbstractEvent<? extends T>> eventTypeLiteral = new TypeLiteral<AbstractEvent<? extends T>>() {
            private static final long serialVersionUID = 1L;
          };
          final javax.enterprise.event.Event<AbstractEvent<? extends T>> broadcaster = cdiEventMachinery.select(eventTypeLiteral, this.qualifiers);
          assert broadcaster != null;
          if (this.asyncNeeded) {
            if (this.notificationOptions == null) {
              broadcaster.fireAsync(controllerEvent);
            } else {
              broadcaster.fireAsync(controllerEvent, this.notificationOptions);
            }
          }
          if (this.syncNeeded) {
            broadcaster.fire(controllerEvent);
          }
        }
      }

    }
Finally, when the user’s application stops, we should clean up after ourselves. We observe the imminent destruction of the application scope and close everything we started up:
    private final void stopControllers(@Observes @BeforeDestroyed(ApplicationScoped.class) @Priority(LIBRARY_BEFORE) final Object ignored) throws IOException {
      Exception exception = null;
      for (final Controller<?> controller : this.controllers) {
        assert controller != null;
        try {
          controller.close();
        } catch (final IOException | RuntimeException closeException) {
          if (exception == null) {
            exception = closeException;
          } else {
            exception.addSuppressed(closeException);
          }
        }
      }
      if (exception instanceof IOException) {
        throw (IOException)exception;
      } else if (exception instanceof RuntimeException) {
        throw (RuntimeException)exception;
      } else if (exception != null) {
        throw new IllegalStateException(exception.getMessage(), exception);
      }
    }
The net effect is that if you put this portable extension and its dependencies on the classpath of your CDI 2.0 application and author a producer method and observer method pair (and nothing else), your CDI application will receive Kubernetes events and should behave semantically like any other kind of Kubernetes controller.
The fledgling project housing all this code is microbean-kubernetes-controller-cdi and is available on GitHub under the Apache 2.0 license.