arvil's blog

My Journey to Event Sourcing (Part 2)

In Part 1, I talked about my lead up to learning Event-Driven Microservices, the Event-Driven Architecture, CQRS, some common event-processing issues I encountered, and some solutions to solve them.

Now let's finally talk about Event Sourcing.

Remember when I said towards to the end of part 1 that, by chance, I saw a video about a certain Greg Young talking about Event Sourcing?

This is the video I was talking about: Greg Young — A Decade of DDD, CQRS, Event Sourcing.

In that video, he talked about how Event Sourcing isn't even a new concept while showing Sumerian clay tablets that tracks finances. Also, as you probably have guessed, he also talked about Domain Driven Design (DDD), and CQRS (Command Query Responsibility Segregation). He also talked about that Event Sourcing is NOT a top-level architecture.

In part 1, I talked about how I haven't heard about Event Sourcing before, and here's something that I've also haven't seen before: Domain Driven Design. I'll touch up on that later in great detail.

So what is Event Sourcing, really?

Every developer has used an event sourced software before through git

Greg Young (non-verbatim) from Polyglot Data • Greg Young • GOTO 2014

Event Sourcing is a concept where you deduce an entity's state based on its history instead of just mutating the state directly.

So what's so special about this? To properly appreciate the concept, let's talk about how traditional systems handle state mutations with a SQL database.

DECLARE @Id INT = 1;
INSERT Person (Id, Name, CreatedBy, CreatedOn) VALUES (@Id, 'John Doe', 'some_user', GET_DATE());
UPDATE Person SET Name = 'Something Else', LastUpdatedBy = 'some_user', LastUpdatedOn = GET_DATE() WHERE Id = @Id;
UPDATE Person SET Name = 'Some New Name', LastUpdatedBy = 'some_user', LastUpdatedOn = GET_DATE() WHERE Id = @Id;
UPDATE Person SET Name = 'A Random Name', LastUpdatedBy = 'some_user', LastUpdatedOn = GET_DATE() WHERE Id = @Id;

Can you tell me all the name changes that happened for this record?

Some might say: "Nobody needs that." Maybe, but what if you suddenly do? What if your client asks you that question?

Some might then say: "We have an audit log for ALL changes". Great! How comprehensive is that?

How would your audit log look for more complex mutations accross multiple entities? How sure are you that you're covering every single mutation?

Well, to be fair, there are temporal databases now that would solve this problem.

For reference, this is how your event store might look like for the same mutations:

{
    "streamId": "person:1",
    "eventType": "PersonCreated",
    "payload": {
        "id": 1,
        "name": "John Doe",
        "by": "some_user",
        "on": 1721005668
    },
    "version": 1
},
{
    "streamId": "person:1",
    "eventType": "PersonNameSet",
    "payload": {
        "id": 1,
        "newName": "Something Else",
        "by": "some_user",
        "on": 1721005669
    },
    "version": 2
},
{
    "streamId": "person:1",
    "eventType": "PersonNameSet",
    "payload": {
        "id": 1,
        "newName": "Some New Name",
        "by": "some_user",
        "on": 1721005670
    },
    "version": 3
},
{
    "streamId": "person:1",
    "eventType": "PersonNameSet",
    "payload": {
        "id": 1,
        "newName": "A Random Name",
        "by": "some_user",
        "on": 1721005671
    },
    "version": 4
}

Now, to get to the current state, this is something that you might do:

var events = LoadEvents("person:1");
Person person = new Person();
foreach (var @event in events) {
    switch (@event.EventType) {
        case "PersonCreated":
            person.Id = @event.Payload.Id;
            person.Name = @event.Payload.Name;
            break;
        case "PersonNameSet":
            person.Name = @event.Payload.NewName;
            break;
    }
}

At it's core, that is where Event Sourcing starts and ends. Instead of mutating data, you append events to an event store grouped together by a partition key that is usually called Stream ID in the Event Sourcing community.

Besides having a highly-auditable list of everything that happened to your system, you get the following non-comprehensive benefits to boot:

  1. Absolutely no data is ever lost,
  2. Ability to replay the history for data analytics, or for whatever reason,
  3. Append-only transactions lets you ingest data very efficiently,
  4. Easier to debug following the flow of data,
  5. You can answer questions even before it's asked.

Now this is when it starts to look a little more stream-lined: I started digging into Domain Driven Design, as well.

Domain Driven Design

What the difference between a "Software Developer" and a "Software Architect"?

My younger self would probably just say "Years of experience" or "A software architect knows software design patterns."

While that's true to an extent — my opinion on this matter changed quite a bit. While a typical Software Architect knows design patterns, and is probably more technically adept that most other developers in their organization, they should know the Whats, the Hows, and more importantly: the Whys.

A software architect should be more involved in the software design with full-grasp of the business domain. They should think of What they need to build, why the business requires it even before they tackle how they're going to build the software for it.

