Not every command is able to completely execute in a single ACID transaction. A very common example that pops up quite often as an argument for transactions is the money transfer. It is often believed that an atomic and consistent transaction is absolutely required to transfer money from one account to another. Well, it's not. On the contrary, it is quite impossible to do. What if money is transferred from an account on Bank A, to another account on Bank B? Does Bank A acquire a lock in Bank B's database? If the transfer is in progress, is it strange that Bank A has deducted the amount, but Bank B hasn't deposited it yet? Not really, it's "underway". On the other hand, if something goes wrong while depositing the money on Bank B's account, Bank A's customer would want his money back. So we do expect some form of consistency, ultimately.
While ACID transactions are not necessary, or are even impossible, in some cases, some form of transaction management is still required. Typically, these transactions are referred to as BASE transactions: Basic Availability, Soft state, Eventual consistency. Contrary to ACID transactions, BASE transactions cannot be easily rolled back. To roll back, compensating actions need to be taken to revert anything that has occurred as part of the transaction. In the money transfer example, a failure at Bank B to deposit the money is compensated by refunding the money at Bank A.
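The compensating action from the money transfer example can be sketched in plain Java. This is a minimal illustration, not Axon code: the Bank and MoneyTransfer classes and all of their methods are hypothetical, and each Bank stands in for a separate system with its own database.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory bank; stands in for a remote system with its own database. */
final class Bank {
    private final Map<String, Long> balances = new HashMap<>();
    private final boolean acceptsDeposits;

    Bank(boolean acceptsDeposits) { this.acceptsDeposits = acceptsDeposits; }

    void open(String account, long cents) { balances.put(account, cents); }

    long balance(String account) { return balances.get(account); }

    void withdraw(String account, long cents) {
        long current = balances.get(account);
        if (current < cents) throw new IllegalStateException("insufficient funds");
        balances.put(account, current - cents);
    }

    void deposit(String account, long cents) {
        if (!acceptsDeposits) throw new IllegalStateException("deposit rejected");
        balances.merge(account, cents, Long::sum);
    }
}

final class MoneyTransfer {
    /** Returns true if the transfer completed, false if it was compensated. */
    static boolean transfer(Bank from, String source, Bank to, String target, long cents) {
        from.withdraw(source, cents);        // step 1: the money is now "underway"
        try {
            to.deposit(target, cents);       // step 2: may fail independently of step 1
            return true;
        } catch (IllegalStateException depositFailed) {
            from.deposit(source, cents);     // compensating action: refund the source account
            return false;
        }
    }
}
```

Between the two steps the amount belongs to neither account, which is the "soft state" of a BASE transaction. A real implementation would also have to cope with a crash between the steps, which this sketch ignores.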
In CQRS, Sagas are responsible for managing these BASE transactions. They respond to Events produced by Commands and may produce new commands, invoke external applications, etc. In the context of Domain Driven Design, it is not uncommon for Sagas to be used as a coordination mechanism between several bounded contexts.
A Saga is a special type of Event Listener: one that manages a business transaction. Some transactions could be running for days or even weeks, while others are completed within a few milliseconds. In Axon, each instance of a Saga is responsible for managing a single business transaction. That means a Saga maintains the state necessary to manage that transaction, continuing it or taking compensating actions to roll back any actions already taken. Typically, and contrary to regular Event Listeners, a Saga has a starting point and an end, both triggered by Events. While the starting point of a Saga is usually very clear, there could be many ways for a Saga to end.
In Axon, all Sagas must implement the Saga interface. As with Aggregates, there is a Saga implementation that allows you to annotate event handling methods with @SagaEventHandler: the AbstractAnnotatedSaga.
A single Saga instance is responsible for managing a single transaction. That means you need to be able to indicate the start and end of a Saga's life cycle.
The AbstractAnnotatedSaga allows you to annotate Event Handlers with an annotation (@SagaEventHandler). If a specific Event signifies the start of a transaction, add another annotation to that same method: @StartSaga. This annotation will create a new Saga and invoke its event handler method when a matching Event is published.
By default, a new Saga is only started if no suitable existing Saga (of the same type) can be found. You can also force the creation of a new Saga instance by setting the forceNew property on the @StartSaga annotation to true.
Ending a Saga can be done in two ways. If a certain Event always indicates the end of a Saga's life cycle, annotate that Event's handler on the Saga with @EndSaga. The Saga's life cycle will be ended after the invocation of the handler. Alternatively, you can call end() from inside the Saga to end the life cycle. This allows you to conditionally end the Saga.
Note | |
---|---|
If you don't use annotation support, you need to properly configure your Saga
Manager (see Section 7.2.1, “SagaManager” below). To end a Saga's life cycle,
make sure the |
Event Handling in a Saga is quite comparable to that of a regular Event Listener. There is one major difference, though. While there is a single instance of an Event Listener that deals with all incoming events, multiple instances of a Saga may exist, each interested in different Events. For example, a Saga that manages a transaction around an Order with Id "1" will not be interested in Events regarding Order "2", and vice versa.
Instead of publishing all Events to all Saga instances (which would be a complete waste of resources), Axon will only publish Events containing properties that the Saga has been associated with. This is done using AssociationValues. An AssociationValue consists of a key and a value. The key represents the type of identifier used, for example "orderId" or "order". The value represents the corresponding value, "1" or "2" in the previous example.
The @SagaEventHandler annotation has two attributes, of which associationProperty is the most important one. This is the name of the property on the incoming Event that should be used to find associated Sagas. The key of the association value is the name of the property. The value is the value returned by the property's getter method.
For example, consider an incoming Event with a method "String getOrderId()", which returns "123". If a method accepting this Event is annotated with @SagaEventHandler(associationProperty="orderId"), this Event is routed to all Sagas that have been associated with an AssociationValue with key "orderId" and value "123". This may be exactly one Saga, more than one, or even none at all.
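The routing rule described above amounts to a lookup from a (key, value) pair to the Sagas that hold that association. The following plain-Java sketch illustrates the idea only; Association and AssociationIndex are hypothetical names and are not Axon's own classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Simplified, hypothetical stand-in for an association value: a key plus a value. */
final class Association {
    final String key;
    final Object value;

    Association(String key, Object value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Association
                && key.equals(((Association) o).key)
                && value.equals(((Association) o).value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }
}

/** Hypothetical index from an association to the identifiers of the Sagas holding it. */
final class AssociationIndex {
    private final Map<Association, List<String>> sagasByAssociation = new HashMap<>();

    void associate(String sagaId, String key, Object value) {
        sagasByAssociation
                .computeIfAbsent(new Association(key, value), k -> new ArrayList<>())
                .add(sagaId);
    }

    /** Returns the Sagas that an Event carrying this property value would be routed to. */
    List<String> find(String key, Object value) {
        return sagasByAssociation.getOrDefault(
                new Association(key, value), Collections.<String>emptyList());
    }
}
```

With two Sagas associated with ("orderId", "123"), find("orderId", "123") returns both, while a lookup for an unknown order returns an empty list, matching the "one, more than one, or none" outcome described above.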
When a Saga manages a transaction around one or more domain concepts, such as Order, Shipment, Invoice, etc, that Saga needs to be associated with instances of those concepts. An association requires two parameters: the key, which identifies the type of association (Order, Shipment, etc) and a value, which represents the identifier of that concept.
Associating a Saga with a concept is done in several ways. First of all, when a Saga is newly created when invoking a @StartSaga annotated Event Handler, it is automatically associated with the property identified in the @SagaEventHandler method. Any other association can be created using the associateWith(String key, Object value) method. Use the removeAssociationWith(String key, Object value) method to remove a specific association.
Imagine a Saga that has been created for a transaction around an Order. The Saga is automatically associated with the Order, as the method is annotated with @StartSaga. The Saga is responsible for creating an Invoice for that Order, and for telling Shipping to create a Shipment for it. Once the Shipment has arrived and the Invoice has been paid, the transaction is completed and the Saga is closed.
Here is the code for such a Saga:
    public class OrderManagementSaga extends AbstractAnnotatedSaga {

        private boolean paid = false;
        private boolean delivered = false;
        private transient CommandBus commandBus;

        @StartSaga
        @SagaEventHandler(associationProperty = "orderId")
        public void handle(OrderCreatedEvent event) {
            // client generated identifiers
            ShippingId shipmentId = createShipmentId();
            InvoiceId invoiceId = createInvoiceId();
            // associate the Saga with these values, before sending the commands
            associateWith("shipmentId", shipmentId);
            associateWith("invoiceId", invoiceId);
            // send the commands
            commandBus.dispatch(new PrepareShippingCommand(...));
            commandBus.dispatch(new CreateInvoiceCommand(...));
        }

        @SagaEventHandler(associationProperty = "shipmentId")
        public void handle(ShippingArrivedEvent event) {
            delivered = true;
            if (paid) {
                end();
            }
        }

        @SagaEventHandler(associationProperty = "invoiceId")
        public void handle(InvoicePaidEvent event) {
            paid = true;
            if (delivered) {
                end();
            }
        }

        // ...
    }
By allowing clients to generate an identifier, a Saga can easily be associated with a concept, without the need for a request-response type command.
The Saga is associated with these concepts before the commands are dispatched. This way, we are guaranteed to also catch Events generated as part of these commands.
The calls to end() end the Saga once the invoice has been paid and the shipment has arrived.
Of course, this Saga implementation is far from complete. What should happen if the invoice is not paid in time? What if the shipment cannot be delivered? The Saga should be able to cope with those scenarios as well.
As said before, an AssociationValue consists of a key, which is a String, and a value, which may be of any type. However, a few simple rules should be followed. The value of an AssociationValue should always be a Value Object (as defined in Domain Driven Design). In terms of implementation, that means it should be immutable and have proper equals and hashCode methods. Preferably, it should implement the Comparable interface, too. If Sagas need to be persisted, AssociationValues need to be serializable. In that case, only use Serializable values. Generally, it shouldn't be a problem to make Value Objects Serializable.
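A value type that follows these rules could look like the sketch below. OrderId is a hypothetical class invented for this illustration; it is immutable, defines equals and hashCode, and implements both Comparable and Serializable.

```java
import java.io.Serializable;

/** Hypothetical identifier type, suitable as the value of an association. */
final class OrderId implements Comparable<OrderId>, Serializable {

    private static final long serialVersionUID = 1L;

    // A single final field and no setters: instances never change after construction.
    private final String value;

    OrderId(String value) {
        if (value == null) {
            throw new IllegalArgumentException("value may not be null");
        }
        this.value = value;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof OrderId && value.equals(((OrderId) other).value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public int compareTo(OrderId other) {
        return value.compareTo(other.value);
    }

    @Override
    public String toString() {
        return value;
    }
}
```

Because equality is based on the wrapped value rather than on object identity, two OrderId instances created from the same String match the same association.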
It is easy to make a Saga take action when something happens. After all, there is an Event to notify the Saga. But what if you want your Saga to do something when nothing happens? That's what deadlines are used for. For invoices, the deadline is typically several weeks, while the confirmation of a credit card payment should occur within a few seconds.
In Axon, you can use an EventScheduler to schedule an Event for publication. In the example of an Invoice, you'd expect that invoice to be paid within 30 days. A Saga would, after sending the CreateInvoiceCommand, schedule an InvoicePaymentDeadlineExpiredEvent to be published in 30 days. The EventScheduler returns a ScheduleToken after scheduling an Event. This token can be used to cancel the schedule, for example when a payment of an Invoice has been received.
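The schedule-then-cancel pattern can be illustrated without Axon, using only java.util.concurrent. In this sketch the DeadlineSketch class is hypothetical, events are represented by plain Strings, and the ScheduledFuture returned by the executor plays the role that the ScheduleToken plays in Axon.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical deadline scheduler; not Axon's EventScheduler. */
final class DeadlineSketch {

    final List<String> published = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    /** Schedules "publication" of an event and returns a handle that can cancel it. */
    ScheduledFuture<?> schedule(String eventName, long delay, TimeUnit unit) {
        return executor.schedule(() -> { published.add(eventName); }, delay, unit);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Schedules two deadlines, cancels the long one, waits for the short one. */
    static List<String> demo() {
        DeadlineSketch scheduler = new DeadlineSketch();
        try {
            ScheduledFuture<?> invoiceDeadline =
                    scheduler.schedule("InvoicePaymentDeadlineExpired", 30, TimeUnit.DAYS);
            ScheduledFuture<?> cardDeadline =
                    scheduler.schedule("CardConfirmationDeadlineExpired", 10, TimeUnit.MILLISECONDS);

            invoiceDeadline.cancel(false);          // the payment arrived: drop the deadline
            cardDeadline.get(5, TimeUnit.SECONDS);  // wait until the short deadline has fired
            return new ArrayList<>(scheduler.published);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Only the deadline that was not cancelled is published. Like any purely in-memory scheduler, this sketch loses all pending deadlines when the JVM stops.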
Scheduled Events must extend ApplicationEvent or ScheduledEvent. The latter is aware of its own publication time.
Axon provides two EventScheduler implementations: a pure Java one and one using
Quartz as a backing scheduling mechanism.
The SimpleEventScheduler is a pure-Java implementation of the EventScheduler that uses a ScheduledExecutorService to schedule Event publication. Although the timing of this scheduler is very reliable, it is a pure in-memory implementation. Once the JVM is shut down, all schedules are lost. This makes this implementation unsuitable for long-term schedules.
The SimpleEventScheduler needs to be configured with an EventBus and a ScheduledExecutorService (see the static methods on the java.util.concurrent.Executors class for helper methods).
The QuartzEventScheduler is a more reliable and enterprise-worthy implementation. Using Quartz as the underlying scheduling mechanism, it provides more powerful features, such as persistence, clustering and misfire management. This means Event publication is guaranteed. It might be a little late, but it will be published.
It needs to be configured with a Quartz Scheduler and an EventBus. Optionally, you may set the name of the group that Quartz jobs are scheduled in, which defaults to "AxonFramework-Events".
One or more components will be listening for scheduled Events. These components might rely on a Transaction being bound to the Thread that invokes them. Scheduled Events are published by Threads managed by the EventScheduler. To manage transactions on these threads, you can configure an EventTriggerCallback, which is notified of the publication of scheduled Events and can manage transactions around them.
Note | |
---|---|
Spring users can use the |
Sagas generally do more than just maintain state based on Events. They interact with external components. To do so, they need access to the resources necessary to address those components. Usually, these resources aren't really part of the Saga's state and shouldn't be persisted as such. But once a Saga is reconstructed, these resources must be injected before an Event is routed to that instance.
For that purpose, there is the ResourceInjector. It is used by the SagaRepository to inject resources into a Saga. Axon provides a SpringResourceInjector, which injects annotated fields and methods with resources from the Application Context.
Mark fields holding injected resources transient | |
---|---|
Since resources should not be persisted with the Saga, make sure to add
the |
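Why injected resources must be restored after a Saga is reconstructed can be shown with plain Java serialization. The classes below are hypothetical and only mimic the situation: FakeCommandBus stands in for a resource, PersistableSaga for a Saga with one state field and one resource field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical resource; deliberately not Serializable. */
final class FakeCommandBus {
    int dispatched = 0;

    void dispatch(Object command) {
        dispatched++;
    }
}

/** Hypothetical Saga-like class with persistent state and one injected resource. */
final class PersistableSaga implements Serializable {
    private static final long serialVersionUID = 1L;

    boolean paid;                         // state: written to the stream
    transient FakeCommandBus commandBus;  // resource: skipped by serialization
}

final class ResourceInjectionSketch {

    /** Serializes the saga to bytes and reads it back, as a repository might. */
    static PersistableSaga storeAndLoad(PersistableSaga saga) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(saga);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PersistableSaga) in.readObject();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After storeAndLoad, the paid flag survives but commandBus is null, so something has to assign the field again before the Saga handles its next Event. In Axon that is the job of the ResourceInjector. Without the transient modifier, writeObject would fail here because FakeCommandBus is not Serializable.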
Events need to be redirected to the appropriate Saga instances. To do so, some infrastructure classes are required. The most important components are the SagaManager and the SagaRepository.
The SagaManager is responsible for redirecting Events to the appropriate Saga instances and managing their life cycle. There are two SagaManager implementations in Axon Framework: the AnnotatedSagaManager, which provides the annotation support, and the SimpleSagaManager, which is less powerful, but doesn't force you into using annotations.
Sagas operate in a highly concurrent environment. Multiple Events may reach a Saga at (nearly) the same time. This means that Sagas need to be thread safe. By default, Axon's SagaManager implementations will synchronize access to a Saga instance. This means that only one thread can access a Saga at a time, and all changes by one thread are guaranteed to be visible to any successive threads (a happens-before relationship in the Java Memory Model). Optionally, you may switch this locking off, if you are sure that your Saga is completely thread safe on its own. To do so, call setSynchronizeSagaAccess(false). When disabling synchronization, do take note of the fact that this will allow a Saga to be invoked while it is in the process of being stored by a repository. The result may be that a Saga is stored in an inconsistent state first, and overwritten by its actual state later.
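The effect of synchronizing access per Saga instance can be sketched as follows. This is an illustration of the locking idea only and says nothing about how Axon implements it; CountingSaga and SynchronizedDispatch are hypothetical names.

```java
/** Hypothetical Saga-like class whose handler is not thread safe on its own. */
final class CountingSaga {
    int handled;

    void handle(String event) {
        handled++;   // read-modify-write: unsafe without external locking
    }
}

final class SynchronizedDispatch {

    /** Invokes the handler while holding the monitor of that one Saga instance. */
    static void dispatch(CountingSaga saga, String event) {
        synchronized (saga) {
            saga.handle(event);
        }
    }

    /** Delivers events from several threads and returns the number handled. */
    static int demo(int threads, int eventsPerThread) {
        CountingSaga saga = new CountingSaga();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < eventsPerThread; n++) {
                    dispatch(saga, "event");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return saga.handled;
    }
}
```

Because the lock is taken on the Saga instance itself, Events for different Sagas do not block each other, while Events for the same Saga are handled one at a time and see each other's changes.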
The SimpleSagaManager is by far the less powerful of the two implementations, but it doesn't require the use of annotations. The SimpleSagaManager needs to be configured with a number of resources. Its constructor requires the type of Saga it manages, the SagaRepository, an AssociationValueResolver, a SagaFactory and the EventBus. The AssociationValueResolver is a component that returns a Set of AssociationValues for a given Event.
Then, you should also configure the types of Events the SagaManager should create new instances for. This is done through the setEventsToAlwaysCreateNewSagasFor and setEventsToOptionallyCreateNewSagasFor methods. They both accept a List of Event classes.
This SagaManager implementation uses annotations on the Sagas themselves to manage the routing and life cycle of that Saga. As a result, this manager allows all information about the life cycle of a Saga to be available inside the Saga class itself. It can also manage any number of saga types. That means only a single AnnotatedSagaManager is required, even if you have multiple types of Saga.
The AnnotatedSagaManager is constructed using a SagaRepository, a SagaFactory (optional) and a vararg array of Saga classes. If no SagaFactory is provided, a GenericSagaFactory is used. It assumes that all Saga classes have a public no-arg constructor.
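The requirement for a public no-arg constructor follows from how a generic factory has to create instances: by reflection, knowing only the class. The sketch below shows that mechanism in plain Java; ReflectiveFactory is a hypothetical name and this is not the source of GenericSagaFactory.

```java
/** Hypothetical factory that instantiates any class through its public no-arg constructor. */
final class ReflectiveFactory {

    static <T> T create(Class<T> type) {
        try {
            // getConstructor() only finds public constructors; with no arguments
            // it looks up the public no-arg constructor.
            return type.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    type.getName() + " needs a public no-arg constructor", e);
        }
    }
}
```

A class such as java.util.ArrayList can be created this way, while a class without a public no-arg constructor, such as java.lang.Integer, causes the factory to fail. Saga types that need constructor arguments therefore call for a custom factory.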
If you use Spring, you can use the axon namespace to configure an AnnotatedSagaManager. The supported Saga types are provided as a comma separated list. This will also automatically configure a SpringResourceInjector, which injects any annotated fields with resources from the Spring Application Context.
    <axon:saga-manager id="sagaManager" saga-factory="optionalSagaFactory"
                       saga-repository="sagaRepository" event-bus="eventBus">
        <axon:types>
            fully.qualified.ClassName,
            another.fq.ClassName
        </axon:types>
    </axon:saga-manager>
As with Event Listeners, it is also possible to asynchronously handle events for sagas. To handle events asynchronously, the SagaManager needs to be configured with an Executor implementation. The Executor supplies the threads needed to process the events asynchronously. Often, you'll want to use a thread pool. You may, if you want, share this thread pool with other asynchronous activities.
When an executor is provided, the SagaManager will automatically use it to find associated Saga instances and dispatch the events to each of these instances. The SagaManager will guarantee that for each Saga instance, all events are processed in the order they arrive. For optimization purposes, this guarantee does not hold across different Sagas.
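One way to obtain ordering per Saga on a shared thread pool is to chain each new task behind the previous task for the same Saga. The sketch below shows that technique with CompletableFuture; it is an assumption about how such a guarantee could be built, not a description of Axon's implementation, and OrderedAsyncDispatcher is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical dispatcher: ordered per saga id, unordered between saga ids. */
final class OrderedAsyncDispatcher {

    private final ExecutorService executor;
    // Last scheduled task per saga; the next task is chained behind it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    OrderedAsyncDispatcher(ExecutorService executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> dispatch(String sagaId, Runnable handler) {
        return tails.compute(sagaId, (id, tail) ->
                (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                        .thenRunAsync(handler, executor));
    }

    /** Dispatches 100 numbered events to one saga and returns the order they ran in. */
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            OrderedAsyncDispatcher dispatcher = new OrderedAsyncDispatcher(pool);
            List<Integer> seen = new CopyOnWriteArrayList<>();
            CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
            for (int i = 0; i < 100; i++) {
                final int n = i;
                last = dispatcher.dispatch("saga-1", () -> seen.add(n));
            }
            last.join();   // wait for the final task in the chain
            return seen;
        } finally {
            pool.shutdown();
        }
    }
}
```

Tasks for different saga ids form independent chains and may run in parallel on the pool. The sketch has two known gaps: a handler that throws prevents later handlers in the same chain from running, and entries are never removed from the map.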
Because Transactions are often Thread bound, you may need to configure a Transaction Manager with the SagaManager. This transaction manager is invoked before and after each invocation of the Saga Repository and before and after each batch of Events has been processed by the Saga itself. The Transaction Manager has the opportunity to configure the batch size each time a transaction starts. The batch size describes the number of Events that a Saga may process before the transaction is committed. This mechanism can be compared to the mechanism for Event Listeners, described in Section 6.2.3, “Managing transactions in asynchronous event handling”.
In a Spring application context, a Saga Manager can be marked as asynchronous by adding the executor and optionally the transaction-manager attributes to the saga-manager element, as shown below.
    <axon:saga-manager id="sagaManager" saga-factory="optionalSagaFactory"
                       saga-repository="sagaRepository" event-bus="eventBus"
                       executor="myThreadPool" transaction-manager="txManager">
        <axon:types>
            fully.qualified.ClassName,
            another.fq.ClassName
        </axon:types>
    </axon:saga-manager>
The transaction-manager should point to a PlatformTransactionManager, Spring's interface for transaction managers. Generally you can use the same transaction manager as the other components in your application (e.g. JpaTransactionManager).
The SagaRepository is responsible for storing and retrieving Sagas, for use by the SagaManager. It is capable of retrieving specific Saga instances by their identifier as well as by their Association Values.
There are some special requirements, however. Since concurrency in Sagas is a very delicate procedure, the repository must ensure that for each conceptual Saga instance (with equal identifier) only a single instance exists in the JVM.
Axon provides two SagaRepository implementations: the InMemorySagaRepository and the JpaSagaRepository.
As the name suggests, the InMemorySagaRepository keeps a collection of Sagas in memory. This is the simplest repository to configure and the fastest to use. However, it doesn't provide any persistence. If the JVM is shut down, any stored Saga is lost. This implementation is particularly suitable for testing and some very specialized use cases.
The JpaSagaRepository uses JPA to store the state and Association Values of Sagas. Sagas do not need any JPA annotations; Axon will serialize the sagas using a SagaSerializer (comparable to Event serialization, you can use either a JavaSagaSerializer or an XStreamSagaSerializer). Although Sagas are persisted and may be garbage collected when not used, the Association Values are kept in memory. The memory footprint of these values is generally quite small and should not be a problem. These values are reconstructed after a JVM restart, without loss of data.
In order to ensure that only a single instance exists for each conceptual Saga, the JpaSagaRepository uses a specialized cache. Unlike many other caches, the primary goal of this cache is to prevent multiple instances of a single Saga. The cached Sagas are weakly referenced. That means that once a Saga is no longer referenced elsewhere, the Garbage Collector may clean it up. When the Saga is needed again, a new instance is automatically created.
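The idea of a cache that guarantees a single live instance per identifier, without preventing garbage collection, can be sketched with WeakReference. WeakIdentityCache is a hypothetical class written for this illustration and is not the cache used by the JpaSagaRepository.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical cache: at most one live instance per id, held only weakly. */
final class WeakIdentityCache<T> {

    private final Map<String, WeakReference<T>> entries = new HashMap<>();

    /**
     * Returns the live instance for this id, or loads a new one when none exists
     * or the previous one has been garbage collected.
     */
    synchronized T getOrLoad(String id, Function<String, T> loader) {
        WeakReference<T> reference = entries.get(id);
        T instance = reference == null ? null : reference.get();
        if (instance == null) {
            instance = loader.apply(id);
            entries.put(id, new WeakReference<>(instance));
        }
        return instance;
    }
}
```

As long as some caller holds a strong reference to an instance, every lookup for the same id returns that same object. Once no strong references remain, the collector may reclaim it and the next lookup loads a fresh instance. This sketch never removes map entries whose referent has been collected; a production cache would need to purge them.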
The JpaSagaRepository is configured with a JPA EntityManager, a ResourceInjector and a SagaSerializer. Optionally, you can choose whether to explicitly flush the EntityManager after each operation. This will ensure that data is sent to the database, even before a transaction is committed. The default is to use explicit flushes.