Lightbend Activator

Akka Distributed Domain Driven Design with CQRS

Activator will be EOL-ed on May 24, 2017.

We’re making it easier and simpler for developers to get started with Lightbend technologies. This unfortunately means that future releases of Play, Akka and Scala will no longer include Activator support, and Lightbend’s Activator server will be decommissioned by the end of 2017. Here's what to do if:

You Use Activator Templates:
To use the Giter8 templates, please install sbt 0.13.13+ and use the “sbt new” command

You Wrote Activator Templates:
Please migrate your templates to Giter8 using this simple process

You Want New Getting Started Materials:
Please visit the Lightbend Tech Hub

Akka Distributed Domain Driven Design with CQRS

BoldRadius Solutions
Source
March 16, 2015
akka akka-persistence cluster-sharding cluster cqrs event-sourced scala

A starter distributed application with Akka and Spray that demonstrates Command Query Responsibility Segregation using Akka Persistence, Cluster Sharding and a distributed journal for Event Sourcing enabled with Cassandra.

How to get "Akka Distributed Domain Driven Design with CQRS" on your computer

There are several ways to get this template.

Option 1: Choose akka-dddd-cqrs in the Lightbend Activator UI.

Already have Lightbend Activator (get it here)? Launch the UI then search for akka-dddd-cqrs in the list of templates.

Option 2: Download the akka-dddd-cqrs project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for akka-dddd-cqrs.

  1. Download the Template Bundle for "Akka Distributed Domain Driven Design with CQRS"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Lightbend Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue:

    Or from a command line:

     C:\Users\typesafe\akka-dddd-cqrs> activator ui 
    This will start Lightbend Activator and open this template in your browser.

Option 3: Create a akka-dddd-cqrs project from the command line

If you have Lightbend Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME akka-dddd-cqrs on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/boldradius/akka-dddd-template#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial

Background

Distributed Domain Driven Design

CQRS/ES Command Query Responsibility Segregation / Event Sourcing

This is a pattern that uses Command and Query objects to apply the CQS principle for modifying and retrieving data.

Event Sourcing is an architectural pattern in which state is tracked with an immutable event log instead of destructive updates (mutable).

Getting Started

To get this application going, you will need to:

  • Set up the datastore
  • Boot the cluster nodes
  • Boot the Http microservice node

DataStore

This application requires a distributed journal. Storage backends for journals and snapshot stores are pluggable in Akka persistence. In this case we are using Cassandra. You can find other journal plugins here.

The datastore is specified in application.conf

cassandra-journal.contact-points = ["127.0.0.1"]
cassandra-snapshot-store.contact-points = ["127.0.0.1"]
As you can see, the default is localhost. In a cloud deployment, you could add several addresses to a cassandra cluster.

This application uses a simple domain to demonstrate CQRS and event sourcing with Akka Persistence. This domain is an online auction:


final case class Bid(price:Double, buyer:String, timeStamp:Long)

final case class Auction(auctionId:String,
                         startTime:Long,
                         endTime:Long,
                         initialPrice:Double,
                         acceptedBids:List[Bid],
                         refusedBids:List[Bid],
                         ended:Boolean)
        

This is a distributed application, leveraging Akka Cluster.

The Command path of this application is illustrated by the creation of an auction, and placing bids.

The Query path of this application is illustrated by the querying of winning bid and bid history.

In order to distribute and segregate these paths, we leverage Akka Cluster, as well as Cluster Sharding.

Cluster Sharding enables the distribution of the command and query actors across several nodes in the cluster, supporting interaction using their logical identifier, without having to care about their physical location in the cluster.

Cluster Nodes

You must first boot some cluster nodes (as many as you want). Running locally, these are distinguished by port eg:[2551,2552,...].
This cluster must specify one or more seed nodes in application.conf


akka.cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]

auto-down-unreachable-after = 10s
}

The Cluster Nodes are bootstrapped in ClusterNode.scala.

To boot each cluster node locally:


sbt 'runMain com.boldradius.cqrs.ClusterNodeApp nodeIpAddress port'
for example:

sbt 'runMain com.boldradius.cqrs.ClusterNodeApp 127.0.0.1 2551'

Http Microservice Node

The HTTP front end is implemented as a Spray microservice and is bootstrapped in HttpApp.scala.It participates in the Cluster, but as a proxy.

To run the microservice locally:


sbt 'runMain com.boldradius.cqrs.HttpApp httpIpAddress httpPort akkaIpAddres akkaPort'
for example:

sbt 'runMain com.boldradius.cqrs.HttpApp 127.0.0.1 9000 127.0.0.1 0'

The HTTP API enables the user to:

  • Create an Auction
  • Place a did
  • Query for the current winning bid
  • Query for the bid history
Create Auction

 POST http://127.0.0.1:9000/startAuction

 {"auctionId":"123",
 "start":"2015-01-20-16:25",
 "end":"2015-07-20-16:35",
 "initialPrice" : 2,
 "prodId" : "3"}
             
Place Bid

POST http://127.0.0.1:9000/bid

{"auctionId":"123",
"buyer":"dave",
"bidPrice":6}
             
Query for the current winning bid