What I'm trying to say is that I'm always focused on how to do things upfront instead of properly understanding the business needs first.

Domain-Driven Design (DDD), as its name suggests, shifts the focus into the domain itself and building software around it. This is a huge topic that I probably couldn't cover in one post, so I'll refer you to the Domain-Driven Design: Tackling Complexity in the Heart of Software by Eric Evans, or the Big Blue Book as people call it.

Domain-Driven Design: Tackling Complexity in the Heart of Software
Domain-Driven Design: Tackling Complexity in the Heart of Software by Eric Evans

Before we get into the weeds of DDD, let's to define a few terms first:

Here is a, hopefully, good representation of some of the concepts mentioned above:

class UserEntity {
    public required string Username { get; init; }
    public required string Name { get; set; }
}

record AddressValueObject(string Address, string City, string State, string ZipCode, string Country);

class UserAggregate {
    
    // Aggregate Root (I usually flatten this to the Aggregate)
    public UserEntity Root { get; init; }

    // a non-root entity
    public UserEntity? Manager { get; private set; }

    // an array of value objects
    public AddressValueObject[] Addresses { get; private set; } = [];

    // Setting the user as the Aggregate Root
    public UserAggregate(UserEntity user) {
        Root = user;
    }

    // Invariants
    public void SetManager(UserEntity manager) {
        Manager = manager;
    }

    public void AddAddress(AddressValueObject address) {
        Addresses = [..Addresses, address];
    }
}

What does it have to do with Event Sourcing?

While I've heard Greg Young said that you can do Event Sourcing without DDD and DDD without Event Sourcing, he also said that they both work well together - and I whole-heartedly agree.

Let's turn our previous code example into an Event Sourced model.

record UserEntity(string Username, string Name) {
    public static UserEntity Empty = new UserEntity(string.Empty, string.Empty);
}

record Address(string Address, string City, string State, string ZipCode, string Country) {
    public bool IsValid() => 
        !string.IsNullOrWhiteSpace(Address) &&
        !string.IsNullOrWhiteSpace(City) &&
        !string.IsNullOrWhiteSpace(State) &&
        !string.IsNullOrWhiteSpace(ZipCode) &&
        !string.IsNullOrWhiteSpace(Country);
}

class UserAggregate : Aggregate {

    public static GetStreamId(string username) => $"user:{username}";
    public string StreamId => GetStreamId(Username);

    public UserEntity User { get; private set; } = UserEntity.Empty;

    public UserEntity? Manager { get; private set; }

    public Address[] Addresses { get; private set; } = [];

    public UserAggregate() {
        RegisterHandler<UserCreated>(When);
        RegisterHandler<UserManagerChanged>(When);
        RegisterHandler<UserAddressAdded>(When);
    }

    public UserAggregate(string username, string name, string by) : this() {
        if (User != UserEntity.Empty) throw new InvalidOperationException("Cannot create user");
        Apply(new UserCreated(username, name, by, DateTimeOffset.ToUnixTimeMilliseconds()));
    }

    public void SetManager(string managerUsername, string managerName, string by) {
        if (this == UserEntity.EMPTY) throw new InvalidOperationException("User not found");
        if (string.IsNullOrWhitespace(username)) throw new InvalidOperationException("Invalid manager");
        if (Manager != null && Manager.Username == username) return; // noop

        Apply(new UserManagerChanged(Username, managerUsername, managerName, by, DateTimeOffset.ToUnixTimeMilliseconds()));
    }

    public void AddAddress(Address address, string by) {
        if (this == UserEntity.EMPTY) throw new InvalidOperationException("User not found");
        if (!address.IsValid) throw new InvalidOperationException("Address is invalid");
        if (Addresses.Contains(address)) return; // noop

        Apply(new UserAddressAdded(Username, address, by, DateTimeOffset.ToUnixTimeMilliseconds()));
    }

    private void When(UserCreated @event) {
        _user = new UserEntity(@event.Username, @event.Name);

        _changes.Add(@event);
    }

    private void When(UserManagerChanged @event) {
        Manager = new UserEntity(@event.ManagerUsername, @event.ManagerName);

        _changes.Add(@event);
    }

    private void When(UserAddressAdded @event) {
        Addresses = [..Addresses, @event.Address];

        _changes.Add(@event);
    }
}

record EventBase();
record UserCreated(string Username, string Name, string By, long Timestamp) : EventBase;
record UserManagerChanged(string Username, string ManagerUsername, string ManagerName, string By, long Timestamp) : EventBase;
record UserAddressAdded(string Username, Address Address, string By, long Timestamp) : EventBase;

abstract class Aggregate {
    public abstract string StreamId { get; }
    private readonly List<EventBase> _changes { get; private set; } = new List<EventBase>();
    public long Version { get; private set; } = 0;
    private readonly Dictionary<string, Action<EventBase>> _eventHandlers = new();

