The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Listeners. This is the responsibility of the Event Bus.
The EventBus
is the mechanism that dispatches events to the subscribed
event listeners. Axon Framework provides two implementation of the event bus:
SimpleEventBus
and ClusteringEventBus
. Both
implementations manage subscribed EventListeners
and forward all incoming
events to all subscribed listeners. This means that Event Listeners must be explicitly
registered with the Event Bus in order for them to receive events. The registration
process is thread safe. Listeners may register and unregister for events at any
time.
The SimpleEventBus
is, as the name suggests, a very basic
implementation of the EventBus
interface. It just dispatches each
incoming Event
to each of the subscribed EventListeners
sequentially. If an EventListener throws an Exception
, dispatching
stops and the exception is propagated to the component publising the
Event
.
The SimpleEventBus
is suitable for most cases where dispatching is
done synchronously and locally, (i.e. in a single JVM). Once you application
requires Events
to be published across multiple JVMs, you could
consider using the ClusteringEventBus
instead.
The ClusteringEventsBus
allows application developers to bundle
EventListener
s into Cluster
s based on their properties
and non-functional requirements. The ClusteringEventBus is also more capable to deal
with Events being dispatched among different machines.
The ClusteringEventsBus contains two mechanisms: the ClusterSelector
,
which selects a Cluster
instance for each of the registered
EventListener
s, and the EventBusTerminal
, which is
responsible for dispatching Events to each of the relevant clusters.
Background: Axon Terminal | |
---|---|
In the nervous system, an Axon (one of the components of a Neuron) transports electrical signals. These Neurons are interconnected in very complex arrangements. The Axon Terminal is responsible for transmitting these signals from one Neuron to another. More information: www.wikipedia.org/wiki/Axon_terminal. |
The primary responsibility of the ClusterSelector
is to, as the
name suggests, select a cluster for each Event Listener that subscribes to the
Event Bus. By default, all Event Listeners are placed in a single Cluster
instance, which dispatches events to its members sequentially and in the calling
thread (similar to how the SimpleEventBus
works). By providing a
custom implementation, you can arrange the Event Listeners into different
Cluster instances to suit the requirements of your architecture.
A number of ClusterSelector
implementations are available. The
ClassNamePrefixClusterSelector
, for example, uses a mapping of
package prefixes to decide which cluster is (most) suitable for an Event
Listener. Similarly, the ClassNamePatternClusterSelector
uses
pattern matching to decide whether a given cluster is suitable. You can use the
CompositeClusterSelector
to combine several cluster selectors
into a single one.
The Cluster
interface describes the behavior of a Cluster. By
adding information in the Meta Data of a Cluster, the selector can provide hints
to the Terminal about the expected behavior.
Clusters and Cluster Selectors in Spring | |
---|---|
Spring users can define clusters using the <axon:cluster> element. This element allows you to define the selection criteria for Event Handlers in that cluster. These criteria are transformed into cluster selectors and used to assign each Listener to one of the clusters. By default, this creates a cluster that handles events in the publishing thread. To use a cluster with other semantics, you can define a bean inside the <axon:cluster> element that specifies the Cluster implementation to use. The Clusters are automatically detected and connected to the Event Bus in the application context. |
The EventBusTerminal
forms the bridge between the Clusters inside
the Event Bus. While some terminals will dispatch within the same JVM, others
are aware of messaging technologies, such as AMQP to dispatch Event Messages to
clusters on remote machines. The default implementation dispatches published
events to each of the (local) clusters using the publishing thread. This means
that with the default terminal, and the default ClusterSelector
,
the behavior of the ClusteringEventBus
is exactly the same as that
of the SimpleEventBus
.
In a typical AMQP based configuration, the EventBusTerminal
would
send published events to an Exchange. For each cluster, a Queue would be
connected to that exchange. The EventBusTerminal
will create a
consumer for each cluster, which reads from its related Queue and forwards each
message to that cluster. Event Listeners in a distributed environment where at
most one instance should receive an Events should be placed in a separate
cluster, which competes with the other instances on a single Queue. See Section 6.4, “Distributing the Event Bus”
for more information.
Event listeners are the component that act on incoming events. Events may be of any
type. In the Axon Framework, all event listeners must implement the
EventListener
interface.
Event listeners need to be registered with an event bus (see Section 6.1, “Event Bus”) to be notified of events. The EventListener interface prescribes a single method to be implemented. This method is invoked for each Event Message passed on the Event Bus that it is subscribed to:
public class MyEventListener implements EventListener { public void handle(EventMessage message) { if (SomeEvent.class.isAssignableFrom(message.getPayloadType) { // handle SomeEvent } else if (OtherEvent.class.isAssignableFrom(message.getPayloadType) { // handle SomeOtherEvent } } }
Implementing the EventListener interface can produce a large if-statement and verbose plumbing code. Using annotations to demarcate Event Handler methods is a cleaner alternative.
The AnnotationEventListenerAdapter
can wrap any object into an
event listener. The adapter will invoke the most appropriate event handler
method available. These event handler methods must be annotated with the
@EventHandler
annotation.
The AnnotationEventListenerAdapter
, as well as the
AbstractAnnotatedAggregateRoot
, use
ParameterResolver
s to resolve the value that should be passed
in the parameters of methods annotated with @EventHandler
. By
default, Axon provides a number of parameter resolvers that allow you to use the
following parameter types:
The first parameter is always the payload of the Event message
Parameters annotated with @MetaData
will resolve to
the Meta Data value with the key as indicated on the annotation. If
required
is false
(default),
null
is passed when the meta data value is not
present. If required
is true
, the resolver
will not match and prevent the method from being invoked when the
meta data value is not present.
Parameters of type MetaData
will have the entire
MetaData
of an EventMessage
injected.
Parameters annotated with @Timestamp
and of type
org.joda.time.DateTime
will resolve to the
timestamp of the EventMessage
. This is the time at
which the Event was generated.
Parameters assignable to Message will have the entire
EventMessage
injected (if the message is assignable
to that parameter). If the first parameter is of type message, it
effectively matches an Event of any type, even if generic parameters
would suggest otherwise. Due to type erasure, Axon cannot detect
what parameter is expected. In such case, it is best to declare a
parameter of the payload type, followed by a parameter of type
Message.
When using Spring and <axon:annotation-config/>
is
declared, any other parameters will resolve to autowired beans, if
exactly one autowirable candidate is available in the application
context. This allows you to inject resources directly into
@EventHandler
annotated methods.
You can configure additional ParameterResolver
s by extending the
ParameterResolverFactory
class and creating a file named
/META-INF/service/org.axonframework.common.annotation.ParameterResolverFactory
containing the fully qualified name of the implementing class. Alternatively,
you can register your implementation at runtime using
ParameterResolverFactory.registerFactory()
. Make sure to do so
before any adapters are created, otherwise handlers may have been initialized
without those parameter resolvers.
In all circumstances, exactly one event handler method is invoked per listener instance. Axon will search the most specific method to invoke, in the following order:
On the actual instance level of the class hierarchy (as returned
by this.getClass()
), all annotated methods are
evaluated
If one or more methods are found of which all parameters can be resolved to a value, the method with the most specific type is chosen and invoked
If no methods are found on this level of the class hierarchy, the super type is evaluated the same way
When the top level of the hierarchy is reached, and no suitable event handler is found, the event is ignored.
// assume EventB extends EventA // and EventC extends EventB public class TopListener { @EventHandler public void handle(EventA event) { } @EventHandler public void handle(EventC event) { } } public class SubListener extends TopListener { @EventHandler public void handle(EventB event) { } }
In
the example above, the SubListener
will receive all instances of
EventB
as well as EventC
(as it extends
EventB
). In other words, the TopListener
will not
receive any invocations for EventC
at all. Since
EventA
is not assignable to EventB
(it's its
superclass), those will be processed by TopListener
.
The constructor of the AnnotationEventListenerAdapter
takes two
parameters: the annotated bean, and the EventBus
, to which the
listener should subscribe. You can subscribe and unsubscribe the event listener
using the subscribe()
and unsubscribe()
methods on the
adapter. Alternatively, you can use
AnnotationEventListenerAdapter.subscribe(listener, eventBus)
to
create and subscribe the listener in one invocation.
Tip | |
---|---|
If you use Spring, you can automatically wrap all annotated event
listeners with an adapter automatically by adding
|
By default, event listeners process events in the thread that dispatches them. This
means that the thread that executes the command will have to wait untill all event
handling has finished. For some types of event listeners this is not the optimal form of
processing. Asynchronous event processing improves the scalability of the application,
with the penalty of added complexity to deal with "eventual consistency". Axon Framework
provides the AsynchronousCluster
implementation, which dispatches Events to
Event Listeners asynchronously from the thread that published them to the
cluster.
Configuring the Asynchronous Cluster in Spring | |
---|---|
In Spring, you can place a Spring |
The AsynchronousCluster
needs an Executor
, for example a
ThreadPoolExecutor
and a SequencingPolicy
, a definition of
which events may be processed in parallel, and which sequentially. Finally a
TransactionManager
can be provided to process events within a
transaction, such as a database transaction.
The Executor
is responsible for executing the event processing. The
actual implementation most likely depends on the environment that the application runs
in and the SLA of the event handler. An example is the ThreadPoolExecutor
,
which maintains a pool of threads for the event handlers to use to process events. The
AsynchronousCluster
will manage the processing of incoming events in
the provided executor. If an instance of a ScheduledThreadPoolExecutor
is
provided, the AsynchronousCluster
will automatically leverage its ability
to schedule processing in the cases of delayed retries.
The SequencingPolicy
defines whether events must be handled sequentially,
in parallel or a combination of both. Policies return a sequence identifier of a given
event. If the policy returns an equal itentifier for two events, this means that they
must be handled sequentially be the event handler. A null
sequence
identifier means the event may be processed in parallel with any other event.
Axon provides a number of common policies you can use:
The FullConcurrencyPolicy
will tell Axon that this event
handler may handle all events concurrently. This means that there is no
relationship between the events that require them to be processed in a
particular order.
The SequentialPolicy
tells Axon that all events must be
processed sequentially. Handling of an event will start when the handling of
a previous event is finished.
SequentialPerAggregatePolicy
will force domain events that were
raised from the same aggregate to be handled sequentially. However, events
from different aggregates may be handled concurrently. This is typically a
suitable policy to use for event listeners that update details from
aggregates in database tables.
Besides these provided policies, you can define your own. All policies
must implement the EventSequencingPolicy
interface. This interface defines
a single method, getSequenceIdentifierFor
, that returns the identifier
sequence identifier for a given event. Events for which an equal sequence identifer is
returned must be processed sequentially. Events that produce a different sequence
identifier may be processed concurrently. For performance reasons, policy
implementations should return null
if the event may be processed in
parallel to any other event. This is faster, because Axon does not have to check for any
restrictions on event processing.
A TransactionManager
can be assigned to a
AsynchronousCluster
to add transactional processing of events. To
optimize processing, events can be processed in small batches inside a transaction. When
using Spring, you can use the SpringTransactionManager
to manage
transactions with Spring's PlatformTransactionManager
. For more
customization of transactional behavior, you can alternatively configure a
UnitOfWorkFactory
. That factory will be used to generate the Unit of
Work wrapping the Event Handling process. By default, a
DefaultUnitOfWorkFactory
is used, which uses the provided
TransactionManager
, if any, to manage the backing Transactions.
The AsynchronousCluster
uses an ErrorHandler
to decide
what needs to be done when an Event Listener or Unit of Work throws an Exception.
The default behavior depends on the availability of a TransactionManager. If a
TransactionManager
is provided, the default
ErrorHandler
will request a rollback and retry the Event Handling
after 2 seconds. If no TransactionManager
is provided, the defautl
ErrorHandler
will simply log the Exception and proceed with the
next Event Listener, guaranteeing that each Event Listener will receive each Event.
In any situation, a rollback will not be requested when the
exception is explicitly non-transient (i.e. is caused by an
AxonNonTransientException
).
You can change this behavior by configuring another ErrorHandler, or by creating your own. The ErrorHandler interface has a single method, which provides the Exception that occurred, the EventMessage being processed and a reference to the EventListener throwing the exception. The return value is of type RetryPolicy. The RetryPolicy tells the Event Processor what it needs to do with the failure. There are three static methods on RetryPolicy for the most common scenarios:
retryAfter(int timeout, TimeUnit unit)
tells the
scheduler that the Unit of Work should be rolled back, and the Event
Message should be rescheduled for handling after the given amount of
time. This means that some Event Listeners may have received the Event
more than once.
proceed()
tells the scheduler to ignore the Exception and
proceed with the processing of the Event Message. This may be with the
intent to skip the Event on the Event Listener, or because the
ErrorHandler
has managed to resolve the problem by
retrying invoking the EventHandler
itself.
skip()
tells the scheduler to rollback the Unit of Work
and proceed with the next Event Message. If all Event Listeners properly
support Transactions, will effectively mean that the Event is skipped
altogether.
If the RetryPolicy
you wish to use does not depend on
the type of Exception
, EventMessage
or
EventListener
, you can use the DefaultErrorHandler
and
pass the desired RetryPolicy
as its constructor parameter. It will
return that RetryPolicy
on each exception, unless it requests a retry
of an Event that caused an explicitly non-transient exception.
In a distributed environment, it may be necessary to transport Event Messages between
JVM's. The ClusteringEventBus
has a possiblity to define an
EventBusTerminal
. This is an interface to a mechansim that publishes
Events to all relevant clusters. Some EventBusTerminal implementations allow
distribution of Events over multiple JVM's.
Background of the name "Terminal" | |
---|---|
While most developers association the word "terminal" to a thin client computer connected to a mainframe, the association to make here is slightly different. In Neurology, an Axon Terminal is an endpoint of an Axon that transmits electronic impulses from one Neuron to another. For more detailed information, see http://en.wikipedia.org/wiki/Axon_terminal. |
The Spring AMQP Terminal uses the Spring AMQP module to transmit events to an AMQP compatible message broker, such as Rabbit MQ. It also connects local clusters to queues on that message broker.
The axon-amqp
namespace
(http://www.axonframework.org/schema/amqp
) allows you to configure
an AMQP Terminal by adding the <axon-amqp:terminal>
element to the
Spring application context. On this element, you can define different properties for
the terminal, as well as a configuration containing defaults to use to connect each
cluster to an AMQP Queue.
The example below shows an example configuration for an AMQP Terminal. The
default-configuration
element specified the defaults for the
Clusters if they don't provide their own
values.
<axon-amqp:terminal id="terminal" connection-factory="amqpConnection" serializer="serializer" exchange-name="AxonEventBusExchange"> <axon-amqp:default-configuration transaction-manager="transactionManager" transaction-size="25" prefetch="200" error-handler="loggingErrorHandler"/> </axon-amqp:terminal> <bean id="amqpConnection" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"/>
The configure the Spring AMQP Terminal "manually", you need to specify a number of beans in your application context:
The ListenerContainerLifecycleManager
is responsible for
creating ListenerContainers. These are the Spring classes that listen for
messages on the AMQP Queues and forward them to the processing components.
The ListenerContainerLifecycleManager
allows you to configure
the number of messages to process in a single transaction, the number of
messages it may read ahead, etc.
Note that the ListenerContainerLifecycleManager
must be
defined as a top-level bean.
An AMQP ConnectionFactory
, which creates the connections to
the AMQP Message Broker. Spring provides the
CachingConnectionFactory
, which is a sensible
default.
The Spring AMQPTerminal itself, which connects the aforementioned components to the event publishers and event listeners. There is a large number of configuration options that allow you to tweak the terminal's behavior:
transactional: indicates whether the messages should be dispatched to the AMQP Broker inside a transaction. This is especially useful when multiple events need to be sent either completely, or not at all.
durable: indicates whether messsages should be durable (i.e. survive a Broker shutdown) or not. Obviously, message durability involves a performance impact.
connectionFactory: configures the ConnectionFactory to use. Useful when the application context contains more than one instance. Otherwise, the only available instance is autowired.
serializer: the serializer to serialize the MetaData and Payload of EventMessages with. Defaults to an autowired serializer.
exchangeName or exchange: defines the exchange (either defined by the name, or by a reference to an Exchange bean) to which published Event Messages should be sent. Defaults to "Axon.EventBus"
queueNameResolver: defines the mechanism that chooses the Queue that each Cluster should be connected to. By default, the resolver will use the configuration provided in each Cluster's Meta Data under the name "AMQP.Config". Otherwise, it uses the Cluster's name as Queue Name.
routingKeyResolver: defines the mechanism that generates an AMQP routing key for an outgoing Message. Defaults to a routing key resolver that returns the package name of the Message's payload. Routing keys can be used by echanges to define which queues should receive a (copy of a) Message
listenerContainerLifecycleManager: when the application context contains more than one, defines which listenerContainerLifecycleManager instance to use.
exclusive: indicates whether this Cluster accepts to share a
Queue with other Cluster instances. Default to
true
. If a second cluster is attampting to connect
exclusively to a queue, an exception is thrown. The Connector
catches this exception and reattempts to connect each 2 seconds.
This allows for automatic failover when a machine drops its
connection.
When a cluster is selected for an Event Listener, it will be registered with the
terminal. At that point, the Spring AMQP terminal will check if there is any
cluster-specific configuration available. It does so by checking the
AMQP.Config
MetaData value. If that value is an instance of
AMQPConsumerConfiguration
(such as
SpringAMQPConsumerConfiguration
) any settings configured there will
override the defaults set on the terminal itself. This allows you to specify
different behavior (such as transaction size) for different clusters.
// XML Configuration for a Cluster with AMQPConsumerConfiguration <axon:cluster id="myDefaultCluster" default="true"> <axon:meta-data> <entry key="AMQP.Config"> <axon-amqp:configuration transaction-size="20000"/> </entry> </axon:meta-data> </axon:cluster>
One of the advantages of Event Sourcing is that you keep track of the entire history of the application. This history allows you to extract valuable information and build new models out of them. For example, when a screen is added to the application, you can create a new query model and database tables, and have these filled using events you have collected in the past. Sometimes, replays of the Event Store are also needed to fix data that has been corrupted due to a bug.
Axon provides the ReplayingCluster
, a wrapper around another
Cluster
implementation that adds the replay capability. The
ReplayingCluster is initialized with a number of resources. First of all, it needs
another Cluster implementation. That other cluster is the actual cluster that takes
care of dispatching Event Messages to subscribed listeners. It also needs a
reference to the Event Store (implementing EventStoreManagement
) that
will supply the Events for the replay. A transaction manager is used to wrap the
replay in a transaction. Since a single transaction may be too large to be
efficient, you can configure a "commit threshold" to indicate the number of messages
that should be processed before performing an intermediate commit. Finally, you need
to supply an IncomingMessageHandler
. The
IncomingMessageHandler
tells the ReplayingCluster what to do when
an Event Message is published to the cluster while it is replaying.
Warning | |
---|---|
Make sure not to replay onto Clusters that contain Event Listeners that do not support replaying. A typical example is an Event Listener that sends emails. Replaying on a cluster that contains such an Event Listener can have nasty side-effects. |
Axon provides two IncomingMessageHandler
implementations. The
BackloggingIncomingMessageHandler
simply backlogs any incoming
events (in-memory) and postpones processing of these events until the replay is
finished. If an event is backlogged, but was also part of the replay, it is
automatically removed from the backlog to prevent duplicate processing. The other
implementation is the DiscardingIncomingMessageHandler
. As the name
suggests, it simply discards any messages published during a replay. This
implementation will ensure the fastest replay, but is not safe to use when you
expect messages to be published to the cluster during the replay. You can also
create your own implementation. The JavaDoc describes the requirements (incl. thread
safety) for each method.
Note | |
---|---|
Although it is technically possible to do a full replay at runtime, it should be considered a maintenance operation and be executed while the cluster is not in active use by the application. |
In Spring, a Cluster can be marked as replayable by adding the
<axon:replay-config>
element as a child if the
<axon:cluster>
element. When the replay-config element is
present, Axon will automatically wrap the cluster in a ReplayingCluster
using the provided configuration. This also means that
applicationContext.getBean(clusterName)
will return a bean of type
ReplayingCluster
.
In many cases, the data source used by Event Listeners needs to be prepared for a
replay. Database tables, for example, typically need to be cleared. Event Listeners
can implement the ReplayAware
interface. When they do, their
beforeReplay
and afterReplay
will be invoked before
and after the replay respectively. Both methods are invoked within the scope of a
transaction.
Axon does not automatically trigger a replay. The ReplayingCluster provides two
methods that can start a replay: startReplay()
and
startReplay(Executor)
. The first will execute the replay in the
calling thread, meaning that the call will return when the replay is finished. The
latter will execute the replay using the given executor and return a Future object
that allows the caller to check if the replay is finished.
Note | |
---|---|
Note that a replay may take a long time to finish, depending on the number of Events that need to be processed. Therefore, ensure that it is not possible to rebuild the model using other models already available, which is typically faster. Also make sure to properly test a replay before applying it in a production environment. |