A pragmatic design for an Axon system

By 14th September 2018Coding, Software Architecture

The Axon Framework is a wonderful tool for implementing CQRS and event sourcing on the JVM. When I first started using it, I did so in all kinds of harmful ways. In this post I will set out an approach to implementing a system using Axon that maintains a healthy separation between the different layers and components and supports code maintainability. When used correctly, Axon allows one to create a loosely coupled system that is easy to test and easy to evolve.

Getting started

The quickest way to get going is to use Axon’s Spring Boot AutoConfiguration. Simply include the axon-spring-boot-starter dependency and you’re ready to go. You can now start by decorating your aggregates with @Aggregate and @CommandHandler annotations and it will respond to commands issued through a CommandGateway (provided that JPA is on the classpath, otherwise you also have to configure an EventStorageEngine). If you prefer not to rely on Spring auto-configuration, you can use the Configuration API. I do not recommend creating a configuration using the @EnableAxon annotation.

Application layer

There is nothing special here. This is simply where your users or machine clients interact with your system. The only rule is that the application layer doesn’t interact with the domain layer directly.

Controllers

Controllers are the components that co-ordinate the fulfilment of user requests. They are the bridge between the application and the domain.

The first responsibility of a controller should be to decide whether and how to interact with the domain through a CommandGateway or a QueryGateway. As such the controller is a “well-behaved client” of the domain and can take care of concerns that may not belong to the domain.

A controller can also co-ordinate multiple interactions with the domain layer in order to express the user’s intent. Beware, this may be a symptom of poor domain design and a hint that there is a domain element missing.

The simplest controller simply sends a command through the CommandGateway:

@Component
class ContactController(private val commandGateway: CommandGateway) {
    fun addContact(contactDTO: ContactDTO) {
        commandGateway.sendAndWait<AddContactCommand>(AddContactCommand(contactDTO.getBasicInfo()))
    }
}

Domain layer

Aggregates

Aggregates are the central objects in your system and are normally annotated with @Aggregate. Axon uses the term Aggregate as defined by Evans in Domain Driven Design:

“A cluster of associated objects that are treated as a unit for the purpose of data changes. External references are restricted to one member of the Aggregate, designated as the root. A set of consistency rules applies within the Aggregate’s boundaries.”

In Axon:

“An aggregate is an isolated tree of entities that is capable of handling commands.”

Aggregates in Axon may be implemented using either JPA or Event Sourcing, this is a matter of preference. A JPA aggregate must have an @Id field; an event sourcing aggregate must have an @AggregateIdentifier field. These are used to load aggregates from the repository. In either case, aggregates should only be used for processing commands and should not be queried directly.

Command handlers should preferably be defined directly in the aggregate to which it applies. Axon uses the field on the command object marked with @TargetAggregateIdentifier to load the correct aggregate from the repository and then executes the command handler. Command handlers may delegate to other services. In an event sourcing setup it should never perform state transitions  –  all state transitions must happen in the @EventSourcingHandler annotated methods because commands are not replayed, only events are.

It is often useful to let the command handler return the result of the command back to the application layer. However, this should be done with caution. Never use commands to query the aggregate state. Note: Commands handled by a constructor will always return the @AggregateIdentifier.

In an event sourcing system, the state of an aggregate is the sum of all the events the aggregate saw. Event sourcing handlers are used for loading the current state of the aggregate by replaying all the events since the last snapshot and reapplying the handlers to perform state transitions. These handlers should only apply state transitions because any code in the handler will be executed every time the aggregate is loaded.

The aggregate may now look something like:

@Aggregate
class Contact() {

    @AggregateIdentifier
    private lateinit var contactId: String
    private lateinit var basicInfo: BasicInfo
    private val platformContacts = mutableListOf<PlatformContact>()

    @CommandHandler
    constructor(command: AddContactCommand) : this() {
        apply(ContactAddedEvent(command.contactId, command.basicInfo))
        command.platformContacts
            .forEach { apply(PlatformContactAddedEvent(it)) }
    }

    @EventSourcingHandler
    fun handle(contactAddedEvent: ContactAddedEvent) {
        this.contactId = contactAddedEvent.contactId
        this.basicInfo = contactAddedEvent.basicInfo
    }

    @EventSourcingHandler
    fun handle(platformContactAddedEvent: PlatformContactAddedEvent) {
        this.platformContacts.add(platformContactAddedEvent.platformContact)
    }
}

Note that an Aggregate must (1) have a default constructor and (2) must set the @AggregateIdentifier on the very first event published.