GET http://localhost:8080/winningBid/123
             
Query for the bid history

http://localhost:8080/bidHistory/123
             

Spray service fowards to the cluster

The trait HttpAuctionServiceRoute.scala implements a route that takes ActorRefs (one for command and query) as input. Upon receiving an Http request, it either sends a command message to the command actor, or a query message to the query actor.

 def route(command: ActorRef, query:ActorRef) = {
     post {
        path("startAuction") {
            extract(_.request) { e =>
                entity(as[StartAuctionDto]) {
                    auction => onComplete(
                        (command ? StartAuctionCmd(auction.auctionId,....

         

Exploring the Command path in the Cluster

The command path is implemented in BidProcessor.scala. This is a PersistentActor that receives commands:


def initial: Receive = {
    case a@StartAuctionCmd(id, start, end, initialPrice, prodId) => ...
}

def takingBids(auctionId: String, startTime: Long, closeTime: Long): Receive = {
            case a@PlaceBidCmd(id, buyer, bidPrice) => ...
}
and produces events, writing them to the event journal, and notifying the Query Path of the updated journal:

val event = AuctionStartedEvt(id, start, end, initialPrice, prodId)   // the event to be persisted
persistAsync(event) { evt =>                                          // block that will run once event has been written to journal
readRegion ! Update(await = true)                                   // update the Query path
auctionStateMaybe = startMaybeState(id, start, end, initialPrice)   // update internal state
...
}
  

This actor is cluster sharded on auctionId as follows:

val idExtractor: ShardRegion.IdExtractor = {
    case m: AuctionCmd => (m.auctionId, m)
}

val shardResolver: ShardRegion.ShardResolver = msg => msg match {
    case m: AuctionCmd => (math.abs(m.auctionId.hashCode) % 100).toString
}

val shardName: String = "BidProcessor"
            
This means, there is only one instance of this actor in the cluster, and all commands with the same auctionId will be routed to the same actor.

If this actor receives no commands for 1 minute, it will passivate ( a pattern enabling the parent to stop the actor, in order to reduce memory consumption without losing any commands it is currently processing):


/** passivate the entity when no activity */
context.setReceiveTimeout(1 minute)     // this will send a ReceiveTimeout message after one minute, if no other messages come in
        
The timeout is handled in the Passivation.scala trait:

protected def withPassivation(receive: Receive): Receive = receive.orElse{
    // tell parent actor to send us a poisinpill
    case ReceiveTimeout => context.parent ! Passivate(stopMessage = PoisonPill)

    // stop
    case PoisonPill => context.stop(self)
}
             

If this actor fails, or is passivated, and then is required again (to handle a command), the cluster will spin it up, and it will replay the event journal, updating it's internal state:

def receiveRecover: Receive = {
    case evt: AuctionEvt => updateState(evt)

    case RecoveryCompleted => {
        auctionStateMaybe.fold[Unit]({}) { auctionState =>
            if (auctionState.ended)
                context.become(passivate(auctionClosed(auctionState.auctionId, auctionState.endTime)).orElse(unknownCommand))
            else{
                context.become(passivate(takingBids(auctionState.auctionId, auctionState.startTime, auctionState.endTime)).orElse(unknownCommand))
                }
            }
        }
}
              

Exploring the Query path in the Cluster

The Queries are handled in a different Actor: BidView.scala. This is a PersistentView that handles query messages, or prompts from it's companion PersistentActor to update itself.

BidView.scala is linked to the BidProcessor.scala event journal via it's persistenceId


override val persistenceId: String = "BidProcessor" + "-" + self.path.name
    
This means it has access to this event journal, and can maintain, and recover state from this journal.

It is possible for a PersistentView to save it's own snapshots, but, in our case, it isn't required.

This PersistentView is sharded in the same way the PersistentActor is:


val idExtractor: ShardRegion.IdExtractor = {
    case m : AuctionEvt => (m.auctionId,m)
    case m : BidQuery => (m.auctionId,m)
}

val shardResolver: ShardRegion.ShardResolver = {
    case m: AuctionEvt => (math.abs(m.auctionId.hashCode) % 100).toString
    case m: BidQuery => (math.abs(m.auctionId.hashCode) % 100).toString
}
        
One could have used a different shard strategy here, but a consequence of the above strategy is that the Query Path will reside in the same Shard Region as the command path, reducing latency of the Update() message from Command to Query.

The PersistentView maintains the following model in memory:


final case class BidState(auctionId:String,
                         start:Long,
                         end:Long,
                         product:Double,
                         acceptedBids:List[Bid],
                         rejectedBids:List[Bid],
                         closed:Boolean)
This model is sufficient to satisfy both queries: Winning Bid, and Bid History:


  def auctionInProgress(currentState:BidState, prodId:String):Receive = {

    case  GetBidHistoryQuery(auctionId) =>  sender ! BidHistoryResponse(auctionId,currentState.acceptedBids)

    case  WinningBidPriceQuery(auctionId) =>
        currentState.acceptedBids.headOption.fold(
        sender ! WinningBidPriceResponse(auctionId,currentState.product))(b =>
        sender ! WinningBidPriceResponse(auctionId,b.price))

          ....

  }