7. Managing complex business transactions

Not every command is able to completely execute in a single ACID transaction. A very common example that pops up quite often as an argument for transactions is the money transfer. It is often believed that an atomic and consistent transaction is absolutely required to transfer money from one account to another. Well, it's not. On the contrary, it is quite impossible to do. What if money is transferred from an account on Bank A, to another account on Bank B? Does Bank A acquire a lock in Bank B's database? If the transfer is in progress, is it strange that Bank A has deducted the amount, but Bank B hasn't deposited it yet? Not really, it's "underway". On the other hand, if something goes wrong while depositing the money on Bank B's account, Bank A's customer would want his money back. So we do expect some form of consistency, ultimately.

While ACID transactions are not necessary or even impossible in some cases, some form of transaction management is still required. Typically, these transactions are referred to as BASE transactions: Basic Availability, Soft state, Eventual consistency. Contrary to ACID, BASE transactions cannot be easily rolled back. To roll back, compensating actions need to be taken to revert anything that has occurred as part of the transaction. In the money transfer example, a failure at Bank B to deposit the money, will refund the money in Bank A.

In CQRS, Sagas are responsible for managing these BASE transactions. They respond on Events produced by Commands and may produce new commands, invoke external applications, etc. In the context of Domain Driven Design, it is not uncommon for Sagas to be used as coordination mechanism between several bounded contexts.

7.1. Saga

A Saga is a special type of Event Listener: one that manages a business transaction. Some transactions could be running for days or even weeks, while others are completed within a few milliseconds. In Axon, each instance of a Saga is responsible for managing a single business transaction. That means a Saga maintains state necessary to manage that transaction, continuing it or taking compensating actions to roll back any actions already taken. Typically, and contrary to regular Event Listeners, a Saga has a starting point and an end, both triggered by Events. While the starting point of a Saga is usually very clear, while there could be many ways for a Saga to end.

In Axon, all Sagas must implement the Saga interface. As with Aggregates, there is a Saga implementation that allows you to annotate event handling methods with @SagaEventHandler: the AbstractAnnotatedSaga.

7.1.1. Life Cycle

As a single Saga instance is responsible for managing a single transaction. That means you need to be able to indicate the start and end of a Saga's Life Cycle.

The AbstractAnnotatedSaga allows you to annotate Event Handlers with an annotation (@SagaEventHandler). If a specific Event signifies the start of a transaction, add another annotation to that same method: @StartSaga. This annotation will create a new saga and invoke its event handler method when a matching Event is published.

By default, a new Saga is only started if no suitable existing Saga (of the same type) can be found. You can also force the creation of a new Saga instance by setting the forceNew property on the @StartSaga annotation to true.

Ending a Saga can be done in two ways. If a certain Event always indicates the end of a Saga's life cycle, annotate that Event's handler on the Saga with @EndSaga. The Saga's Life Cycle will be ended after the invocation of the handler. Alternatively, you can call end() from inside the Saga to end the life cycle. This allows you to conditionally end the Saga.

[Note]Note

If you don't use annotation support, you need to properly configure your Saga Manager (see Section 7.2.1, “SagaManager below). To end a Saga's life cycle, make sure the isActive() method of the Saga returns false.

7.1.2. Event Handling

Event Handling in a Saga is quite comparable to that of a regular Event Listener. There is one major difference, though. While there is a single instance of an Event Listener that deals will all incoming events, multiple instances of a Saga may exist, each interested in different Events. For example, a Saga that manages a transaction around an Order with Id "1" will not be interested in Events regarding Order "2", and vice versa.

Using association values

Instead of publishing all Events to all Saga instances (which would be a complete waste of resources), Axon will only publish Events containing properties that the Saga has been associated with. This is done using AssociationValues. An AssociationValue consists of a key and a value. The key represents the type of identifier used, for example "orderId" or "order". The value represents the corresponding value, "1" or "2" in the previous example.

The @SagaEventHandler annotation has two attributes, of which associationProperty is the most important one. This is the name of the property on the incoming Event that should be used to find associated Sagas. The key of the association value is the name of the property. The value is the value returned by property's getter method.

For example, consider an incoming Event with a method "String getOderId()", which returns "123". If a method accepting this Event is annotated with @SagaEventHandler(associationProperty="orderId"), this Event is routed to all Sagas that have been associated with an AssociationValue with key "orderId" and value "123". This may either be exactly one, more than one, or even none at all.

