arvil's blog

My Journey to Event Sourcing (Part 1)

Event Sourcing. These 2 words, together, is something that I haven't heard until 2019.

History with context is important

Quite an apt heading, if you catch my drift

Prior to 2019, I already have a significant amount of professional experience under my belt. I've climbed through the corporate ranks from being a junior developer, to a mid-level developer, to a senior developer, to being one of the lead developers on my team - and yet, I haven't heard anything about this "Event Sourcing" before.

To be fair, when you stay with companies for enough time, you'd get siloed, and you'll be living in a bubble. Companies tend to have their own standards and practices, and with the constant drumming of new functionalities knocking on your doorstep day-in and day-out, you'll have little time to look out into the abyss for too long.

Mind you, I did my own research and try to innovate quite often. I was one of our company's early adopter to Angular2 when we were using AngularJS for years. I've learned about ANTLR4 from a colleague that authored a C# Eval GitHub repository and found a use-case that would make our product better. I could write a couple more things that I did to justify that I was an innovator, but the truth is that my view is quite limited.

See, these were the times when we were still building monoliths that are hosted on-prem while we read headlines about how micro-services are streamlining development teams' workflows while both increasing overall system resilience and performance and driving down costs, if you know what you're doing, that is.

As much as I'd like to dig deep into this, being a lead developer, I haven't quite reached a high-enough corporate position to actually drive this massive change. Although, things are going to change eventually...

Author Company Changed Life Event

I eventually moved to a different company. I was hired with a single directive: to create a full-blown system that can handle high-traffic scenarios with massive data ingress and real-time reporting to be hosted on the cloud.

The first few months was full of data-gathering while creating web app mockups with Angular, creating mobile apps for Point-of-Sale devices and for reporting with Flutter.

I've written some back-end code that closely resembled the ones that I've built before: a monolith. I wasn't particularly excited about it, but hey, I needed to single-handedly create a huge system and fast!

Unfortunately, load-testing my backend really opened my eyes to the limitations of monoliths - while I can definitely scale up my Azure SQL instance and my App Services, it also increased the costs. By quite a lot, I dare say.

Then the COVID-19 pandemic hit. Everywhere was in lock-down and the world stood still.

While everybody is, understandably, panicking, I took this world-halting event as an opportunity to do research on the very thing that (1) I always wanted to learn, and (2) could help me achieve that mythical fast and highly available system that I am required to conjure.

Event-Driven Shenanigans

One month in: Learning about Command Query Responsibility Segregation (CQRS) and Event-Driven architecture (EDA) has opened a myriad of other options for me. I can now create microservices with event-driven state-transfer and I have created a Proof-of-Concept that was fast and works well... most times.

The problem is that I was doing multiple commits in one transaction. Commit the record in my SQL Database then Publish an event.

// something like
try {
    await context.Record.AddAsync(theNewEntry);
    await context.SaveChangesAsync();

    await eventPublisher.PublishAsync(theNewEntry);

    return Results.Ok(theNewEntry.Id);
} catch (Exception ex) {
    return Results.Problem(ex.Message);
}

With that, sure enough no data on the producer's side will be lost, but a failure to publish the event through some transient errors will leave consumers expecting to be updated will not be notified that the state has changed.

I've scoured the internet for potential solutions and found this approach:

await using var transaction = await context.Database.BeginTransactionAsync(cancellationToken);
try {
    await context.Record.AddAsync(theNewEntry);
    await context.SaveChangesAsync();

    await eventPublisher.PublishAsync(theNewEntry);

    await transaction.CommitAsync();

    return Results.Ok(theNewEntry.Id);
} catch (Exception ex) {
    await transaction.RollbackAsync();
    return Results.Problem(ex.Message);
}

See, the problem here is worse: database commits CAN and WILL fail and with that approach, when it fails a new state saying otherwise will be published and consumers will update their copies with a bad state.

Publish Event, Update Eventually

Countless YouTube videos, blogs, posts, and Stack Overflow visits after, I've chanced upon the Event-First approach.

I've mentioned that my issue stems from the fact that I was doing multiple commits in one transaction, so the solution was fairly evident - just make one commit. Specifically: just publish the event first.

The Event-first approach is self-explanatory. By doing CQRS and EDA, you're pretty much eventually consistent anyway, so why not go all in?

Some code should explain it better:

// domain endpoint
app.MapPost("/", 
    async (
        EventPublisher eventPublisher, 
        NewRecordRequest request
    ) => {
        var newEntry = new Record {
            Name = request.Name,
            SomeValue = request.SomeValue
        };

        try {
            await eventPublisher.PublishAsync(newEntry)

            return Results.Ok();
        } catch (Exception ex) {
            return Results.Problem(ex.Message);
        }
    }
);

