Sunday, January 15, 2012

Event Sourcing, Akka FSMs and functional domain models

I blogged on Event Sourcing and functional domain models earlier. In this post I would like to share more of my thoughts on the same subject and how with a higher level of abstraction you can make your domain aggregate boundary more resilient and decoupled from external references.

When we talk about a domain model, the Aggregate takes center stage. An aggregate is a core abstraction that represents the time invariant part of the domain. It's an embodiment of all states that the aggregate can be in throughout its lifecycle in the system. So it's extremely important that we take every pain to distill the domain model and protect the aggregate from all unwanted external references. Maybe an example will make it clearer.

Keeping the Aggregate pure

Consider a Trade model as the aggregate. By Trade, I mean a security trade that takes place in the stock exchange where counterparties exchange securities and currencies for settlement. If you're a regular reader of my blog, you must be aware of this, since this is almost exclusively the domain that I talk of in my blog posts.

A trade can be in various states like newly entered, value date added, enriched with tax and fee information, net trade value computed etc. In a trading application, as a trade passes through the processing pipeline, it moves from one state to another. The final state represents the complete Trade object which is ready to be settled between the counterparties.

In the traditional model of processing we have the final snapshot of the aggregate - what we don't have is the audit log of the actual state transitions that happened in response to the events. With event sourcing we record the state transitions as a pipeline of events which can be replayed at any time to roll back or roll forward to any state of our choice. Event sourcing is emerging as one of the potent ways to model a system, and lots of blog posts are being written to discuss the various architectural strategies for implementing an event sourced application.
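Replaying a log can be pictured as a left fold. The following is only a minimal sketch under my own assumptions: the simplified `Trade` fields and the event names `ValueDateSet` and `TaxFeesComputed` are hypothetical and do not come from the actual application.

```scala
object ReplaySketch {
  // Hypothetical, simplified aggregate: these fields are assumptions for illustration
  final case class Trade(refNo: String,
                         principal: BigDecimal,
                         valueDate: Option[String] = None,
                         taxFees: BigDecimal = BigDecimal(0))

  // Hypothetical events as they might be recorded in the event log
  sealed trait TradeEvent
  final case class ValueDateSet(date: String) extends TradeEvent
  final case class TaxFeesComputed(amount: BigDecimal) extends TradeEvent

  // Pure transition: one event takes the trade to its next state
  def applyEvent(trade: Trade, event: TradeEvent): Trade = event match {
    case ValueDateSet(d)    => trade.copy(valueDate = Some(d))
    case TaxFeesComputed(a) => trade.copy(taxFees = trade.taxFees + a)
  }

  // Replaying is a left fold of the log over the initial state
  def replay(initial: Trade, log: Seq[TradeEvent]): Trade =
    log.foldLeft(initial)(applyEvent)
}
```

Folding the whole log gives the latest state, while folding only a prefix such as `log.take(1)` reconstructs an earlier one, which is all that rolling back amounts to in this sketch.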

That's ok. But whose responsibility is it to manage these state transitions and record the timeline of changes? It's definitely not the responsibility of the aggregate. The aggregate is supposed to be a pure abstraction. We must design it as an immutable object that can respond to events and transform itself into the new state. In fact the aggregate implementation should not be aware of whether it's serving an event sourced architecture or not.

There are various ways you can model the states of an aggregate. One option that's frequently used involves algebraic data types. Model the various states as a sum type of products. In Scala we do this as case classes ..

sealed abstract class Trade {
  def account: Account
  def instrument: Instrument
  //..
}

case class NewTrade(..) extends Trade {
  //..
}

case class EnrichedTrade(..) extends Trade {
  //..
}
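To make the elided parts concrete, here is a minimal sketch of the sum-of-products encoding. The `Account` and `Instrument` stand-ins, the `principal` and `taxFees` fields and the `enrich` and `describe` helpers are all assumptions of mine, chosen only to illustrate the idea.

```scala
object TradeStatesSketch {
  // Hypothetical stand-ins for the Account and Instrument types
  final case class Account(no: String)
  final case class Instrument(isin: String)

  sealed abstract class Trade {
    def account: Account
    def instrument: Instrument
  }

  // Each state is a product type carrying exactly the data valid in that state
  final case class NewTrade(account: Account,
                            instrument: Instrument,
                            principal: BigDecimal) extends Trade

  final case class EnrichedTrade(account: Account,
                                 instrument: Instrument,
                                 principal: BigDecimal,
                                 taxFees: BigDecimal) extends Trade {
    def netAmount: BigDecimal = principal + taxFees
  }

  // The signature documents the legal transition: only a NewTrade can be enriched
  def enrich(t: NewTrade, taxFees: BigDecimal): EnrichedTrade =
    EnrichedTrade(t.account, t.instrument, t.principal, taxFees)

  // Trade is sealed, so the compiler can check this match for exhaustiveness
  def describe(t: Trade): String = t match {
    case _: NewTrade      => "new"
    case e: EnrichedTrade => s"enriched, net amount ${e.netAmount}"
  }
}
```

With this encoding an illegal transition, such as enriching an already enriched trade, simply does not type check.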

Another option may be to have one data type to model the Trade and model states as immutable enumerations, with changes being effected on the aggregate as functional updates. There is no in-place mutation; instead we use functional data structures like zippers or type lenses to create the transformed object in the new state. Here's an example where we create an enriched trade out of a newly created one ..

// closure that enriches a trade
val enrichTrade: Trade => Trade = {trade =>
  val taxes = for {
    taxFeeIds      <- forTrade // get the tax/fee ids for a trade
    taxFeeValues   <- taxFeeCalculate // calculate tax fee values
  }
  yield(taxFeeIds ∘ taxFeeValues)
  val t = taxFeeLens.set(trade, taxes(trade))
  netAmountLens.set(t, t.taxFees.map(_.foldl(principal(t))((a, b) => a + b._2)))
}
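If lenses are unfamiliar, the following minimal sketch shows the mechanics with a hand-rolled `Lens` instead of the scalaz one used above. The simplified `Trade` and the `enrich` helper are hypothetical, and the tax/fee values are passed in directly rather than computed.

```scala
object LensSketch {
  // A minimal hand-rolled lens: a getter plus a setter that returns an updated copy
  final case class Lens[A, B](get: A => B, set: (A, B) => A) {
    // Apply a function to the focused field, leaving the original value untouched
    def mod(a: A, f: B => B): A = set(a, f(get(a)))
  }

  type TaxFees = List[(String, BigDecimal)]

  // Hypothetical simplified Trade: the fields are assumptions for illustration
  final case class Trade(principal: BigDecimal,
                         taxFees: Option[TaxFees] = None,
                         netAmount: Option[BigDecimal] = None)

  val taxFeeLens: Lens[Trade, Option[TaxFees]] =
    Lens[Trade, Option[TaxFees]](_.taxFees, (t, tf) => t.copy(taxFees = tf))

  val netAmountLens: Lens[Trade, Option[BigDecimal]] =
    Lens[Trade, Option[BigDecimal]](_.netAmount, (t, n) => t.copy(netAmount = n))

  // Set the tax/fee values, then derive the net amount as principal plus all tax/fee amounts
  def enrich(trade: Trade, taxes: TaxFees): Trade = {
    val t = taxFeeLens.set(trade, Some(taxes))
    netAmountLens.set(t, t.taxFees.map(_.foldLeft(t.principal)((a, b) => a + b._2)))
  }
}
```

Every `set` returns a new `Trade`, so the trade passed in is still available unchanged after enrichment.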

But then we come back to the same question - if the aggregate is distilled to model the core domain, who handles the events? Someone needs to model the event changes, effect the state transitions and take the aggregate from one state to the next.

Enter Finite State Machines

In one of my projects I used the domain service layer to do this. The domain logic for effecting the changes lies with the aggregate, but it is invoked from the domain service in response to events when the aggregate reaches specific states. In other words I model the domain service as a finite state machine that manages the lifecycle of the aggregate.

In our example a Trading Service can be modeled as an FSM that controls the lifecycle of a Trade. Something like the following ..

import TradeModel._

class TradeLifecycle(trade: Trade, timeout: Duration, log: Option[EventLog]) 
  extends Actor with FSM[TradeState, Trade] {
  import FSM._

  startWith(Created, trade)

  when(Created) {
    case Event(e@AddValueDate, data) =>
      log.foreach(_.appendAsync(data.refNo, Created, Some(data), e))
      val trd = addValueDate(data)
      notifyListeners(trd) 
      goto(ValueDateAdded) using trd forMax(timeout)
  }

  when(ValueDateAdded) {
    case Event(StateTimeout, _) =>
      stay

    case Event(e@EnrichTrade, data) =>
      log.foreach(_.appendAsync(data.refNo, ValueDateAdded, None, e))
      val trd = enrichTrade(data)
      notifyListeners(trd)
      goto(Enriched) using trd forMax(timeout)
  }

  when(Enriched) {
    case Event(StateTimeout, _) =>
      stay

    case Event(e@SendOutContractNote, data) =>
      log.foreach(_.appendAsync(data.refNo, Enriched, None, e))
      sender ! data
      stop
  }

  initialize
}

The snippet above contains a lot of other details which I did not have time to prune. It's actually part of the implementation of an event sourced trading application that uses asynchronous messaging (actors) as the backbone for event logging and reaching out to multiple consumers based on the CQRS paradigm.

Note that the FSM model above makes very explicit the states that the Trade model can reach and the events that it handles while in each of these states. We can also use this FSM technique to log events (for event sourcing) and notify listeners about the events (CQRS) in a declarative manner, as implemented above.
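To see the shape of the idea without the Akka machinery, here is a minimal plain-Scala sketch of an explicit transition table. It is not the actor-based implementation shown earlier: the simplified `Trade`, the hard-coded value date, the in-memory log and the `Lifecycle` wrapper are all assumptions made for illustration.

```scala
object LifecycleSketch {
  sealed trait TradeState
  case object Created extends TradeState
  case object ValueDateAdded extends TradeState
  case object Enriched extends TradeState

  sealed trait TradeEvent
  case object AddValueDate extends TradeEvent
  case object EnrichTrade extends TradeEvent

  // Hypothetical simplified aggregate; it knows nothing about states or events
  final case class Trade(refNo: String,
                         valueDate: Option[String] = None,
                         enriched: Boolean = false)

  // Pure domain functions (placeholders for the real domain logic)
  val addValueDate: Trade => Trade = _.copy(valueDate = Some("2012-01-18"))
  val enrichTrade: Trade => Trade = _.copy(enriched = true)

  type Transition = PartialFunction[(TradeState, TradeEvent), (TradeState, Trade => Trade)]

  // The whole lifecycle in one table: (state, event) -> (next state, domain function)
  val transitions: Transition = {
    case (Created, AddValueDate)       => (ValueDateAdded, addValueDate)
    case (ValueDateAdded, EnrichTrade) => (Enriched, enrichTrade)
  }

  final case class Lifecycle(state: TradeState,
                             trade: Trade,
                             log: Vector[(TradeState, TradeEvent)] = Vector.empty) {
    // A handled event is appended to the in-memory log in the same step that
    // applies the transition; an unhandled event leaves everything unchanged
    def handle(event: TradeEvent): Lifecycle =
      transitions.lift((state, event)) match {
        case Some((next, f)) => Lifecycle(next, f(trade), log :+ (state -> event))
        case None            => this
      }
  }
}
```

The `Trade` stays free of any state or event handling, while the table documents every legal transition in one place.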

Let me know in the comments what your views are on this FSM approach to handling state transitions in domain models. I think it helps keep aggregates pure and helps design domain services that focus on serving specific aggregate roots.

I will be talking about similar stuff, Akka actor based event sourcing implementations and functional domain models, at PhillyETE 2012. Please drop by if this interests you.

11 comments:

Channing Walton said...

Hi Debasish,
thanks for this article, more food for thought.

I recently modelled state transitions for Trades as follows. An (immutable) Trade has a state as a field, modelled as case classes. State transitions are made by calling a method on Trade: update(event:SomeEvent):Option[Trade] = state.update(this, event)

The state is responsible for transitioning the trade and optionally creating a new trade with a new state. There are cases where there is no transition in response to an event for a number of reasons, hence the Option. In some scenarios there may be a need to return an error (Either or scalaz Validation), but not in my case.

This approach enables each State to encapsulate the knowledge about what the valid transitions to other states are.

I suspect that this design represents an FSM, albeit implicitly. Maybe that's a problem?

Unknown said...

@Channing Walton

You are correct! We end up implementing FSMs most of the time without knowing beforehand.

I am a bit of a fanatic on domain abstractions being *pure*. Why should the *state* field and the *update* method exist on the Trade object? Are they part of the domain in the real world? Hence I took this approach of externalizing the whole FSM outside the Trade abstraction.

But again, designs are never wrong or right .. if it works for you it's great :-) ..

Cheers.

Channing Walton said...

Hi,
> Why should the *state* field and the *update* method exist on the Trade object ?

Well. There lie hours of debate :D My OO brain says push behaviour into objects: the Trade 'should' know how to update itself and what it's interested in within the domain of this app.

I used to be very certain about these things. I am not young enough to be so certain any more ;)

Unknown said...

That's one way to look at it. But consider: if you were to save a Trade object to the database, would you put the save() method in Trade? By your logic you should, because the Trade object should know how to persist itself. This can make the Trade abstraction bloated. Just as it's the responsibility of a DAO to persist a Trade object, I claim that it's the responsibility of a service object to manage its transitions. The Trade object will move through various states as implied by the values of its various fields. And as long as we manage this movement while keeping it immutable, I think we are done.

Channing Walton said...

Yes I see what you are saying. Of course I wouldn't put the save on the Trade. I think I make a distinction between 'pure' domain specific behaviour which lives on the Trade, versus systemic functions like persistence. I realise that is very vague and imprecise.

When you say 'it's the responsibility of a service object to manage it's transitions', did you mean that it's the responsibility of a separate service object to manage the Trade's transitions? Or that it's the Trade's responsibility to manage its own transitions?

Unknown said...

I meant that we should have domain services to manage the lifecycle and state transitions of all domain objects. For database persistence we usually do it through the Repository. In this case, for managing the states of a domain object, we can use the FSM. The point that I was trying to make in the post is this: why not make the FSM explicit? Then we have clear declarative code that documents the events and states very explicitly.

Gary said...

Hi Debasish,

One question with your implementation - in a realistic production system, wouldn't you want to be sure your events were logged prior to processing the state changes and letting clients know that for example their trade has been executed? I feel like you leave yourself open to the small but substantial (in terms of cost) risk of being unable to recreate state if your system goes down and an event had not been logged.

Thanks for the great article.

Unknown said...

Hi Gary -
Are you pointing to the fact that we log events asynchronously and hence leave a small window in which the logging may fail and yet I send notifications to my clients?

Yes, that's a problem with the current implementation, but I wanted to keep things simple to focus on the paradigm itself instead of the implementation details.

However, even in a production deployment we would not go for synchronous logging - that would be blocking IO and would kill scalability. There are other ways to get both benefits, using an STM together with a transient log as well as a persistent log. Martin has blogged on these implementation issues in his series on Event Sourcing. Please check http://krasserm.blogspot.in/2012/01/building-event-sourced-web-application.html for more details.

Unknown said...

Thanks Debasish, I have actually been reading both of your blogs and have been experimenting with various approaches. My particular case is for both a trading system and the configuration for it (web app).