Associating Sagas with Domain Concepts

When a Saga manages a transaction around one or more domain concepts, such as Order, Shipment, Invoice, etc, that Saga needs to be associated with instances of those concepts. An association requires two parameters: the key, which identifies the type of association (Order, Shipment, etc) and a value, which represents the identifier of that concept.

Associating a Saga with a concept is done in several ways. First of all, when a Saga is newly created when invoking a @StartSaga annotated Event Handler, it is automatically associated with the property identified in the @SagaEventHandler method. Any other association can be created using the associateWith(String key, Object value) method. Use the removeAssociationWith(String key, Object value) method to remove a specific association.

Imagine a Saga that has been created for a transaction around an Order. The Saga is automatically associated with the Order, as the method is annotated with @StartSaga. The Saga is responsible for creating an Invoice for that Order, and tell Shipping to create a Shipment for it. Once both the Shipment have arrived and the Invoice has been paid, the transaction is completed and the Saga is closed.

Here is the code for such a Saga:

public class OrderManagementSaga extends AbstractAnnotatedSaga {

    private boolean paid = false;
    private boolean delivered = false;
    private transient CommandBus commandBus;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderCreatedEvent event) {
        // client generated identifiers (1)
        ShippingId shipmentId = createShipmentId();
        InvoiceId invoiceId = createInvoiceId();
        // associate the Saga with these values, before sending the commands (2)
        associateWith("shipmentId", shipmentId);
        associateWith("invoiceId", invoiceId);
        // send the commands
        commandBus.dispatch(new PrepareShippingCommand(...));
        commandBus.dispatch(new CreateInvoiceCommand(...));
    }

    @SagaEventHandler(associationProperty = "shipmentId")
    public void handle(ShippingArrivedEvent event) {
        delivered = true;
        if (paid) {
            end(); (3)
        }
    }

    @SagaEventHandler(associationProperty = "invoiceId")
    public void handle(InvoicePaidEvent event) {
        paid = true;
        if (delivered) {
            end(); (4)
        }
    }

    // ...

}
1

By allowing clients to generate an identifier, a Saga can be easily associated with a concept, without the need to a request-response type command.

2

We associate the event with these concepts before publishing the command. This way, we are guaranteed to also catch events generated as part of this command.

34

This will end this saga once the invoice is paid and the shipment has arrived.

Of course, this Saga implementation is far from complete. What should happen if the invoice is not paid in time. What if the shipment cannot be delivered? The Saga should be able to cope with those scenarios as well.

AssociationValue considerations

As said before, an AssociationValue consists of a key, which is a String, and a value, which may be of any type. However, a few simple rules should be conformed to. The value of an AssociationValue should always be a Value Object (as defined in Domain Driven Design). In terms of implementation, that means they should be immutable and have a proper equals and hashCode method. Preferably, they should implement the Comparable interface, too. If Sagas need to be persisted, AssociationValues need to be serializable. In that case, only use Serializable values. Generally, it shouldn't be a problem to make Value Objects Serializable.

7.1.3. Keeping track of Deadlines

It is easy to make a Saga take action when something happens. After all, there is an Event to notify the Saga. But what if you want your Saga to do something when nothing happens? That's what deadlines are used for. In invoices, that's typically several weeks, while the confirmation of a credit card payment should occur within a few seconds.

In Axon, you can use an EventScheduler to schedule an Event for publication. In the example of an Invoice, you'd expect that invoice to be paid within 30 days. A Saga would, after sending the CreateInvoiceCommand, schedule an InvoicePaymentDeadlineExpiredEvent to be published in 30 days. The EventScheduler returns a ScheduleToken after scheduling an Event. This token can be used to cancel the schedule, for example when a payment of an Invoice has been received.

Scheduled Events must extend ApplicationEvent or ScheduledEvent. The latter is aware of its own publication time. Axon provides two EventScheduler implementations: a pure Java one and one using Quartz as a backing scheduling mechanism.

SimpleEventScheduler

This pure-Java implementation of the EventScheduler uses a ScheduledExecutorService to schedule Event publication. Although the timing of this scheduler is very reliable, it is a pure in-memory implementation. Once the JVM is shut down, all schedules are lost. This makes this implementation unsuitable for long-term schedules.

