6. Event Processing

The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Listeners. This is the responsibility of the Event Bus.

6.1. Event Bus

The EventBus is the mechanism that dispatches events to the subscribed event listeners. Axon Framework provides two implementation of the event bus: SimpleEventBus and ClusteringEventBus. Both implementations manage subscribed EventListeners and forward all incoming events to all subscribed listeners. This means that Event Listeners must be explicitly registered with the Event Bus in order for them to receive events. The registration process is thread safe. Listeners may register and unregister for events at any time.

6.1.1. Simple Event Bus

The SimpleEventBus is, as the name suggests, a very basic implementation of the EventBus interface. It just dispatches each incoming Event to each of the subscribed EventListeners sequentially. If an EventListener throws an Exception, dispatching stops and the exception is propagated to the component publising the Event.

The SimpleEventBus is suitable for most cases where dispatching is done synchronously and locally, (i.e. in a single JVM). Once you application requires Events to be published across multiple JVMs, you could consider using the ClusteringEventBus instead.

6.1.2. Clustering Event Bus

The ClusteringEventsBus allows application developers to bundle EventListeners into Clusters based on their properties and non-functional requirements. The ClusteringEventBus is also more capable to deal with Events being dispatched among different machines.

Structure of the Clustering Event Bus

Figure 6.1. Structure of the Clustering Event Bus


The ClusteringEventsBus contains two mechanisms: the ClusterSelector, which selects a Cluster instance for each of the registered EventListeners, and the EventBusTerminal, which is responsible for dispatching Events to each of the relevant clusters.

[Note]Background: Axon Terminal

In the nervous system, an Axon (one of the components of a Neuron) transports electrical signals. These Neurons are interconnected in very complex arrangements. The Axon Terminal is responsible for transmitting these signals from one Neuron to another.

More information: www.wikipedia.org/wiki/Axon_terminal.

ClusterSelector

The primary responsibility of the ClusterSelector is to, as the name suggests, select a cluster for each Event Listener that subscribes to the Event Bus. By default, all Event Listeners are placed in a single Cluster instance, which dispatches events to its members sequentially and in the calling thread (similar to how the SimpleEventBus works). By providing a custom implementation, you can arrange the Event Listeners into different Cluster instances to suit the requirements of your architecture.

A number of ClusterSelector implementations are available. The ClassNamePrefixClusterSelector, for example, uses a mapping of package prefixes to decide which cluster is (most) suitable for an Event Listener. Similarly, the ClassNamePatternClusterSelector uses pattern matching to decide whether a given cluster is suitable. You can use the CompositeClusterSelector to combine several cluster selectors into a single one.

The Cluster interface describes the behavior of a Cluster. By adding information in the Meta Data of a Cluster, the selector can provide hints to the Terminal about the expected behavior.

[Note]Clusters and Cluster Selectors in Spring

Spring users can define clusters using the <axon:cluster> element. This element allows you to define the selection criteria for Event Handlers in that cluster. These criteria are transformed into cluster selectors and used to assign each Listener to one of the clusters. By default, this creates a cluster that handles events in the publishing thread. To use a cluster with other semantics, you can define a bean inside the <axon:cluster> element that specifies the Cluster implementation to use.

The Clusters are automatically detected and connected to the Event Bus in the application context.

EventBusTerminal

The EventBusTerminal forms the bridge between the Clusters inside the Event Bus. While some terminals will dispatch within the same JVM, others are aware of messaging technologies, such as AMQP to dispatch Event Messages to clusters on remote machines. The default implementation dispatches published events to each of the (local) clusters using the publishing thread. This means that with the default terminal, and the default ClusterSelector, the behavior of the ClusteringEventBus is exactly the same as that of the SimpleEventBus.

In a typical AMQP based configuration, the EventBusTerminal would send published events to an Exchange. For each cluster, a Queue would be connected to that exchange. The EventBusTerminal will create a consumer for each cluster, which reads from its related Queue and forwards each message to that cluster. Event Listeners in a distributed environment where at most one instance should receive an Events should be placed in a separate cluster, which competes with the other instances on a single Queue. See Section 6.4, “Distributing the Event Bus” for more information.

6.2. Event Listeners

