Segregate Responsibilities with Elixir Commanded

Table of contents
Explore how Elixir's Commanded library revolutionizes application development through CQRS and Event Sourcing, offering powerful capabilities for audit, time travel, and seamless event-driven architecture.
Responsibility segregation in Elixir programming language
CQRS
Command Query Responsibility Segregation (CQRS) at a conceptual level, emphasizes that actions leading to state changes are designated as commands, while data retrieval actions are termed queries. Due to the distinct operational demands of executing commands and queries, developers are encouraged to employ diverse persistence strategies for handling each, thus segregating their responsibilities.
Event Sourcing pattern
Event Sourcing is a pattern for storing data as events in an append-only log called event store. While this definition may seem straightforward, it overlooks a crucial aspect: by storing events, the context surrounding each event is retained. For instance, from a single piece of information, you can discern not only that an email was sent but also the reason behind it. In contrast, in alternative storage patterns, the context of business logic operations is often lost or stored separately. The current state of an entity can be reconstructed by sequentially replaying all events in the order they occurred. The information within the system is derived directly from these events.
The proposal of the CQRS pattern coincided with the introduction of Event Sourcing to the public intentionally. Unlike state-based persistence, where circumventing the use of the domain model for queries may be feasible, such an approach becomes challenging, if not impossible, in event-sourced systems. This is due to the absence of a singular location where the complete state of a domain object is stored.
Pros of event sourced system
What are the benefits of those? Apart from others:
Eventual consistency
Eventual consistency entails the concept that reads or queries may not be instantly synchronized with writes. This phenomenon can also arise in systems that do not utilize event sourcing, often due to database replication delays. In essence, the data in your read storage eventually catches up with the latest writes, which could take milliseconds, seconds, minutes, etc. Given this, event sourcing systems frequently encounter eventual consistency challenges, necessitating readiness to address the trade-offs between eventual and strong consistency.