The SimpleEventScheduler needs to be configured with an EventBus and a SchedulingExecutorService (see the static methods on the java.util.concurrent.Executors class for helper methods).

QuartzEventScheduler

The QuartzEventScheduler is a more reliable and enterprise-worthy implementation. Using Quartz as underlying scheduling mechanism, it provides more powerful features, such as persistence, clustering and misfire management. This means Event publication is guaranteed. It might be a little late, but it will be published.

It needs to be configured with a Quartz Scheduler and an EventBus. Optionally, you may set the name of the group that Quartz jobs are scheduled in, which defaults to "AxonFramework-Events".

Scheduled Events and Transactions

One or more components will be listening for scheduled Events. These components might rely on a Transaction being bound to the Thread that invokes them. Scheduled Events are published by Threads managed by the EventScheduler. To manage threads, you can configure a EventTriggerCallback to listen for publication of scheduled Events and manage transactions around them.

[Note]Note

Spring users can use the QuartzEventSchedulerFactoryBean or SimpleEventSchedulerFactoryBean for easier configuration. It allows you to set the PlatformTransactionManager directly.

7.1.4. Injecting Resources

Sagas generally do more than just maintaining state based on Events. They interact with external components. To do so, they need access to the Resources necessary to address to components. Usually, these resources aren't really part of the Saga's state and shouldn't be persisted as such. But once a Saga is reconstructed, these resources must be injected before an Event is routed to that instance.

For that purpose, there is the ResourceInjector. It is use by the SagaRepository to inject resources into a Saga. Axon provides a SpringResourceInjector, which injects annotated fields and methods with Resources from the Application Context.

[Tip]Mark fields holding injected resources transient

Since resources should not be persisted with the Saga, make sure to add the transient keyword to those fields. This will prevent the serialization mechanism to attempt to write the contents of these fields to the repository. The repository will automatically re-inject the required resources after a Saga has been deserialized.

7.2. Saga Infrastructure

Events need to be redirected to the appropriate Saga instances. To do so, some infrastructure classes are required. The most important components are the SagaManager and the SagaRepository.

7.2.1. SagaManager

The SagaManager is responsible for redirecting Events to the appropriate Saga instances and managing their life cycle. There are two SagaManager implementations in Axon Framework: the AnnotatedSagaManager, which provides the annotation support and the SimpleSagaManager, which is less powerful, but doesn't force you into using annotations.

Sagas operate in a highly concurrent environment. Multiple Events may reach a Saga at (nearly) the same time. This means that Sagas need to be thread safe. By default, Axon's SagaManager implementations will synchronize access to a Saga instance. This means that only one thread can access a Saga at a time, and all changes by one thread are guaranteed to be visible to any successive threads (a.k.a happens-before order in the Java Memory Model). Optionally, you may switch this locking off, if you are sure that your Saga is completely thread safe on its own. Just setSynchronizeSagaAccess(false). When disabling synchronization, do take note of the fact that this will allow a Saga to be invoked while it is in the process of being stored by a repository. The result may be that a Saga is stored in an inconsistent state first, and overwritten by it's actual state later.

SimpleSagaManager

This is by far the least powerful of the two implementations, but it doesn't require the use of annotations. The SimpleSagaManager needs to be configured with a number of resources. Its constructor requires the type of Saga it manages, the SagaRepository, an AssociationValueResolver, a SagaFactory and the EventBus. The AssociationValueResolver is a component that returns a Set of AssociationValue for a given Event.

Then, you should also configure the types of Events the SagaManager should create new instances for. This is done through the setEventsToAlwaysCreateNewSagasFor and setEventsToOptionallyCreateNewSagasFor methods. They both accept a List of Saga classes.

AnnotatedSagaManager

This SagaManager implementation uses annotations on the Sagas themselves to manage the routing and life cycle of that Saga. As a result, this manager allows all information about the life cycle of a Saga to be available inside the Saga class itself. It can also manage any number of saga types. That means only a single AnnotatedSagaManager is required, even if you have multiple types of Saga.

The AnnotatedSagaManager is constructed using a SagaRepository, a SagaFactory (optional) and a vararg array of Saga classes. If no SagaFactory is provided, a GenericSagaFactory is used. It assumes that all Saga classes have a public no-arg constructor.