// domain event processor on Azure Functions that assumes that only events 
// from type Record are being published (for demo purposes only)
[FixedDelayRetry(-1, "0:00:05")] // we'll talk about this later
[Function("YourEventProcessor")]
public async Task([EventHubListener(/** event hub parameters **/)] EventData[] events) {

    // simplified for brevity - it's not that simple
    foreach (Record entry in events.Select(e => /** deserialize **/)) {
        await using var context = new SomeContext(_dbContextOptions); // for ef folks, we'll talk about this, too

        if (entry.Id is default) {
            await context.Record.AddAsync(entry);
        } else {
            context.Record.Update(entry);
        }
        await context.SaveChangesAsync();
    }
}

"Looks great and works great!" Some might think. But not me.

While my load testing shows even faster data ingress, the pessimist in me says that problems might arise from this approach.

Here's a non-comprehensive list of my concerns:

  1. I can't read my write. The endpoint taking in the requests will have no idea what the new record's primary key is and couldn't return anything to the client.

    The workaround here is to not rely on your datastore (SQL Server) for this instance to generate the primary key for you.

    Like so:

    app.MapPost("/", 
        async (
            EventPublisher eventPublisher, 
            NewRecordRequest request
        ) => {
            var newEntry = new Record {
                Id = Guid.NewGuid(), // or request.Id if applicable
                Name = request.Name,
                SomeValue = request.SomeValue
            };
    
            try {
                await eventPublisher.PublishAsync(newEntry)
    
                return Results.Ok(newEntry.Id);
            } catch (Exception ex) {
                return Results.Problem(ex.Message);
            }
        }
    );
    
  2. Idempotency. EDA comes with eventual consistency and, usually, at-least-once delivery guarantees.

    At-least-once delivery guarantees are great to ensure that events are all processed unless ACKed. This means that unless the processing succeeds, the consumer will continuously try to redeliver the event until it succeeds OR it reaches the retry limit*, but there's an inherent problem with that.

    See, event consumers usually process and acknowledge events by batch. Consider the following logs:

    Attempt #1:
    6aad23ee-4a35-4a1b-b280-a8a312f84363 succeeded
    4af46112-30d1-4336-8a6a-75c89d5151c0 failed
    
    Attempt #2:
    6aad23ee-4a35-4a1b-b280-a8a312f84363 succeeded
    4af46112-30d1-4336-8a6a-75c89d5151c0 succeeded
    ACK batch 
    

    See the potential problem there? Still, no?

    Now, let's switch our domain model into something a little different.

    class UserOrderCount {
        public required Guid { get; init; }
        public required int OrderCount { get; set; }
    }

    And now, let's make our logs more verbose:

    Attempt #1:
    6aad23ee-4a35-4a1b-b280-a8a312f84363 processing 
    6aad23ee-4a35-4a1b-b280-a8a312f84363 OrderCount incremented by 1 (current count: 1)
    6aad23ee-4a35-4a1b-b280-a8a312f84363 succeeded
    4af46112-30d1-4336-8a6a-75c89d5151c0 processing
    4af46112-30d1-4336-8a6a-75c89d5151c0 failed
    
    Attempt #2:
    6aad23ee-4a35-4a1b-b280-a8a312f84363 processing 
    6aad23ee-4a35-4a1b-b280-a8a312f84363 OrderCount incremented by 1 (current count: 2)
    6aad23ee-4a35-4a1b-b280-a8a312f84363 succeeded
    4af46112-30d1-4336-8a6a-75c89d5151c0 processing
    4af46112-30d1-4336-8a6a-75c89d5151c0 OrderCount incremented by 1 (current count: 1)
    4af46112-30d1-4336-8a6a-75c89d5151c0 succeeded
    ACK batch 

    Now do you see the issue? One event caused one record to inadvertently update twice just because something else had failed.

    An idempotent operation produces the same result no matter how many times it's applied.

    Some operations are inherently idempotent, like setting a name for example. Anything that adds, subtracts, multiplies, or divide is not.

    So how do we solve this? There's a couple of ways - tons that have already been covered by multiple people.

    Here's what I did, depending on the scenario:

    1. For event-driven state-transfer for single entities, include a Version or a Timestamp on your events and check against them before doing any mutation.

    2. For data aggregation that usually targets an RDMS like SQL, like reporting and running counts, create a new table, like a RecordsProcessed table, that contains a unique constraint on the primary key AND the version of the records being processed. This should let you open a database transaction, mutate your record, then insert the primary key and the version to the RecordsProcessed table, then commit the transaction. Just make sure to check the table first before doing any processing and skip processing should the record exist.

    * I usually make my consumers retry indefinitely to ensure reliability, debatable I know, but let's discuss this on a different post.

So far, so good?

That information is more than enough to let you create a fully working event-driven system that's scalable and reliable - and back then, I was ready to stop there. But then... I saw a YouTube presentation of a certain Greg Young about Event Sourcing.

Now that's a story that would have to wait for part 2. 😁

13 July 2024 | Tags: posts; event sourcing; DDD; c#