Event listeners are the component that act on incoming events. Events may be of any type. In the Axon Framework, all event listeners must implement the EventListener interface.

6.2.1. Basic implementation

Event listeners need to be registered with an event bus (see Section 6.1, “Event Bus”) to be notified of events. The EventListener interface prescribes a single method to be implemented. This method is invoked for each Event Message passed on the Event Bus that it is subscribed to:

public class MyEventListener implements EventListener {

    public void handle(EventMessage message) {
        if (SomeEvent.class.isAssignableFrom(message.getPayloadType) {
            // handle SomeEvent
        } else if (OtherEvent.class.isAssignableFrom(message.getPayloadType) {
            // handle SomeOtherEvent
        }
    }
}

6.2.2. Annotated Event Handler

Implementing the EventListener interface can produce a large if-statement and verbose plumbing code. Using annotations to demarcate Event Handler methods is a cleaner alternative.

AnnotationEventListenerAdapter

The AnnotationEventListenerAdapter can wrap any object into an event listener. The adapter will invoke the most appropriate event handler method available. These event handler methods must be annotated with the @EventHandler annotation.

The AnnotationEventListenerAdapter, as well as the AbstractAnnotatedAggregateRoot, use ParameterResolvers to resolve the value that should be passed in the parameters of methods annotated with @EventHandler. By default, Axon provides a number of parameter resolvers that allow you to use the following parameter types:

  • The first parameter is always the payload of the Event message

  • Parameters annotated with @MetaData will resolve to the Meta Data value with the key as indicated on the annotation. If required is false (default), null is passed when the meta data value is not present. If required is true, the resolver will not match and prevent the method from being invoked when the meta data value is not present.

  • Parameters of type MetaData will have the entire MetaData of an EventMessage injected.

  • Parameters annotated with @Timestamp and of type org.joda.time.DateTime will resolve to the timestamp of the EventMessage. This is the time at which the Event was generated.

  • Parameters assignable to Message will have the entire EventMessage injected (if the message is assignable to that parameter). If the first parameter is of type message, it effectively matches an Event of any type, even if generic parameters would suggest otherwise. Due to type erasure, Axon cannot detect what parameter is expected. In such case, it is best to declare a parameter of the payload type, followed by a parameter of type Message.

  • When using Spring and <axon:annotation-config/> is declared, any other parameters will resolve to autowired beans, if exactly one autowirable candidate is available in the application context. This allows you to inject resources directly into @EventHandler annotated methods.

You can configure additional ParameterResolvers by extending the ParameterResolverFactory class and creating a file named /META-INF/service/org.axonframework.common.annotation.ParameterResolverFactory containing the fully qualified name of the implementing class. Alternatively, you can register your implementation at runtime using ParameterResolverFactory.registerFactory(). Make sure to do so before any adapters are created, otherwise handlers may have been initialized without those parameter resolvers.

In all circumstances, exactly one event handler method is invoked per listener instance. Axon will search the most specific method to invoke, in the following order:

  1. On the actual instance level of the class hierarchy (as returned by this.getClass()), all annotated methods are evaluated

  2. If one or more methods are found of which all parameters can be resolved to a value, the method with the most specific type is chosen and invoked

  3. If no methods are found on this level of the class hierarchy, the super type is evaluated the same way

  4. When the top level of the hierarchy is reached, and no suitable event handler is found, the event is ignored.

// assume EventB extends EventA 
// and    EventC extends EventB

public class TopListener {

    @EventHandler
    public void handle(EventA event) {
    }

    @EventHandler
    public void handle(EventC event) {
    }
}

public class SubListener extends TopListener {

    @EventHandler
    public void handle(EventB event) {
    }

}

In the example above, the SubListener will receive all instances of EventB as well as EventC (as it extends EventB). In other words, the TopListener will not receive any invocations for EventC at all. Since EventA is not assignable to EventB (it's its superclass), those will be processed by TopListener.

The constructor of the AnnotationEventListenerAdapter takes two parameters: the annotated bean, and the EventBus, to which the listener should subscribe. You can subscribe and unsubscribe the event listener using the subscribe() and unsubscribe() methods on the adapter. Alternatively, you can use AnnotationEventListenerAdapter.subscribe(listener, eventBus) to create and subscribe the listener in one invocation.

[Tip]Tip

If you use Spring, you can automatically wrap all annotated event listeners with an adapter automatically by adding <axon:annotation-config/> to your application context. Axon will automatically find and wrap annotated event listeners in the Application Context with an AnnotationEventListenerAdapter and register them with the Event Bus.

6.3. Asynchronous Event Processing

By default, event listeners process events in the thread that dispatches them. This means that the thread that executes the command will have to wait untill all event handling has finished. For some types of event listeners this is not the optimal form of processing. Asynchronous event processing improves the scalability of the application, with the penalty of added complexity to deal with "eventual consistency". Axon Framework provides the AsynchronousCluster implementation, which dispatches Events to Event Listeners asynchronously from the thread that published them to the cluster.

[Note]Configuring the Asynchronous Cluster in Spring

In Spring, you can place a Spring <bean> element inside the <axon:cluster> element, to indicate which cluster implementation you wish to use. Simply specify the bean configuration of an AsynchronousCluster implementation to make a Cluster asynchronous.

The AsynchronousCluster needs an Executor, for example a ThreadPoolExecutor and a SequencingPolicy, a definition of which events may be processed in parallel, and which sequentially. Finally a TransactionManager can be provided to process events within a transaction, such as a database transaction.

The Executor is responsible for executing the event processing. The actual implementation most likely depends on the environment that the application runs in and the SLA of the event handler. An example is the ThreadPoolExecutor, which maintains a pool of threads for the event handlers to use to process events. The AsynchronousCluster will manage the processing of incoming events in the provided executor. If an instance of a ScheduledThreadPoolExecutor is provided, the AsynchronousCluster will automatically leverage its ability to schedule processing in the cases of delayed retries.

The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier of a given event. If the policy returns an equal itentifier for two events, this means that they must be handled sequentially be the event handler. A null sequence identifier means the event may be processed in parallel with any other event.

Axon provides a number of common policies you can use:

  • The FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that require them to be processed in a particular order.

  • The SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of a previous event is finished.

  • SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.

Besides these provided policies, you can define your own. All policies must implement the EventSequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the identifier sequence identifier for a given event. Events for which an equal sequence identifer is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.

A TransactionManager can be assigned to a AsynchronousCluster to add transactional processing of events. To optimize processing, events can be processed in small batches inside a transaction. When using Spring, you can use the SpringTransactionManager to manage transactions with Spring's PlatformTransactionManager . For more customization of transactional behavior, you can alternatively configure a UnitOfWorkFactory. That factory will be used to generate the Unit of Work wrapping the Event Handling process. By default, a DefaultUnitOfWorkFactory is used, which uses the provided TransactionManager, if any, to manage the backing Transactions.

Error handling

The AsynchronousCluster uses an ErrorHandler to decide what needs to be done when an Event Listener or Unit of Work throws an Exception. The default behavior depends on the availability of a TransactionManager. If a TransactionManager is provided, the default ErrorHandler will request a rollback and retry the Event Handling after 2 seconds. If no TransactionManager is provided, the defautl ErrorHandler will simply log the Exception and proceed with the next Event Listener, guaranteeing that each Event Listener will receive each Event. In any situation, a rollback will not be requested when the exception is explicitly non-transient (i.e. is caused by an AxonNonTransientException).

You can change this behavior by configuring another ErrorHandler, or by creating your own. The ErrorHandler interface has a single method, which provides the Exception that occurred, the EventMessage being processed and a reference to the EventListener throwing the exception. The return value is of type RetryPolicy. The RetryPolicy tells the Event Processor what it needs to do with the failure. There are three static methods on RetryPolicy for the most common scenarios:

  • retryAfter(int timeout, TimeUnit unit) tells the scheduler that the Unit of Work should be rolled back, and the Event Message should be rescheduled for handling after the given amount of time. This means that some Event Listeners may have received the Event more than once.

  • proceed() tells the scheduler to ignore the Exception and proceed with the processing of the Event Message. This may be with the intent to skip the Event on the Event Listener, or because the ErrorHandler has managed to resolve the problem by retrying invoking the EventHandler itself.

  • skip() tells the scheduler to rollback the Unit of Work and proceed with the next Event Message. If all Event Listeners properly support Transactions, will effectively mean that the Event is skipped altogether.

If the RetryPolicy you wish to use does not depend on the type of Exception, EventMessage or EventListener, you can use the DefaultErrorHandler and pass the desired RetryPolicy as its constructor parameter. It will return that RetryPolicy on each exception, unless it requests a retry of an Event that caused an explicitly non-transient exception.

6.4. Distributing the Event Bus

In a distributed environment, it may be necessary to transport Event Messages between JVM's. The ClusteringEventBus has a possiblity to define an EventBusTerminal. This is an interface to a mechansim that publishes Events to all relevant clusters. Some EventBusTerminal implementations allow distribution of Events over multiple JVM's.

[Note]Background of the name "Terminal"

While most developers association the word "terminal" to a thin client computer connected to a mainframe, the association to make here is slightly different. In Neurology, an Axon Terminal is an endpoint of an Axon that transmits electronic impulses from one Neuron to another.

For more detailed information, see http://en.wikipedia.org/wiki/Axon_terminal.

6.4.1. Spring AMQP Terminal

The Spring AMQP Terminal uses the Spring AMQP module to transmit events to an AMQP compatible message broker, such as Rabbit MQ. It also connects local clusters to queues on that message broker.

The axon-amqp namespace (http://www.axonframework.org/schema/amqp) allows you to configure an AMQP Terminal by adding the <axon-amqp:terminal> element to the Spring application context. On this element, you can define different properties for the terminal, as well as a configuration containing defaults to use to connect each cluster to an AMQP Queue.

The example below shows an example configuration for an AMQP Terminal. The default-configuration element specified the defaults for the Clusters if they don't provide their own values.

<axon-amqp:terminal id="terminal" 
                    connection-factory="amqpConnection" 
                    serializer="serializer"
                    exchange-name="AxonEventBusExchange">
    <axon-amqp:default-configuration transaction-manager="transactionManager"
                                     transaction-size="25" prefetch="200"
                                     error-handler="loggingErrorHandler"/>
</axon-amqp:terminal>

<bean id="amqpConnection" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"/>

The configure the Spring AMQP Terminal "manually", you need to specify a number of beans in your application context:

  • The ListenerContainerLifecycleManager is responsible for creating ListenerContainers. These are the Spring classes that listen for messages on the AMQP Queues and forward them to the processing components. The ListenerContainerLifecycleManager allows you to configure the number of messages to process in a single transaction, the number of messages it may read ahead, etc.

    Note that the ListenerContainerLifecycleManager must be defined as a top-level bean.

  • An AMQP ConnectionFactory, which creates the connections to the AMQP Message Broker. Spring provides the CachingConnectionFactory, which is a sensible default.

  • The Spring AMQPTerminal itself, which connects the aforementioned components to the event publishers and event listeners. There is a large number of configuration options that allow you to tweak the terminal's behavior:

    • transactional: indicates whether the messages should be dispatched to the AMQP Broker inside a transaction. This is especially useful when multiple events need to be sent either completely, or not at all.

    • durable: indicates whether messsages should be durable (i.e. survive a Broker shutdown) or not. Obviously, message durability involves a performance impact.

    • connectionFactory: configures the ConnectionFactory to use. Useful when the application context contains more than one instance. Otherwise, the only available instance is autowired.

    • serializer: the serializer to serialize the MetaData and Payload of EventMessages with. Defaults to an autowired serializer.

    • exchangeName or exchange: defines the exchange (either defined by the name, or by a reference to an Exchange bean) to which published Event Messages should be sent. Defaults to "Axon.EventBus"

    • queueNameResolver: defines the mechanism that chooses the Queue that each Cluster should be connected to. By default, the resolver will use the configuration provided in each Cluster's Meta Data under the name "AMQP.Config". Otherwise, it uses the Cluster's name as Queue Name.

    • routingKeyResolver: defines the mechanism that generates an AMQP routing key for an outgoing Message. Defaults to a routing key resolver that returns the package name of the Message's payload. Routing keys can be used by echanges to define which queues should receive a (copy of a) Message

    • listenerContainerLifecycleManager: when the application context contains more than one, defines which listenerContainerLifecycleManager instance to use.

    • exclusive: indicates whether this Cluster accepts to share a Queue with other Cluster instances. Default to true. If a second cluster is attampting to connect exclusively to a queue, an exception is thrown. The Connector catches this exception and reattempts to connect each 2 seconds. This allows for automatic failover when a machine drops its connection.

When a cluster is selected for an Event Listener, it will be registered with the terminal. At that point, the Spring AMQP terminal will check if there is any cluster-specific configuration available. It does so by checking the AMQP.Config MetaData value. If that value is an instance of AMQPConsumerConfiguration (such as SpringAMQPConsumerConfiguration) any settings configured there will override the defaults set on the terminal itself. This allows you to specify different behavior (such as transaction size) for different clusters.

// XML Configuration for a Cluster with AMQPConsumerConfiguration
<axon:cluster id="myDefaultCluster" default="true">
    <axon:meta-data>
        <entry key="AMQP.Config">
            <axon-amqp:configuration transaction-size="20000"/>
        </entry>
    </axon:meta-data>
</axon:cluster>

6.5. Replaying Events on a Cluster

One of the advantages of Event Sourcing is that you keep track of the entire history of the application. This history allows you to extract valuable information and build new models out of them. For example, when a screen is added to the application, you can create a new query model and database tables, and have these filled using events you have collected in the past. Sometimes, replays of the Event Store are also needed to fix data that has been corrupted due to a bug.

Configuration of Replays

Axon provides the ReplayingCluster, a wrapper around another Cluster implementation that adds the replay capability. The ReplayingCluster is initialized with a number of resources. First of all, it needs another Cluster implementation. That other cluster is the actual cluster that takes care of dispatching Event Messages to subscribed listeners. It also needs a reference to the Event Store (implementing EventStoreManagement) that will supply the Events for the replay. A transaction manager is used to wrap the replay in a transaction. Since a single transaction may be too large to be efficient, you can configure a "commit threshold" to indicate the number of messages that should be processed before performing an intermediate commit. Finally, you need to supply an IncomingMessageHandler. The IncomingMessageHandler tells the ReplayingCluster what to do when an Event Message is published to the cluster while it is replaying.

[Warning]Warning

Make sure not to replay onto Clusters that contain Event Listeners that do not support replaying. A typical example is an Event Listener that sends emails. Replaying on a cluster that contains such an Event Listener can have nasty side-effects.

Axon provides two IncomingMessageHandler implementations. The BackloggingIncomingMessageHandler simply backlogs any incoming events (in-memory) and postpones processing of these events until the replay is finished. If an event is backlogged, but was also part of the replay, it is automatically removed from the backlog to prevent duplicate processing. The other implementation is the DiscardingIncomingMessageHandler. As the name suggests, it simply discards any messages published during a replay. This implementation will ensure the fastest replay, but is not safe to use when you expect messages to be published to the cluster during the replay. You can also create your own implementation. The JavaDoc describes the requirements (incl. thread safety) for each method.

[Note]Note

Although it is technically possible to do a full replay at runtime, it should be considered a maintenance operation and be executed while the cluster is not in active use by the application.

ReplayCluster configuration in Spring

In Spring, a Cluster can be marked as replayable by adding the <axon:replay-config> element as a child if the <axon:cluster> element. When the replay-config element is present, Axon will automatically wrap the cluster in a ReplayingCluster using the provided configuration. This also means that applicationContext.getBean(clusterName) will return a bean of type ReplayingCluster.

Preparing for a replay

In many cases, the data source used by Event Listeners needs to be prepared for a replay. Database tables, for example, typically need to be cleared. Event Listeners can implement the ReplayAware interface. When they do, their beforeReplay and afterReplay will be invoked before and after the replay respectively. Both methods are invoked within the scope of a transaction.

Triggering a Replay

Axon does not automatically trigger a replay. The ReplayingCluster provides two methods that can start a replay: startReplay() and startReplay(Executor). The first will execute the replay in the calling thread, meaning that the call will return when the replay is finished. The latter will execute the replay using the given executor and return a Future object that allows the caller to check if the replay is finished.

[Note]Note

Note that a replay may take a long time to finish, depending on the number of Events that need to be processed. Therefore, ensure that it is not possible to rebuild the model using other models already available, which is typically faster. Also make sure to properly test a replay before applying it in a production environment.