If you use Spring, you can use the axon namespace to configure an AnnotatedSagaManager. The supported Saga types are provided as a comma separated list. This will also automatically configure a SpringResourceInjector, which injects any annotated fields with resources from the Spring Application Context.

<axon:saga-manager id="sagaManager" saga-factory="optionalSagaFactory"
                   saga-repository="sagaRepository" event-bus="eventBus">
	<axon:types>
		fully.qualified.ClassName,
    another.fq.ClassName
	</axon:types>
</axon:saga-manager>

Asynchronous Event Handling for Sagas

As with Event Listeners, it is also possible to asynchronously handle events for sagas. To handle events asynchronously, the SagaManager needs to be configured with an Executor implementation. The Executor supplies the threads needed to process the events asynchronously. Often, you'll want to use a thread pool. You may, if you want, share this thread pool with other asynchronous activities.

When an executor is provided, the SagaManager will automatically use it to find associated Saga instances and dispatch the events each of these instances. The SagaManager will guarantee that for each Saga instance, all events are processed in the order they arrive. For optimization purposes, this guarantee does not count in between Sagas.

Because Transactions are often Thread bound, you may need to configure a Transaction Manager with the SagaManager. This transaction manager is invoked before and after each invocation to the Saga Repository and before and after each batch of Events has been processed by the Saga itself. The Transaction Manager has the opportunity to configure the batch size each time a transaction starts. The batch size describes the number of Events that a Saga may process before the transaction is committed. This mechanism can be compared to the mechanism for Event Listeners, desribed in Section 6.2.3, “Managing transactions in asynchronous event handling”.

In a Spring application context, a Saga Manager can be marked as asynchronous by adding the executor and optionally the transaction-manager attributes to the saga-manager element, as shown below.

<axon:saga-manager id="sagaManager" saga-factory="optionalSagaFactory"
                   saga-repository="sagaRepository" event-bus="eventBus"
                   executor="myThreadPool" transaction-manager="txManager">
	<axon:types>
		fully.qualified.ClassName,
        another.fq.ClassName
	</axon:types>
	</axon:saga-manager>

The transaction-manager should point to a PlatformTransactionManager, Spring's interface for transaction managers. Generally you can use the same transaction manager as the other components in your application (e.g. JpaTransactionManager).

7.2.2. SagaRepository

The SagaRepository is responsible for storing and retrieving Sagas, for use by the SagaManager. It is capable of retrieving specific Saga instances by their identifier as well as by their Association Values.

There are some special requirements, however. Since concurrency in Sagas is a very delicate procedure, the repository must ensure that for each conceptual Saga instance (with equal identifier) only a single instance exists in the JVM.

Axon provides two SagaRepository implementations: the InMemorySagaRepository and the JpaSagaRepository.

InMemorySagaRepository

As the name suggests, this repository keeps a collection of Sagas in memory. This is the simplest repository to configure and the fastest to use. However, it doesn't provide any persistence. If the JVM is shut down, any stored Saga is lost. This implementation is particularly suitable for testing and some very specialized use cases.

JpaSagaRepository

The JpaSagaRepository uses JPA to store the state and Association Values of Sagas. Saga's do no need any JPA annotations; Axon will serialize the sagas using a SagaSerializer (comparable to Event serialization, you can use either a JavaSagaSerializer or an XStreamSagaSerializer). Although Sagas are persisted and may be garbage collected when not used, the Association Values are kept in memory. The memory footprint of these values is generally quite small and should no be a problem. These values are reconstructed after a JVM restart, without loss of data.

In order to ensure that only a single instance exists for each conceptual Saga, the JpaSagaRepository uses a specialized cache. Unlike many other caches, the primary goal of this cache is to prevent multiple instances of a single Saga. These Sagas are Weakly Referenced. That means that once a Saga is no longer referenced, the Garbage Collector may clean them up. When the Saga is needed, a new instance is automatically created.

The JpaSagaRepository is configured with a JPA EntityManager, a ResourceInjector and a SagaSerializer. Optionally, you can choose whether to explicitly flush the EntityManager after each operation. This will ensure that data is sent to the database, even before a transaction is committed. the default is to use explicit flushes.