In the context of Elixir programming, the Commanded package provides a powerful framework for implementing CQRS and Event Sourcing patterns. Let's explore how Commanded facilitates the implementation of these patterns in Elixir applications.
Commanded in practice
Let's explore the application code and check how to implement event sourcing in our own elixir applications
As always when adding a new package we need to adjust our mix.exs file:
def deps
do
[
{:commanded
,
"~> 1.4"
},
{:jason
,
"~> 1.3"
}
# Optional
]end
Event store
Next, we must decide what store to use to persist our events. We have two primary options to choose from:
Both options are great and have their pros and cons but to make things simpler I’ll suggest using PostgreSQL adapter in the beginning.
We will add another dependency for that:
def deps
do
[
{:eventstore
,
"~> 1.4"
}
]end
as well as MyApp.EventStore module which we will include in our supervision tree:
# event_store.ex
defmodule MyApp.EventStore
do
use
EventStore,
otp_app:
:my_app
def init
(config)
do
{:ok
, config}
end
end
# application.ex
defmodule MyApp.Application
do
use
Application
def start
(_type, _args)
do
children = [
MyApp.EventStore
]
opts = [strategy:
:one_for_one
,
name:
MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Finally, a bunch of configurations to set up our store:
config
:my_app
, MyApp.EventStore,
serializer:
EventStore.JsonSerializer,
username:
"postgres"
,
password:
"postgres"
,
database:
"eventstore"
,
hostname:
"localhost"
config :my_app
,
event_stores:
[MyApp.EventStore]
And now we are good to go and run mix tasks for our event store initialization:
mix
do
event_store.create
,
event_store.init
By Your Command! - Commanded library quickstart

Commands & Events
In the simplest words, commands represent intentions to perform actions, while events represent the outcomes of those actions.
A command should include a field that uniquely identifies the aggregate instance, such as "competition_id." Because commands are standard Elixir structs, you can improve the process of defining structs, enforcing mandatory keys, or performing validations by employing libraries like typed_struct or domo. This approach reduces the need for excessive boilerplate code.
defmodule MyApp.Commands.CreateCompetition
do
@moduledoc
"""
Create a new competition command
"""
use
Domo
@type
t() :: %__MODULE_
_
{
competition_id:
nil
| binary(),
name:
binary(),
url:
url(),
user_id:
binary()
}
defstruct [
:competition_id
,
:name
,
:url
,
:user_id
]end
Domain events signify significant occurrences within the scope of an aggregate. They are conventionally named in the past tense, such as "competition created".
For each domain event, create a separate module and define its fields using defstruct. Ensure that each event also includes a field to uniquely identify the aggregate instance, such as “competition_id”
Additionally, remember to implement the Jason.Encoder protocol for the event struct to enable JSON serialization, as illustrated below.
defmodule MyApp.Events.CompetitionCreated
do
@moduledoc
"""
Competition created event
"""
@derive
[Jason.Encoder]
@type
t() :: %__MODULE_
_
{
competition_id:
binary(),
name:
binary(),
url:
binary(),
user_id:
binary()
}
defstruct [
:competition_id
,
:name
,
:url
,
:user_id
]end
Aggregates
Aggregates play a crucial role in DDD by defining consistency boundaries within a domain model and encapsulating domain logic. A single aggregate is a cluster of domain objects that are treated as a single unit for the purpose of data changes. In Commanded, aggregate is comprised of its state, public command functions, command handlers functions, and state mutators. As we store events, we have access to a detailed event log of past events handled in our application.
defmodule MyApp.Aggregates.Competition
do
@moduledoc
"""
Competition write model aggregate
"""
use
Domo
@type
t() :: %__MODULE_
_
{
id:
binary(),
name:
binary(),
url:
binary()
}
defstruct [
:id
,
:name
,
:url
]
alias
MyApp.Aggregates.Competition
alias
MyApp.Commands.CreateCompetition
alias
MyApp.Events.CompetitionCreated
# Public API
def create_competition
(%Competition{
id:
nil
}, uid, name, url, user_id)
do
event = %CompetitionCreated{
competition_id:
uid,
name:
name,
url:
url,
user_id:
user_id
}
{:ok
, event}
end
# Command handler
def execute
(%Competition{
id:
nil
}, %CreateCompetition{} = command)
do
%CompetitionCreated{
competition_id:
command.id,
name:
command.name,
url:
command.url,
user_id:
command.user_id
}
end
# State mutator
def apply
(%Competition{} = competition, %CompetitionCreated{} = event)
do
%Competition{
competition
| competition_id:
event.id,
name:
event.name,
url:
event.url
}
end
end
Dispatchers & Handlers
At the opposite of handling command execution directly inside aggregate, we can use a separate module. A handler accepts both the aggregate and the command undergoing execution. This handler provides the opportunity to validate, authorize, and/or augment the command with supplementary data before executing the relevant function within the aggregate module.
defmodule MyApp.Handlers.CreateCompetitionHandler
do
@behaviour
Commanded.Commands.Handler
alias
MyApp.Commands.CreateCompetition
alias
MyApp.Aggregates.CompetitionAggregate
def handle
(%CompetitionAggregate{} = aggregate, %CreateCompetition{} = command)
do
%CreateCompetition{id:
uid,
name:
name,
url:
url,
user_id:
user_id} = command
CompetitionAggregate.create_competition(aggregate, uid, name, url, user_id)
end
end
A Router module serves the purpose of dispatching commands to their designated command handler and/or aggregate module.
To create a router module, utilize Commanded.Commands.Router, and proceed to register each command along with its corresponding handler.
defmodule MyApp.Router
do
use
Commanded.Commands.Router
alias
MyApp.Commands.CreateCompetition
alias
MyApp.Aggregates.CompetitionAggregate
dispatch CreateCompetition, to:
CompetitionAggregate,
identity:
:competition_id
end
Projectors & Projections
The concept involves software components known as Projections and Projectors, which subscribe to the real-time event stream from the events database. Upon receiving an event, the Projection can then translate the data contained within that event into a view model within a designated reporting database.
You have the flexibility to opt for an SQL or NoSQL database, a document store, a filesystem, a full-text search index, or any other storage mechanism. Moreover, you can employ multiple storage providers, tailored to optimize the querying requirements they need to fulfill.
The simplest solution will be using the Commanded Ecto Projections library.
This part will be our read model and if we don't change the default behaviour by ourselves it will be updated asynchronously so after the write model finishes saving events to the event store we can't be sure read models are also updated.
defmodule MyApp.Projectors.Competition
do
use
Commanded.Projections.Ecto,
application:
MyApp.Application,
repo:
MyApp.Projections.Repo,
name:
"MyApp.Projectors.Competition"
alias
MyApp.Events.CompetitionCreated
alias
MyApp.Projections.Competition
project %CompetitionCreated{} = event, _metadata, fn
multi ->
%CompetitionCreated{name:
name,
url:
url,
user_id:
user_id} = event
projection = %Competition{name:
name,
url:
url,
user_id:
user_id}
Ecto.Multi.insert(multi, :competition
, projection)
end
end
Related posts
Dive deeper into this topic with these related posts