My Journey to Event Sourcing (Part 2)
In Part 1, I talked about what led me to learn Event-Driven Microservices, the Event-Driven Architecture, CQRS, some common event-processing issues I encountered, and some ways to solve them.
Now let's finally talk about Event Sourcing.
Remember when I said towards the end of Part 1 that, by chance, I saw a video of a certain Greg Young talking about Event Sourcing?
This is the video I was talking about: Greg Young — A Decade of DDD, CQRS, Event Sourcing.
In that video, he talked about how Event Sourcing isn't even a new concept, showing Sumerian clay tablets that track finances. As you have probably guessed, he also talked about Domain Driven Design (DDD) and CQRS (Command Query Responsibility Segregation), and he stressed that Event Sourcing is NOT a top-level architecture.
In Part 1, I mentioned that I hadn't heard about Event Sourcing before, and here was something else I hadn't seen before: Domain Driven Design. I'll touch on that in more detail later.
So what is Event Sourcing, really?
Every developer has used an event sourced software before through git
Greg Young (non-verbatim) from Polyglot Data • Greg Young • GOTO 2014
Event Sourcing is a concept where you deduce an entity's state based on its history instead of just mutating the state directly.
So what's so special about this? To properly appreciate the concept, let's talk about how traditional systems handle state mutations with a SQL database.
    DECLARE @Id INT = 1;
    -- GETDATE() is the T-SQL function that returns the current date and time
    INSERT INTO Person (Id, Name, CreatedBy, CreatedOn) VALUES (@Id, 'John Doe', 'some_user', GETDATE());
    UPDATE Person SET Name = 'Something Else', LastUpdatedBy = 'some_user', LastUpdatedOn = GETDATE() WHERE Id = @Id;
    UPDATE Person SET Name = 'Some New Name', LastUpdatedBy = 'some_user', LastUpdatedOn = GETDATE() WHERE Id = @Id;
    UPDATE Person SET Name = 'A Random Name', LastUpdatedBy = 'some_user', LastUpdatedOn = GETDATE() WHERE Id = @Id;
Can you tell me all the name changes that happened for this record?
Some might say: "Nobody needs that." Maybe, but what if you suddenly do? What if your client asks you that question?
Some might then say: "We have an audit log for ALL changes". Great! How comprehensive is that?
How would your audit log look for more complex mutations across multiple entities? How sure are you that you're covering every single mutation?
Well, to be fair, there are temporal databases now that would solve this problem.
For reference, this is what your event store might look like for the same mutations:
    [
      {
        "streamId": "person:1",
        "eventType": "PersonCreated",
        "payload": {
          "id": 1,
          "name": "John Doe",
          "by": "some_user",
          "on": 1721005668
        },
        "version": 1
      },
      {
        "streamId": "person:1",
        "eventType": "PersonNameSet",
        "payload": {
          "id": 1,
          "newName": "Something Else",
          "by": "some_user",
          "on": 1721005669
        },
        "version": 2
      },
      {
        "streamId": "person:1",
        "eventType": "PersonNameSet",
        "payload": {
          "id": 1,
          "newName": "Some New Name",
          "by": "some_user",
          "on": 1721005670
        },
        "version": 3
      },
      {
        "streamId": "person:1",
        "eventType": "PersonNameSet",
        "payload": {
          "id": 1,
          "newName": "A Random Name",
          "by": "some_user",
          "on": 1721005671
        },
        "version": 4
      }
    ]
Now, to get to the current state, this is something that you might do:
    var events = LoadEvents("person:1");
    Person person = new Person();
    foreach (var @event in events) {
        switch (@event.EventType) {
            case "PersonCreated":
                person.Id = @event.Payload.Id;
                person.Name = @event.Payload.Name;
                break;
            case "PersonNameSet":
                person.Name = @event.Payload.NewName;
                break;
        }
    }
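The snippet above leaves out LoadEvents and the event types, so here is a minimal, self-contained sketch of the same idea that can be run as-is. The PersonEvent records, the Person record, and the PersonProjection helper are names I made up for illustration, and the in-memory list stands in for a real event store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, in-memory stand-in for the LoadEvents("person:1") call above.
var events = new List<PersonEvent>
{
    new PersonCreated(1, "John Doe"),
    new PersonNameSet("Something Else"),
    new PersonNameSet("Some New Name"),
    new PersonNameSet("A Random Name"),
};

var person = PersonProjection.Fold(events);
Console.WriteLine($"{person.Id}: {person.Name}"); // prints "1: A Random Name"

abstract record PersonEvent;
record PersonCreated(int Id, string Name) : PersonEvent;
record PersonNameSet(string NewName) : PersonEvent;

record Person(int Id, string Name);

static class PersonProjection
{
    // Left fold: start from an empty state and apply each event in order.
    public static Person Fold(IEnumerable<PersonEvent> events)
    {
        var state = new Person(0, string.Empty);
        foreach (var e in events)
        {
            state = e switch
            {
                PersonCreated c => new Person(c.Id, c.Name),
                PersonNameSet n => state with { Name = n.NewName },
                _ => state, // unknown events are ignored
            };
        }
        return state;
    }
}
```

Using typed records instead of string comparisons is only one option; it lets the compiler check the payload fields that the dynamic version has to trust at runtime.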
At its core, that is where Event Sourcing starts and ends. Instead of mutating data, you append events to an event store, grouped together by a partition key that is usually called the Stream ID in the Event Sourcing community.
Besides having a highly auditable list of everything that happened in your system, you also get the following benefits (a non-exhaustive list):
- No data is ever lost, since events are only appended and never overwritten,
- The ability to replay the history for data analytics, or for whatever other reason,
- Append-only writes let you ingest data very efficiently,
- Easier debugging, since you can follow the flow of data,
- You can answer questions even before they're asked.
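To make the replay benefit concrete, here is a small sketch that answers the earlier question ("Can you tell me all the name changes that happened for this record?") and also reconstructs the name as of a given moment. It assumes a simplified NameEvent record that keeps only the name and the timestamp from the event store listing; NameEvent and NameHistory are illustrative names, not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var history = new List<NameEvent>
{
    new("John Doe", 1721005668),
    new("Something Else", 1721005669),
    new("Some New Name", 1721005670),
    new("A Random Name", 1721005671),
};

// Every name the record has ever had, in order.
Console.WriteLine(string.Join(" -> ", NameHistory.AllNames(history)));

// The name as it was at a given point in time.
Console.WriteLine(NameHistory.NameAsOf(history, 1721005669)); // prints "Something Else"

// Simplified event: only the fields needed for this illustration.
record NameEvent(string Name, long On);

static class NameHistory
{
    public static IReadOnlyList<string> AllNames(IEnumerable<NameEvent> events) =>
        events.OrderBy(e => e.On).Select(e => e.Name).ToList();

    // Replays only the events recorded at or before the cut-off; null if there are none.
    public static string? NameAsOf(IEnumerable<NameEvent> events, long asOf) =>
        events.Where(e => e.On <= asOf).OrderBy(e => e.On).LastOrDefault()?.Name;
}
```

Neither question had to be anticipated when the events were written, which is the point of the last bullet.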
This is where things start to come together: I started digging into Domain Driven Design as well.
Domain Driven Design
What's the difference between a "Software Developer" and a "Software Architect"?
My younger self would probably just say "Years of experience" or "A software architect knows software design patterns."
While that's true to an extent, my opinion on this matter has changed quite a bit. A typical Software Architect knows design patterns and is probably more technically adept than most other developers in their organization, but they should also know the Whats, the Hows, and more importantly, the Whys.
A software architect should be more involved in the software design, with a full grasp of the business domain. They should think about what they need to build and why the business requires it, even before they tackle how they're going to build the software for it.
What I'm trying to say is that I had always focused on how to do things upfront instead of properly understanding the business needs first.
Domain-Driven Design (DDD), as its name suggests, shifts the focus onto the domain itself and onto building software around it. This is a huge topic that I probably couldn't cover in one post, so I'll refer you to Domain-Driven Design: Tackling Complexity in the Heart of Software by Eric Evans, or the Big Blue Book as people call it.
Before we get into the weeds of DDD, let's define a few terms first:
- Domain. A sphere of knowledge, influence, or activity. The subject area to which the user applies a program is the domain of the software.
- Ubiquitous Language.
  "Domain experts have limited understanding of the technical jargon of software development, but they use the jargon of their field — probably in various flavors. Developers, on the other hand, may understand and discuss the system in descriptive, functional terms, devoid of the meaning carried by the experts' language. Or developers may create abstractions that support their design but are not understood by the domain experts."
  — the Big Blue Book (ch 2, page 24)
  What it means is that classes, methods, and properties should be named as closely as possible to the terms that domain experts and users actually use. This streamlines communication and makes it easier for everybody to understand what's being discussed.
- Bounded Context. A description of a boundary (typically a subsystem, or the work of a particular team) within which a particular model is defined and applicable. Outside this boundary, the terms and concepts of the model may have different meanings. In EF language, you can probably call this a DB Context.
- Entity. An object defined primarily by its identity, rather than its attributes. An entity has a distinct identity that runs through time and different states. Like a User, or an Account, for example.
- Value Objects. Objects that describe some characteristic or attribute but carry no concept of identity. Examples include an address or a currency. Depending on the domain, they usually only exist because they are attached to an Entity, as in User.Address, where User is the entity and Address is the Value Object.
- Aggregate Root. A cluster of associated objects that are treated as a unit for the purpose of data changes. Each aggregate has a root and a boundary. The boundary defines what is inside the aggregate. The root is a single, specific entity contained in the aggregate. All external access to the aggregate is through the root.
- Aggregate. A collection of related objects that are treated as a single unit for data changes. An aggregate is made up of one or more entities and value objects, with one entity designated as the aggregate root. The aggregate root is responsible for ensuring the integrity of the aggregate's invariants and controlling access to its components.
- Invariants. Business rules that must always be consistent. They define the integrity of the business process and need to be enforced within the aggregate.
Here is a (hopefully) good representation of some of the concepts mentioned above:
    class UserEntity {
        public required string Username { get; init; }
        public required string Name { get; set; }
    }

    record AddressValueObject(string Address, string City, string State, string ZipCode, string Country);

    class UserAggregate {
        // Aggregate Root (I usually flatten this to the Aggregate)
        public UserEntity Root { get; init; }

        // a non-root entity
        public UserEntity? Manager { get; private set; }

        // an array of value objects
        public AddressValueObject[] Addresses { get; private set; } = [];

        // Setting the user as the Aggregate Root
        public UserAggregate(UserEntity user) {
            Root = user;
        }

        // Invariants
        public void SetManager(UserEntity manager) {
            Manager = manager;
        }

        public void AddAddress(AddressValueObject address) {
            Addresses = [..Addresses, address];
        }
    }
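One distinction the listing above does not show directly is how an Entity and a Value Object differ when compared. The following is a rough sketch of that difference, assuming a hypothetical Member entity identified by its username and a hypothetical PostalAddress value object; neither type appears elsewhere in this post.

```csharp
using System;

var home1 = new PostalAddress("1 Main St", "Springfield");
var home2 = new PostalAddress("1 Main St", "Springfield");
Console.WriteLine(home1 == home2); // True: same attributes, so it is the same value

var before = new Member("jdoe", "John Doe");
var after = new Member("jdoe", "Johnny Doe");
Console.WriteLine(before.Equals(after)); // True: same identity, different state

// Value Object: a record gets attribute-based equality for free.
record PostalAddress(string Street, string City);

// Entity: equality is based on identity (Username) only.
class Member : IEquatable<Member>
{
    public string Username { get; }
    public string Name { get; set; }

    public Member(string username, string name)
    {
        Username = username;
        Name = name;
    }

    public bool Equals(Member? other) => other is not null && other.Username == Username;
    public override bool Equals(object? obj) => Equals(obj as Member);
    public override int GetHashCode() => Username.GetHashCode();
}
```

This is why a record is a natural fit for value objects in C#, while an entity usually needs its equality spelled out or left as reference equality.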
What does it have to do with Event Sourcing?
While I've heard Greg Young say that you can do Event Sourcing without DDD and DDD without Event Sourcing, he also said that they work well together, and I whole-heartedly agree.
Let's turn our previous code example into an Event Sourced model.
    using System;
    using System.Collections.Generic;
    using System.Linq;

    record UserEntity(string Username, string Name) {
        public static readonly UserEntity Empty = new(string.Empty, string.Empty);
    }

    // The first component is named Street because a record member
    // cannot have the same name as its enclosing type.
    record Address(string Street, string City, string State, string ZipCode, string Country) {
        public bool IsValid() =>
            !string.IsNullOrWhiteSpace(Street) &&
            !string.IsNullOrWhiteSpace(City) &&
            !string.IsNullOrWhiteSpace(State) &&
            !string.IsNullOrWhiteSpace(ZipCode) &&
            !string.IsNullOrWhiteSpace(Country);
    }

    class UserAggregate : Aggregate {
        public static string GetStreamId(string username) => $"user:{username}";
        public override string StreamId => GetStreamId(User.Username);

        public UserEntity User { get; private set; } = UserEntity.Empty;
        public UserEntity? Manager { get; private set; }
        public Address[] Addresses { get; private set; } = [];

        // Parameterless constructor: used when rehydrating from stored events.
        public UserAggregate() {
            RegisterHandler<UserCreated>(When);
            RegisterHandler<UserManagerChanged>(When);
            RegisterHandler<UserAddressAdded>(When);
        }

        public UserAggregate(string username, string name, string by) : this() {
            if (User != UserEntity.Empty) throw new InvalidOperationException("Cannot create user");
            Apply(new UserCreated(username, name, by, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()));
        }

        public void SetManager(string managerUsername, string managerName, string by) {
            if (User == UserEntity.Empty) throw new InvalidOperationException("User not found");
            if (string.IsNullOrWhiteSpace(managerUsername)) throw new InvalidOperationException("Invalid manager");
            if (Manager != null && Manager.Username == managerUsername) return; // noop
            Apply(new UserManagerChanged(User.Username, managerUsername, managerName, by, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()));
        }

        public void AddAddress(Address address, string by) {
            if (User == UserEntity.Empty) throw new InvalidOperationException("User not found");
            if (!address.IsValid()) throw new InvalidOperationException("Address is invalid");
            if (Addresses.Contains(address)) return; // noop
            Apply(new UserAddressAdded(User.Username, address, by, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()));
        }

        // The When handlers only mutate state. Tracking uncommitted changes is
        // left to Aggregate.Apply, so replayed events are not recorded again.
        private void When(UserCreated @event) {
            User = new UserEntity(@event.Username, @event.Name);
        }

        private void When(UserManagerChanged @event) {
            Manager = new UserEntity(@event.ManagerUsername, @event.ManagerName);
        }

        private void When(UserAddressAdded @event) {
            Addresses = [..Addresses, @event.Address];
        }
    }

    abstract record EventBase;
    record UserCreated(string Username, string Name, string By, long Timestamp) : EventBase;
    record UserManagerChanged(string Username, string ManagerUsername, string ManagerName, string By, long Timestamp) : EventBase;
    record UserAddressAdded(string Username, Address Address, string By, long Timestamp) : EventBase;

    abstract class Aggregate {
        public abstract string StreamId { get; }

        // Set by Load during rehydration and by the repository after a successful save.
        public long Version { get; internal set; } = 0;

        private readonly List<EventBase> _changes = new();
        private readonly Dictionary<string, Action<EventBase>> _eventHandlers = new();

        protected void RegisterHandler<T>(Action<T> handler) where T : EventBase {
            _eventHandlers.Add(typeof(T).Name, @event => handler((T)@event));
        }

        // New event: mutate state through the handler, then record it as an uncommitted change.
        protected virtual void Apply(EventBase @event) {
            if (_eventHandlers.TryGetValue(@event.GetType().Name, out var handler)) {
                handler.Invoke(@event);
                _changes.Add(@event);
            }
        }

        // Rehydration: replay stored events without recording them as changes.
        public void Load(IEnumerable<EventWrapper<EventBase>> wrappers) {
            foreach (var wrapper in wrappers) {
                if (!_eventHandlers.TryGetValue(wrapper.EventType, out var handler)) continue;
                handler.Invoke(wrapper.Payload);
                Version = wrapper.Version;
            }
        }

        public EventBase[] Changes => _changes.ToArray();
        public void ClearChanges() => _changes.Clear();
    }

    class EventWrapper<T> where T : EventBase {
        public string Id => $"{StreamId}:{Version}";
        public required string StreamId { get; set; }
        public required string EventType { get; set; }
        public required T Payload { get; set; }
        public required long Version { get; set; }
        public required long Timestamp { get; set; }
    }
The changes are fairly obvious. On top of some validations, here are the things that were added:
- a couple of new records, namely EventBase, UserCreated, UserManagerChanged, and UserAddressAdded,
- the invariants now never apply mutations directly, but instead call an Apply method,
- a couple of When methods that mutate the state,
- some RegisterHandler calls in the constructor,
- a new abstract class named Aggregate, which our UserAggregate model now inherits from and which provides a couple of methods, and
- an EventWrapper<T> with an EventBase constraint.
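The split between Apply, When, and Load is the part that is easiest to get wrong, so here is a stripped-down sketch of just that mechanism. It uses a hypothetical Counter aggregate with a single Incremented event rather than the UserAggregate above, and it skips the handler registry entirely.

```csharp
using System;
using System.Collections.Generic;

var counter = new Counter();
counter.Load(new CounterEvent[] { new Incremented(2), new Incremented(3) }); // stored history
counter.Increment(5);                                                         // new behaviour

Console.WriteLine(counter.Value);         // prints 10
Console.WriteLine(counter.Changes.Count); // prints 1: only the new event is uncommitted

abstract record CounterEvent;
record Incremented(int By) : CounterEvent;

class Counter
{
    private readonly List<CounterEvent> _changes = new();
    public int Value { get; private set; }
    public IReadOnlyList<CounterEvent> Changes => _changes;

    // Command: validate first, then record the decision as an event.
    public void Increment(int by)
    {
        if (by <= 0) throw new ArgumentOutOfRangeException(nameof(by));
        Apply(new Incremented(by));
    }

    // New event: mutate state AND remember the event for the next save.
    private void Apply(CounterEvent e)
    {
        When(e);
        _changes.Add(e);
    }

    // Replay: mutate state only, because these events are already stored.
    public void Load(IEnumerable<CounterEvent> history)
    {
        foreach (var e in history) When(e);
    }

    // The only place where state actually changes.
    private void When(CounterEvent e)
    {
        if (e is Incremented i) Value += i.By;
    }
}
```

Keeping all state changes inside When means the same code path runs for both new and replayed events, so the rehydrated state cannot drift from the state the commands produced.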
It might look like some unnecessary abstraction, since the first example and the second one essentially work the same at the moment, but this is where the Repository and the EventStore come in.
    IEventStore eventStore = new InMemoryEventStore();
    var repository = new AggregateRepository<UserAggregate>(eventStore);

    // Create the user first so that the stream exists before it is loaded
    // ("Some User" is just a placeholder name).
    repository.Save(new UserAggregate("some_user", "Some User", "admin_user"));

    var user = repository.Load(UserAggregate.GetStreamId("some_user"));
    user.SetManager("some_manager", "Some Manager", "admin_user");
    repository.Save(user);

    class AggregateRepository<T> where T : Aggregate, new() {
        private readonly IEventStore _eventStore;

        public AggregateRepository(IEventStore eventStore) {
            _eventStore = eventStore;
        }

        // Rehydrates an aggregate by replaying its stream.
        public T Load(string streamId) {
            var aggregateRoot = new T();
            aggregateRoot.Load(_eventStore.LoadStreamEvents(streamId));
            return aggregateRoot;
        }

        // Appends the uncommitted changes, then clears them once the append has succeeded.
        // Requires Aggregate.Version to have a setter that the repository can access.
        public void Save(T aggregateRoot) {
            aggregateRoot.Version = _eventStore.AppendToStream(aggregateRoot.StreamId, aggregateRoot.Changes, aggregateRoot.Version);
            aggregateRoot.ClearChanges();
        }
    }

    interface IEventStore {
        IEnumerable<EventWrapper<EventBase>> LoadStreamEvents(string streamId);
        long AppendToStream(string streamId, EventBase[] events, long expectedVersion);
    }

    class InMemoryEventStore : IEventStore {
        private readonly Dictionary<string, List<EventWrapper<EventBase>>> _eventStore = new();

        public IEnumerable<EventWrapper<EventBase>> LoadStreamEvents(string streamId) {
            if (_eventStore.TryGetValue(streamId, out var wrappers)) {
                return wrappers;
            }
            return Enumerable.Empty<EventWrapper<EventBase>>();
        }

        public long AppendToStream(string streamId, EventBase[] events, long expectedVersion) {
            var newVersion = expectedVersion;
            if (!_eventStore.TryGetValue(streamId, out var wrappers)) {
                wrappers = new List<EventWrapper<EventBase>>();
                _eventStore[streamId] = wrappers;
            }
            // Wrap each event with its stream metadata and the next version number.
            foreach (var e in events) {
                wrappers.Add(new EventWrapper<EventBase> {
                    StreamId = streamId,
                    EventType = e.GetType().Name,
                    Payload = e,
                    Version = ++newVersion,
                    Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
                });
            }
            return newVersion;
        }
    }
Whew! That's quite a bit of code there! But this is already Event Sourcing, in full, within a single bounded context!
In essence, all the Repository does is act as an intermediary between the Aggregate and the EventStore: it rehydrates the aggregate's state from a list of events, passes uncommitted changes to the EventStore, and clears them once the append succeeds. The EventStore is responsible for data persistence operations and for returning the latest version of an Aggregate's stream.
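The expectedVersion argument is passed to the store but the in-memory store never compares it with anything. One common use for such a value is optimistic concurrency: reject an append when the stream has moved on since the aggregate was loaded. The sketch below is my own assumption of how that check could look, not something the code above does; VersionedStore is a hypothetical class and events are reduced to plain strings to keep it short.

```csharp
using System;
using System.Collections.Generic;

var store = new VersionedStore();
var v1 = store.Append("user:some_user", new[] { "UserCreated" }, expectedVersion: 0);

// Two writers both loaded the stream at version 1.
store.Append("user:some_user", new[] { "UserManagerChanged" }, expectedVersion: v1);
try
{
    store.Append("user:some_user", new[] { "UserAddressAdded" }, expectedVersion: v1);
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message); // the second writer must reload and retry
}

class VersionedStore
{
    // Events are plain strings here to keep the sketch short.
    private readonly Dictionary<string, List<string>> _streams = new();

    public long Append(string streamId, IReadOnlyCollection<string> events, long expectedVersion)
    {
        if (!_streams.TryGetValue(streamId, out var stream))
        {
            stream = new List<string>();
            _streams[streamId] = stream;
        }

        // The stream's version is simply the number of events stored so far.
        if (stream.Count != expectedVersion)
            throw new InvalidOperationException(
                $"Expected version {expectedVersion} but stream is at {stream.Count}");

        stream.AddRange(events);
        return stream.Count;
    }
}
```

A real store would need to make the check and the append atomic (for example with a lock or a unique constraint on stream ID plus version); this sketch is single-threaded only.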
The Aggregate abstract class is just a helper that lets the Repository track changes and keeps your actual UserAggregate as tech-agnostic as possible.
Although all of this happens in memory, you can create your own implementation of IEventStore backed by actual storage, but you will probably have some degree of trouble with JSON deserialization. I will be covering that in the next post in this series.
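To give a feel for where the trouble comes from: once an event is stored as JSON, the concrete type is gone, so something has to map the stored event type name back to a CLR type. Here is one possible sketch using System.Text.Json, under the assumption that the type name is stored alongside the payload as in EventWrapper. The EventJson helper, its KnownTypes registry, and the StoredEvent records are hypothetical names for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

var stored = EventJson.Serialize(new NameSet("Some New Name"));
var restored = EventJson.Deserialize(stored.EventType, stored.Json);
Console.WriteLine(restored is NameSet); // prints True

abstract record StoredEvent;
record Created(string Name) : StoredEvent;
record NameSet(string NewName) : StoredEvent;

static class EventJson
{
    // Hypothetical registry: maps the stored event type name to a CLR type.
    private static readonly Dictionary<string, Type> KnownTypes = new()
    {
        [nameof(Created)] = typeof(Created),
        [nameof(NameSet)] = typeof(NameSet),
    };

    // Serializes using the runtime type so that the derived properties are written.
    public static (string EventType, string Json) Serialize(StoredEvent e) =>
        (e.GetType().Name, JsonSerializer.Serialize(e, e.GetType()));

    public static StoredEvent Deserialize(string eventType, string json)
    {
        if (!KnownTypes.TryGetValue(eventType, out var type))
            throw new InvalidOperationException($"Unknown event type '{eventType}'");

        return (StoredEvent)JsonSerializer.Deserialize(json, type)!;
    }
}
```

An explicit registry is only one way to do this; it has the side effect of making renamed or removed event types fail loudly instead of being silently skipped.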
In Part 3, I will be adding CQRS, projections and projectors, and some general cross-domain event processing to the mix. I will also be talking about how all this information led to me creating and open-sourcing Uneventful, my Event Sourcing library for dotnet.
- Previous: My Journey to Event Sourcing (Part 1)