    public void RegisterEventHandler<T>(Action(T payload) handler) where T : EventBase {
        var type = typeof(T);
        _eventHandlers.Add(type.Name, (@event) => handler((T)@event));
    }

    protected virtual void Apply(EventBase @event) {
        if (_eventHandlers.TryGetValue(@event.GetType().Name, out Action<EventBase> handler)) {
            handler.Invoke(@event);
            _changes.Add(@event);
        }
    }

    public void Load(IEnumerable<EventWrapper<EventBase>> wrappers) {
        foreach (var wrapper in wrappers) {
            if (!_eventHandlers.TryGetValue(wrapper.EventType, out Action<EventBase> handler)) continue;

            handler.Invoke(wrapper.Payload);
            Version = wrapper.Version;
        }
    }

    public EventBase[] Changes => _changes.ToArray();
    public void ClearChanges() => _changes.Clear();
}

class EventWrapper<T> where T: EventBase {
    public string Id => $"{StreamId}:{Version}";
    public required string StreamId { get; set; }
    public required string EventType { get; set; }
    public required T Payload { get; set; }
    public required string Version { get; set; }
    public required long Timestamp { get; set; }
}

The changes are fairly obvious, on top of some validations, here are the things that are added:

  1. A couple of new records, namely EventBase, UserCreated, UserManagerChanged, and UserAddressAdded,
  2. the invariants now never apply mutations, but instead calls an Apply method,
  3. a couple of When methods that mutates the state,
  4. some RegisterHandler on the constructor, and,
  5. new abstract class named Aggregate that our UserAggregate model now inherits that has a couple of methods,
  6. an EventWrapper<T> with an EventBase filter.

It might look like some unnecessary abstraction as the first example and the second one essentially works the same at the moment, but this is where the Repository and the EventStore comes in.

IEventStore eventStore = new InMemoryEventStore();
var repository = new AggregateRepository<UserAggregate>(eventStore);
var user = repository.Load(UserAggregateRoot.GetStreamId("some_user"));
user.SetManager("some_manager", "Some Manager", "admin_user");
repository.Save(user);

class AggregateRepository<T> where T: Aggregate, new() {
    private readonly IEventStore _eventStore;

    AggregateRepository(IEventStore eventStore) {
        _eventStore = eventStore;
    }

    public T Load(string streamId) {
        var aggregateRoot = new T();
        aggregateRoot.Load(_eventStore.LoadStreamEvents(streamId));

        return aggregateRoot;
    }

    void Save(T aggregateRoot) {
       aggregateRoot.Version = _eventStore.AppendToStream(aggregateRoot.StreamId, aggregateRoot.Changes, aggregateRoot.Version);
       aggregateRoot.ClearChanges();
    }
}

interface IEventStore {
    IEnumerable<EventWrapper> LoadStreamEvents(string streamId);
    long AppendToStream(string streamId, EventBase[] events, long expectedVersion);
}

class InMemoryEventStore : IEventStore {
    private readonly Dictionary<string, List<EventWrapper>> _eventStore = new();

    public IEnumerable<EventWrapper> LoadStreamEvents(string streamId) {
        if (_eventStore.TryGetValue(streamId, out List<EventWrapper> wrappers)) {
            return wrappers;
        }

        return Enumerable.Empty<EventWrapper<EventBase>>();
    }

    public long AppendToStream(string streamId, EventBase[] eventBase, long expectedVersion) {

        var newVersion = expectedVersion;

        if (!_eventStore.TryGetValue(streamId, out List<EventWrapper> wrappers)) {
            wrappers = new List<EventWrapper>();
            _eventStore[streamId] = wrappers;
        } 

        wrappers.AddRange(eventBase.Select(e => new EventWrapper {
            StreamId = streamId,
            EventType = e.GetType().Name,
            Payload = e,
            Version = ++newVersion,
            Timestamp = DateTimeOffset.ToUnixTimeMilliseconds()
        }));

        return newVersion;
    }
}

Whew! That's quite a bit of code there! But, this is already event sourcing in single bounded context in full!

In essence, everything that the Repository is doing is acting like an intermediary between the Aggregate and the EventStore — rehydrating its state from a list of events, and passing uncommited changes to the EventStore and clearing it up if it succeeds. The EventStore is responsible for data persistence operations and retruning the last version of an Aggregate's stream.

The Aggregate abstract class is just some helper class to help the Repository track the changes and keep your actual UserAggregate as tech-agnostic as much as possible.

Albeit all happening in memory, you can just create your own implementation of the IEventStore but would probably have some degree of trouble with JSON deserialization. I will be covering that on the next post in this series.

In part 3, I will be adding CQRS, projections and projectors, and some general cross-domain event processing to the mix. I will also be talking about how all these information led to me creating and open-sourcing Uneventful, my Event Sourcing library for dotnet.

15 July 2024 | Tags: posts; event sourcing; DDD; c#