The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Listeners. This is the responsibility of the Event Bus. Axon Framework provides an Event Bus and some base classes to help you implement Event Listeners.
The EventBus
is the mechanism that dispatches events to the subscribed
event listeners. Axon Framework provides an implementation of the event bus:
SimpleEventBus
. The SimpleEventBus
manages subscribed
EventListeners
and forwards all incoming events to all subscribed
listeners. This means that Event Listeners must be explicitly registered with the Event
Bus in order for them to receive events. The registration process of the
SimpleEventBus
is thread safe. Listeners may register and unregister
for events at any time.
Event listeners are the component that act on incoming events. These events may be of
any type of the events mentioned in Section 4.1, “Events”. In the Axon Framework, all
event listeners must implement the EventListener
interface.
Event listeners need to be registered with an event bus (see Section 6.1, “Event Bus”) to be notified of events. Axon provides a base implementation that take care of this, and other things, for you.
The
AnnotationEventListenerAdapter
can wrap any object into an
event listener. The adapter will invoke the most appropriate event handler
method available. These event handler methods must be annotated with the
@EventHandler
annotation and are resolved according to the same
rules that count for annotated aggregate roots (see the section called “
AbstractAnnotatedAggregateRoot
”).
The constructor of the
AnnotationEventListenerAdapter
takes two
parameters: the annotated bean, and the EventBus
, to which the
listener should subscribe. You can subscribe and unsubscribe the event listener
using the
subscribe()
and
unsubscribe()
methods on the
adapter.
Tip | |
---|---|
If you use Spring, you can automatically wrap all annotated event
listeners with an adapter automatically by adding
|
By default, event listeners process events in the thread that dispatches them.
This means that the thread that executes the command will have to wait untill all
event handling has finished. For some types of event listeners this is not the
optimal form of processing. Asynchronous event processing improves the scalability
of the application, with the penalty of added complexity to deal with "eventual
consistency". With the Axon Framework, you can easily convert any event handler into
an asynchronous event handler by wrapping it in an
AsynchronousEventHandlerWrapper
or, when using annotations, adding
the type-level
AsynchronousEventListener
annotation.
The
AsynchronousEventHandlerWrapper
needs some extra configuration to
make an event handler asynchronous. The first thing that the wrapper needs is an
Executor
, for example a ThreadPoolExecutor
. The second
is the SequencingPolicy
, a definition of which events may be processed
in parallel, and which sequentially. The last one is optional: the
TransactionManager
, which enables you to run event processing
within a transaction. The next pragraphs will provide more details about the
configuration options.
The
Executor
is responsible for executing the event processing. The
actual implementation most likely depends on the environment that the application
runs in and the SLA of the event handler. An example is the
ThreadPoolExecutor
, which maintains a pool of threads for the event
handlers to use to process events. The
AsynchonousEventHandlerWrapper
will manage the processing of incoming events in the provided executor. If an
instance of a
ScheduledThreadPoolExecutor
is provided, the
AsynchronousEventHandlerWrapper
will automatically leverage its
ability to schedule processing in the cases of delayed retries. See
Section 6.2.3, “Managing transactions in asynchronous event handling”
for more information about transactions.
The
SequencingPolicy
defines whether events must be handled
sequentially, in parallel or a combination of both. Policies return a sequence
identifier of a given event. If two events have the same sequence identifier, this
means that they must be handled sequentially be the event handler. A
null
sequence identifier means the event may be processed in
parallel with any other event.
Axon provides a number of common policies you can use:
The FullConcurrencyPolicy
will tell Axon that this event
handler may handle all events concurrently. This means that there is no
relationship between the events that require them to be processed in a
particular order.
The SequentialPolicy
tells Axon that all events must be
processed sequentially. Handling of an event will start when the
handling of a previous event is finished. For annotated event handlers,
this is the default policy.
SequentialPerAggregatePolicy
will force domain events that
were raised from the same aggregate to be handled sequentially. However,
events from different aggregates may be handled concurrently. This is
typically a suitable policy to use for event listeners that update
details from aggregates in database tables.
Besides these provided policies, you can define your own. All
policies must implement the EventSequencingPolicy
interface. This
interface defines a single method, getSequenceIdentifierFor
, that
returns the identifier sequence identifier for a given event. Events for which an
equals sequence identifer is returned must be processed sequentially. Events that
produce a different sequence identifier may be processed concurrently. For
performance reasons, policy implementations should return null
if the
event may be processed in parallel to any other event. This is faster, because Axon
does not have to check for any restrictions on event processing.
A
TransactionManager
can be assigned to a
AsynchronousEventHandlerWrapper
to add transactional processing of
events. To optimize processing, events can be processed in small batches inside a
transaction. The transaction manager has the ability to influence the size of these
batches and can decide to either commit, skip or retry event processing based on the
result of a batch. See
Section 6.2.3, “Managing transactions in asynchronous event handling”
for more
information.
If you use the AnnotationEventListenerAdapter
, or
<axon:annotation-config/>
, an Executor
must be
configured to allow asynchronous processing of events.
You can configure the event sequencing policy on the
@AsynchronousEventListener
annotation. You then set the
sequencePolicyClass
to the type of policy you like to use. Note
that you can only choose policy classes that provide a public no-arg
constructor.
@AsynchronousEventListener(sequencingPolicyClass = MyCustomPolicy.class) public class MyEventListener() { @EventHandler public void onSomeImportantEvent(MyEvent event) { // eventProcessing logic } } public class MyCustomPolicy implements EventSequencingPolicy { public Object getSequenceIdentifierFor(Event event) { if (event instanceof MyEvent) { // let's assume that we do processing based on the someProperty field. return ((MyEvent) event).someProperty(); } return null; } }
With annotation support, the event handler bean must also act as a transaction manager in order to support transactions. There is annotation support for transaction management, too (see Section 6.2.3, “Managing transactions in asynchronous event handling”).
In some cases, your event handlers have to store data in systems that use transactions. Starting and committing a transaction for each single event has a big performance impact. In Axon, events are processed in batches. The batch size depends of the number of events that need to be processed and the settings provided by the event handler. By default, the batch size is set to the number of events available in the processing queue at the time a batch starts.
Note | |
---|---|
Typically, when using synchronous event handling, the transaction boundary is managed at the Command Bus level. Asynchronous event handlers, on the other hand, run in another thread and are often unable to act within the same transaction. The transaction managers used by event handlers should not be confused with the transaction interceptors, which are used with the Command Bus. See Section 3.5.1, “Transaction management” for more information about transactions in the command bus. |
In most cases, event handling is done using a thread pool executor, or scheduler. The scheduler will schedule batches of event processing as soon as event become available. When a batch is completed, the scheduler will reschedule processing of the next batch, as long as more events are available. The smaller a batch, the more "fair" the distribution of event handler processing is, but also the more scheduling overhead you create.
When an event listener is wrapped with the
AsynchronousEventHandlerWrapper
, you can configure a
TransactionManager
to handle transactions for the event listener.
The transaction manager can, based on the information in the
TransactionStatus
object, decide to start, commit or rollback a
transaction to an external system.
The
beforeTransaction(TransactionStatus)
method is invoked just
before Axon will start handling an event batch. You can use the TransactionStatus
object to configure the batch before it is started. For example, you can change the
maximum number of events that may run in the batch.
The
afterTransaction(TransactionStatus)
method is invoked after the
batch has been processed, but before the scheduler has scheduled the next batch.
Based on the value of isSuccessful()
, you can decide to commit or
rollback the underlying transaction.
There are a number of settings you can use on the
TransactionStatus
object.
You can configure a yielding policy, which gives the scheduler an indication
of that to do when a batch has finished, but more events are available for
processing. Use
DO_NOT_YIELD
if you want the scheduler to continue
processing immediately as long as new events are available for processing. The
YIELD_AFTER_TRANSACTION
policy will tell the scheduler to
reschedule the next batch for processing when a thread is available. The first
will make sure events are processed earlier, while the latter provides a fairer
execution of events, as yielding provides waiting thread a chance to start
processing. The choice of yielding policy should be driven by the SLA of the
event listener.
You can set the maximum number of events to handle within a transaction using
setMaxTransactionSize(int)
. The default of this value is the
number of events ready for processing at the moment the transaction started.
When an event handler throws an exception, for example because a data source
is not available, the transaction is marked as failed. In that case,
isSuccessful()
on the
TransactionStatus
object
will return
false
and
getException()
will return the
exception that the scheduler caught. It is the responsibility of the event
listener to rollback or commit any active underlying transactions, based on the
information provided by these methods.
The event handler can provide a policy
setRetryPolicy(RetryPolicy)
to tell the scheduler what to do in
such case. There are three policies, each for a specific scenario:
RETRY_TRANSACTION
tells the event handler scheduler
that the entire transaction should be retried. It will reschedule
all the events in the current transaction for processing. This
policy is suitable when the event listener processes events to a
transactional data source that rolls back an entire transaction.
RETRY_LAST_EVENT
is the policy that tells the scheduler
to only retry the last event in the transaction. This is suitable if
the underlying data source does not support transactions or if the
transaction was committed without the last event.
SKIP_FAILED_EVENT
will tell the scheduler to ignore the
exception and continue processing with the next event. The event
listener can still try to commit the underlying transaction to
persist any changed made while processing other events in this
transaction. This is the default policy.
Note that the
SKIP_FAILED_EVENT
is the default policy. For event
handlers that use an underlying mechanism to perform actions, this might not be
a suitable policy. Exceptions resulting from errors in these underlying systems
(such as databases or email clients) would cause events to be ignored when the
underlying system is unavailable. In error situations, the event listener should
inspect the exception (using the
getException()
method) and decide
whether it makes sense to retry processing of this event. If that is the case,
it should set the
RETRY_LAST_EVENT
or
RETRY_TRANSACTION
policy, depending on the transactional
behavior of the underlying system.
When the chosen policy forces a retry of event processing, the processing is
delayed by the number of milliseconds defined in the
retryInterval
property. The default interval is 5 seconds.
You can change transaction semantics event during event processing. This can be done in one of two ways, depending on the type of event handler you use.
If you use the
@EventHandler
annotation to mark event handler
methods, you may use a second parameter of type TransactionStatus
.
If such parameter is available on the annotated method, the current
TransactionStatus
object is passed as a parameter.
Alternatively, you can use the static
TransactionStatus.current()
accessor to gain access to the status of the current transaction. Note that this
method returns null
if there is no active transaction.
With the current transaction status, you can use the
requestImmediateYield()
and
requestImmediateCommit()
methods to end the transaction after
processing of the event. The former will also tell the scheduler to reschedule
the remainder of the events for another batch. The latter will use the yield
policy to see what needs to be done. Since the default yielding policy is
YIELD_AFTER_TRANSACTION
, the behavior of both methods is
identical when using these defaults.
As with many of the other supported features in Axon, there is also annotation support for transaction management. You have several options to configure transactions.
The first is to annotate methods on your EventListener with
@BeforeTransaction
and @AfterTransaction
. These
methods will be called before and after the execution of a transactional batch,
respectively. The annotated methods may accept a single parameter of type
TransactionStatus
, which provides access to transaction
details, such as current status and configuration.
Alternatively, you can use an external Transaction Manager, which you assign
to a field. If you annotate that field with @TransactionManager
,
Axon will autodetect it and use it as transaction manager for that listener. The
transaction manager may be either one that implements the TransactionManager
interface, or any other type that uses annotations.
Currently, Axon Framework provides one TransactionManager implementation, the
SpringTransactionManager
. This implemenation uses Spring's
PlatformTransactionManager
as underlying transaction mechanism.
That means the SpringTransactionManager
can manage any transactions
in resources that Spring supports.