To use collaborating services during command handling, simply add the service as a parameter of the command handler. Axon’s SpringResourceInjector will provide the instance. For example:

@Aggregate
class Interaction() {

    @AggregateIdentifier
    private lateinit var interactionId: InteractionId

    @CommandHandler
    constructor(
        addInteractionCommand: AddInteractionCommand, 
        interactionScoreCalculator: InteractionScoreCalculator) : this() {

        applyEvent(InteractionAddedEvent(
                InteractionId(),
                addInteractionCommand.contactId,
                addInteractionCommand.platformContact,
                interactionScoreCalculator.calculateScore(
                    addInteractionCommand.platformContact)
        ))
    }

    @EventSourcingHandler
    fun handle(interactionAddedEvent: InteractionAddedEvent) {
        this.interactionId = interactionAddedEvent.interactionId
    }
}

Sagas

In Axon, sagas are first-class citizens. A saga is normally described as “a long-running business process” that co-ordinates the interactions between components. A saga is a choreographer that listens for events and directs components. If an aggregate listens for commands and emits events, then a saga listens for events and emits commands.

Sagas should be kept as simple as possible. Introducing sagas allows you to separate logic from process. Aggregates contain business logic while sagas ensure that aggregates collaborate in a well-defined business process. For a concise description of sagas, I suggest this post by Jonathan Oliver.

Good sagas simply listen for events and emit commands in response to them. Because sagas are stateful they can maintain a state machine to determine the next command to issue. Not following this rule and including logic in a saga leads to issues that are hard to diagnose when sagas are restarted, for example after a JVM restart.

@Saga
class RelationshipSaga {

    @Transient
    @Autowired
    private lateinit var commandGateway: CommandGateway

    private var relationshipInteractionScore = InteractionScore(0)

    @StartSaga
    @SagaEventHandler(associationProperty = "contactId")
    fun handle(interactionAddedEvent: InteractionAddedEvent) {
        if (this.relationshipInteractionScore == InteractionScore(0)) {
            this.commandGateway.send<UpdateRelationshipLevelCommand>(
                    UpdateRelationshipLevelCommand(interactionAddedEvent.contactId, RelationshipLevel.ACQUAINTANCE))
        }

        this.relationshipInteractionScore += interactionAddedEvent.interactionScore

        when {
            this.relationshipInteractionScore >= InteractionScore(10) ->
                this.commandGateway.send<UpdateRelationshipLevelCommand>(
                        UpdateRelationshipLevelCommand(interactionAddedEvent.contactId, RelationshipLevel.ASSOCIATE))
        }
    }
}

In the above example, note the stateful relationshipInteractionScore, and the @Transient commandGateway. The gateway should not be serialized with the rest of the saga and is therefore marked as transient.

The question often arises whether to implement some process, perhaps a simple and short one, in the aggregate or to implement it in a saga. On the one hand any process can be implemented in a saga, while a handy heuristic in turn is to ask whether the user is waiting for the process to complete. In this synchronous case, it is better to implement the process in the aggregate and to return a response to the user. The alternative is to let the user poll the query side until the result is available which is clearly not desirable. If this process is hidden from the user, then a saga can be used instead and a response can immediately be returned.

A final word of warning: Beware of events employed as passive-aggressive commands. Don’t perform logic in response to events and only use sagas to co-ordinate business processes by issuing new commands in response to events.

Services, repositories, ports and adapters

All the other mainstays of a system designed following DDD principles are still in play and are usually used by aggregates to execute commands. Isolating logic in a service make aggregates much easier to test. Just remember that aggregates should never use repositories to access the query model in order to execute commands. Because of the eventually consistent nature of an event sourcing system, doing so will get you in all sorts of trouble. Instead the controller can use a QueryGateway to retrieve the necessary data before issuing a command. A repository of reference data, on the other hand, is fine. Remember to record the relevant data in the event store because you won’t be able to go back in time to that exact state of the repository. The same applies to any data from the ports and adapters.

CQRS

CQRS is not the same as event sourcing, but Axon strongly encourages it. Creating view projections using @EventHandlers is very intuitive and allows one to create a read model of your data that exactly matches the reading patterns. Axon’s approach also allows scaling out the microservices that serve read requests independently from the ones that serve the commands by populating the read model by listening to events.

The final word

Axon is a powerful tool for implementing event sourcing and CQRS with aggregates, sagas and event handlers. Just remember to look out for all the many pitfalls.

A working example of the code snippets included in this post can be seen on GitHub.

by Niel de Wet

RMB FOUNDeRY

Author RMB FOUNDeRY

More posts by RMB FOUNDeRY