
How To Distribute Application State with Akka Cluster - Part 4: The Source Code

Michael Read, Senior Technical Consultant, Lightbend, Inc.

Proof Of Concept With Akka Cluster

In this series of blog posts, we walk you through a working Proof of Concept (PoC) built using Lightbend's open source Scala programming language with the Akka distributed toolkit. In Part 1: Getting Started, we walked through building, testing, and running the PoC locally, with instrumentation and monitoring wired in from the very beginning using Lightbend Telemetry. In Part 2: Docker and Local Deploy, we deployed the PoC to Docker on our local machine and tested it. In Part 3: Kubernetes & Monitoring, we moved the PoC to Kubernetes (on Minikube), reviewed some monitoring options, and took a deep dive into the PoC's Kubernetes (K8s) deployment YAML files.

In the previous three parts, we introduced our PoC and its associated repository, which implements a highly consistent and performant distributed cache with persistence backing, built with Akka, Apache Cassandra, and Scala. We also showed you how to test it, run it locally, load test it with Gatling, and monitor it with Lightbend Telemetry. In this fourth and final part, we look at the Scala source code required to create the PoC.

Deep Dive Into Scala Source Code

We're going to start by diving into the bootstrapping process for our single Docker image PoC. Then, we'll move on to the two roles: Nodes and Endpoints.

Bootstrapping

The PoC relies on the new Akka Typed APIs, of which there are two. The first is object oriented and relies on classes to create your actors. The second is functional and uses behaviors to create actors behind the scenes. The PoC uses the functional API, which is my personal preference.
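For reference, here's a minimal sketch (not taken from the PoC) of the functional style, using a hypothetical Ping message:

 import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.Behaviors

 final case class Ping(message: String)

 // Functional style: the behavior is assembled from functions rather than
 // from a class extending AbstractBehavior (the object-oriented style).
 def ping(): Behavior[Ping] =
   Behaviors.receive { (context, msg) =>
     context.log.info("received: {}", msg.message)
     Behaviors.same
   }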

The bootstrap process provides a root behavior to create a guardian actor and then starts the correct processes based on configuration. The mainline starts with the following code from the file com.lightbend.artifactstate.app.StartNode:

 def main(args: Array[String]): Unit = {
    val clusterName = appConfig.getString ("clustering.cluster.name")
    val clusterPort = appConfig.getInt ("clustering.port")
    val defaultPort = appConfig.getInt ("clustering.defaultPort")
    if (appConfig.hasPath("clustering.ports")) {
      val clusterPorts = appConfig.getIntList("clustering.ports")
      clusterPorts.forEach { port =>
        startNode(RootBehavior(port, defaultPort), clusterName)
      }
    }
    else {
      startNode(RootBehavior(clusterPort, defaultPort), clusterName)
    }
  }

The mainline is designed to start one or more nodes, based upon the number of ports provided in the configuration. The complete root behavior is provided through the following code:

 private object RootBehavior {
    def apply(port: Int, defaultPort: Int) : Behavior[NotUsed] =
      Behaviors.setup { context =>
        implicit val classicSystem: actor.ActorSystem =  TypedActorSystemOps(context.system).toClassic
 
        val TypeKey = EntityTypeKey[ArtifactCommand](ArtifactStatesShardName)
 
        val cluster = Cluster(context.system)
        context.log.info(s"starting node with roles: $cluster.selfMember.roles")
 
        if (cluster.selfMember.hasRole("k8s")) {
          AkkaManagement(classicSystem).start()
          ClusterBootstrap(classicSystem).start()
        }
 
        if (cluster.selfMember.hasRole("sharded")) {
          ClusterSharding(context.system).init(Entity(TypeKey)
          (createBehavior = ctx => ArtifactStateEntityActor(ctx.entityId))
            .withSettings(ClusterShardingSettings(context.system).withRole("sharded")))
        }
        else {
          if (cluster.selfMember.hasRole("endpoint")) {
            implicit val ec: ExecutionContextExecutor = context.system.executionContext
            val psEntities: ActorRef[ShardingEnvelope[ArtifactCommand]] =
              ClusterSharding(context.system).init(Entity(TypeKey)
              (createBehavior = ctx => ArtifactStateEntityActor(ctx.entityId)))
            val psCommandActor: ActorRef[ShardingEnvelope[ArtifactCommand]] = psEntities
 
            lazy val routes: Route = new ArtifactStateRoutes(context.system, psCommandActor).psRoutes
            val httpPort = context.system.settings.config.getString("akka.http.server.default-http-port")
            if (cluster.selfMember.hasRole("docker") || cluster.selfMember.hasRole("k8s")) {
              Http().bindAndHandle(routes, "0.0.0.0").map { binding =>
                context.log.info(s"Server online inside container on port ${httpPort}")
              }
            }
            else {
              Http().bindAndHandle(routes, "localhost").map { binding =>
                context.log.info(s"Server online at http://localhost:${httpPort}")
              }
            }
          }
        }
 
        if (port == defaultPort) {
          context.spawn(ClusterListenerActor(), "clusterListenerActor")
          context.log.info("started clusterListenerActor")
        }
 
        Behaviors.empty
      }
  }

Within the setup of the behavior, we start by capturing a classic version of the actor system, which is used by Akka Management and Akka HTTP, both of which require the classic actor system. Then we create an "Entity Type Key" based upon ArtifactCommand, which also names the shards. Next, we access the Cluster API and log the roles provided via configuration. With the exception of starting a cluster listener actor at the end of setup, the logic flow depends upon the provided roles.

If the "k8s" role exists, we're running on Kubernetes, so we start Akka Management and Cluster Bootstrap for cluster formation, which uses the Kubernetes API via configuration.

Moving forward, we then need to determine if the role of the cluster node is either “sharded”, which hosts cluster sharding, or “endpoint” which provides an Akka HTTP endpoint.

For the “sharded” role, we’re simply initializing cluster sharding using the ArtifactStateEntityActor and we’re pretty much done.

For the "endpoint" role, we initialize cluster sharding as a proxy, so no shards are hosted by the node, and we keep the resulting ActorRef[ShardingEnvelope[ArtifactCommand]], which is used to forward our commands on to the proper shard region. Next, if we're running in Docker or Kubernetes, we bind the HTTP routing DSL to IP address 0.0.0.0; otherwise we bind to localhost.

Finally, the startNode method creates a typed actor system using our root behavior:


 def startNode(behavior: Behavior[NotUsed], clusterName: String) = {
    val system = ActorSystem(behavior, clusterName, appConfig)
    system.whenTerminated // remove compiler warnings
  }
 

Our root behavior also spawns an actor that subscribes to Akka Cluster events when the port and the default port match. The actor simply logs cluster events that are of interest. The source code is contained in the file com.lightbend.artifactstate.actors.ClusterListenerActor:

object ClusterListenerActor {
 
  def apply(): Behavior[ClusterDomainEvent] =
    Behaviors.setup[ClusterDomainEvent] { context =>
 
      val cluster = Cluster(context.system)
      cluster.subscriptions ! Subscribe(context.self, classOf[ClusterDomainEvent])
 
      context.log.info(s"started actor ${context.self.path} - (${context.self.getClass})")
 
      def running(): Behavior[ClusterDomainEvent] =
        Behaviors.receive { (context, message) =>
          message match {
            case MemberUp(member) =>
              context.log.info("Member is Up: {}", member.address)
              Behaviors.same
            case UnreachableMember(member) =>
              context.log.info("Member detected as unreachable: {}", member)
              Behaviors.same
            case MemberRemoved(member, previousStatus) =>
              context.log.info(
                "Member is Removed: {} after {}",
                member.address, previousStatus)
              Behaviors.same
            case _ =>
              Behaviors.same // ignore
          }
        }
 
      running()
    }
 
}

Note that the structure of the ClusterListenerActor follows the Akka style guide by returning its behavior via the apply method.

Nodes

Now we're going to dive into the implementation of our ArtifactStateEntityActor, which leverages Akka's Persistent Entity API and Event Sourcing (ES) for persistence. The basic idea behind ES is that only the events that cause changes to an actor's state are persisted, as a journal, on disk. If a persistent actor crashes, passivates due to inactivity, or is part of a rebalanced shard, the actor can be reconstituted by replaying its events when needed.
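Conceptually, recovery is just a left fold of the journaled events over the entity's empty state, using the event handler we'll define below:

 // Replaying two journaled events over the empty state rebuilds the current state:
 val recovered = List(ArtifactRead(), ArtifactAddedToUserFeed())
   .foldLeft(CurrState())(eventHandler)
 // recovered == CurrState(artifactRead = true, artifactInUserFeed = true)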

The first part of our implementation focuses on defining the command API:

 sealed trait BaseId extends MsgSerializeMarker {
    val artifactId: Long
    val userId: String
  }
  sealed trait ArtifactCommand extends BaseId
  sealed trait ArtifactQuery extends ArtifactCommand  
  sealed trait ArtifactResponse extends MsgSerializeMarker
 
  // queries
  final case class IsArtifactReadByUser(replyTo: ActorRef[ArtifactReadByUser], artifactId: Long, userId: String) extends ArtifactQuery
  final case class IsArtifactInUserFeed(replyTo: ActorRef[ArtifactInUserFeed], artifactId: Long, userId: String) extends ArtifactQuery
  final case class GetAllStates(replyTo: ActorRef[AllStates], artifactId: Long, userId: String) extends ArtifactQuery
 
  // commands
  final case class SetArtifactRead(replyTo: ActorRef[Okay], artifactId: Long, userId: String) extends ArtifactCommand
  final case class SetArtifactAddedToUserFeed(replyTo: ActorRef[Okay], artifactId: Long, userId: String) extends ArtifactCommand
  final case class SetArtifactRemovedFromUserFeed(replyTo: ActorRef[Okay], artifactId: Long, userId: String) extends ArtifactCommand

We start by defining our BaseId trait, which extends MsgSerializeMarker. BaseId is used to create a unique identity by combining the fields artifactId and userId. MsgSerializeMarker is used to configure Akka's custom serialization bindings, which can be found in the configuration file cluster-application-base.conf, where we specify Akka's new built-in "jackson-json" serializer.

Next, we create a logical separation between commands and queries using the base traits ArtifactCommand and ArtifactQuery, respectively: commands cause changes to state, while queries are "read only".

Next we define all possible responses for both commands and queries:

  // responses
  final case class Okay(okay: String = "OK") extends ArtifactResponse
  final case class ArtifactReadByUser(artifactRead: Boolean) extends ArtifactResponse
  final case class ArtifactInUserFeed(artifactInUserFeed: Boolean) extends ArtifactResponse
  final case class AllStates(artifactRead: Boolean, artifactInUserFeed: Boolean) extends ArtifactResponse

Next, we define the events that are persisted to the ES journal:

 sealed trait ArtifactEvent extends EventSerializeMarker
  final case class ArtifactRead() extends ArtifactEvent
  final case class ArtifactAddedToUserFeed() extends ArtifactEvent
  final case class ArtifactRemovedFromUserFeed() extends ArtifactEvent

Like commands, we’re using a trait, EventSerializeMarker, to identify the proper Akka serialization to use during persistence.

Next, we define our actor’s state represented in a single case class:

 
 final case class CurrState(artifactRead: Boolean = false, artifactInUserFeed : Boolean = false) extends MsgSerializeMarker
 

Note: since our state only contains booleans, our persisted events don’t contain any additional data. Normally, your events would also contain the new data that’s been received via a command, which causes a change in state, so that state can be accurately recreated during a journal replay as an actor is reconstituted.
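For instance, a purely hypothetical event carrying data, together with a state that stores it, might look like the following (it would live alongside the other events in the same file, since ArtifactEvent is sealed):

 // Hypothetical only, not part of the PoC: the event records the data received
 // with the command so it can be re-applied during journal replay...
 final case class ArtifactRated(rating: Int) extends ArtifactEvent
 // ...and the state keeps whatever the events establish.
 final case class RatedState(artifactRead: Boolean = false, lastRating: Option[Int] = None) extends MsgSerializeMarker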

Our entity actor only has one state, but it’s not uncommon to have more than one state. For example, if you were implementing a finite state machine, you’d likely have two or more different states, but only one would be in effect at any given time.

Next, we define our actor’s behavior via the apply method:

 
 def apply(entityId: String): Behavior[ArtifactCommand] =
    EventSourcedBehavior[ArtifactCommand, ArtifactEvent, CurrState](
      persistenceId = PersistenceId(ArtifactStatesShardName, entityId),
      emptyState = CurrState(),
      commandHandler,
      eventHandler)
 

The apply method is passed the entity's unique ID and returns a Behavior[ArtifactCommand]. It uses EventSourcedBehavior to declare the type of commands that can be accepted, the type of our events, and the type of our state. We also provide the persistence id, the initial (empty) state used when an entity is first created, a command handler, and finally an event handler.
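The PoC doesn't use snapshots, but the same builder also accepts retention criteria. A hedged sketch (not part of the PoC) of enabling periodic snapshots to shorten recovery:

 import akka.persistence.typed.scaladsl.RetentionCriteria

 // Not used by the PoC: snapshot every 100 events, keep the 2 most recent snapshots.
 EventSourcedBehavior[ArtifactCommand, ArtifactEvent, CurrState](
     persistenceId = PersistenceId(ArtifactStatesShardName, entityId),
     emptyState = CurrState(),
     commandHandler,
     eventHandler)
   .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))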

The command handler is a function with two parameters, the current state and the incoming command, and it returns an Effect. It's common to validate commands before accepting them; in this case, however, we accept all commands without exception:

 private val commandHandler: (CurrState, ArtifactCommand) => Effect[ArtifactEvent, CurrState] = { (state, command) =>
    command match {
      case SetArtifactRead (replyTo, _, _) => artifactRead(replyTo, state)
      case SetArtifactAddedToUserFeed (replyTo, _, _) => artifactAddedToUserFeed(replyTo, state)
      case SetArtifactRemovedFromUserFeed (replyTo, _, _) => artifactRemovedFromUserFeed(replyTo, state)
 
      case IsArtifactReadByUser (replyTo, _, _) => getArtifactRead(replyTo, state)
      case IsArtifactInUserFeed (replyTo, _, _) => getAritfactInFeed (replyTo, state)
      case GetAllStates (replyTo, _, _) => getArtifactState (replyTo, state)
    }
  }
 
  private def artifactRead(replyTo: ActorRef[Okay], currState: CurrState): Effect[ArtifactEvent, CurrState] = {
    Effect.persist(ArtifactRead()).thenRun(_ => replyTo ! Okay())
  }
 
  private def artifactAddedToUserFeed(replyTo: ActorRef[Okay], currState: CurrState): Effect[ArtifactEvent, CurrState] = {
    Effect.persist(ArtifactAddedToUserFeed()).thenRun(_ => replyTo ! Okay())
  }
 
  private def artifactRemovedFromUserFeed(replyTo: ActorRef[Okay], currState: CurrState): Effect[ArtifactEvent, CurrState] = {
    Effect.persist(ArtifactRemovedFromUserFeed()).thenRun(_ => replyTo ! Okay())
  }
 
  private def getArtifactRead(replyTo: ActorRef[ArtifactReadByUser], currState: CurrState): Effect[ArtifactEvent, CurrState] = {
    replyTo ! ArtifactReadByUser(currState.artifactRead)
    Effect.none
  }
 
  private def getAritfactInFeed(replyTo: ActorRef[ArtifactInUserFeed], currState: CurrState): Effect[ArtifactEvent, CurrState] = {
    replyTo ! ArtifactInUserFeed(currState.artifactInUserFeed)
    Effect.none
  }
 
  private def getArtifactState(replyTo: ActorRef[AllStates], currState: CurrState): Effect[ArtifactEvent, CurrState] = {
    replyTo ! AllStates(currState.artifactRead, currState.artifactInUserFeed)
    Effect.none
  }

Since we only have one state, we can ignore the incoming state when processing commands. Commands that cause changes to state produce an Effect with at least two stages: first, the event must be successfully persisted, and the state is then updated by the event handler; second, a response can optionally be sent. For queries, i.e. commands that don't change state, a response is sent immediately and Effect.none is returned.
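If validation were needed, a rejected command could simply reply without persisting an event. A hypothetical extra case for the command handler (not part of the PoC):

 // Hypothetical guard: refuse to persist anything for an invalid artifactId.
 case SetArtifactRead(replyTo, artifactId, _) if artifactId <= 0 =>
   Effect.none.thenRun(_ => replyTo ! Okay("rejected: invalid artifactId"))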

The event handler is a function that's passed an event after it has been successfully persisted; the new state is created by applying the event to the current state and returning the (new) current state:

 
private val eventHandler: (CurrState, ArtifactEvent) => CurrState = { (state, event) =>
    event match {
      case ArtifactRead() =>
        CurrState(artifactRead = true, artifactInUserFeed = state.artifactInUserFeed)
 
      case ArtifactAddedToUserFeed() =>
        CurrState(state.artifactRead, artifactInUserFeed = true)
 
      case ArtifactRemovedFromUserFeed() =>
        CurrState(state.artifactRead)
 
      case _ => throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
    }
 }
 

The event handler is also used when the entity is started up, to recover its state from the stored events, so it should never perform any side effects.

So that’s it, that’s the entirety of the implementation of our Akka Persistent Entity.

Endpoints

Now we’re going to do a deep dive into the implementation of our endpoint, which relies on Akka HTTP and its routing DSL. Before we dive into the routing DSL let’s revisit our API:

Artifact / User Read


curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactReadByUser
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/isArtifactReadByUser


Artifact / User Feed


curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactAddedToUserFeed
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/isArtifactInUserFeed
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactRemovedFromUserFeed
 

Query All States


curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/getAllStates
 

We start our DSL with a pathPrefix of "artifactState", which helps document our API and would also let us share the same endpoint IP address with another microservice if we ever needed to. Reviewing the DSL below, notice that we support both GET and POST for our queries, but only POST for the commands that cause state changes:

 
 lazy val psRoutes: Route =
    pathPrefix("artifactState") {
      concat(
        // QUERIES:
        pathPrefix("isArtifactReadByUser") {
            concat(
              get {
                parameters(("artifactId".as[Long], "userId")) { (artifactId, userId) =>
                  complete {
                    queryArtifactRead(ArtifactAndUser(artifactId, userId))
                  }
                }
              },
              post {
                entity(as[ArtifactAndUser]) { req =>
                  complete(StatusCodes.OK, queryArtifactRead(req))
                }
              })
        },
        pathPrefix("isArtifactInUserFeed") {
          concat(
            get {
              parameters(("artifactId".as[Long], "userId")) { (artifactId, userId) =>
                val req = ArtifactAndUser(artifactId, userId)
                complete(queryArtifactInUserFeed(req))
              }
            },
            post {
              entity(as[ArtifactAndUser]) { req =>
                complete(StatusCodes.OK, queryArtifactInUserFeed(req))
              }
            })
        },
        pathPrefix("getAllStates") {
          concat(
            get {
              parameters(("artifactId".as[Long], "userId")) { (artifactId, userId) =>
                val req = ArtifactAndUser(artifactId, userId)
                complete(queryAllStates(req))
              }
            },
            post {
              entity(as[ArtifactAndUser]) { req =>
                complete(StatusCodes.OK, queryAllStates(req))
              }
            })
        },
 
        // COMMANDS:
        pathPrefix("setArtifactReadByUser") {
          post {
            entity(as[ArtifactAndUser]) { req =>
              complete {
                cmdArtifactRead(req)
              }
            }
          }
        },
        pathPrefix("setArtifactAddedToUserFeed") {
          post {
            entity(as[ArtifactAndUser]) { req =>
              complete {
                cmdArtifactAddedToUserFeed(req)
              }
            }
          }
        },
        pathPrefix("setArtifactRemovedFromUserFeed") {
          post {
            entity(as[ArtifactAndUser]) { req =>
              complete {
                cmdArtifactRemovedFromUserFeed(req)
              }
            }
          }
        })
    }
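
Because the queries also accept GET, the same information can be fetched with query parameters instead of a JSON body, mirroring the POST examples shown earlier:

curl "http://localhost:8082/artifactState/isArtifactReadByUser?artifactId=1&userId=Michael"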
 

For GETs, we're able to extract the parameters directly from the URL and place them into the fields artifactId and userId. For POSTs, we rely on marshaling of JSON through the following setup files:

com.lightbend.artifactstate.endpoint.ArtifactStatePocAPI


// these are just for the JSON formats/external protocol/api
object ArtifactStatePocAPI {
 
  final case class ArtifactAndUser(artifactId: Long, userId: String)
 
  sealed trait ExtResponses
  final case class ExtResponse(artifactId: Long, userId: String, answer: Option[Boolean], failureMsg: Option[String]) extends ExtResponses
  final case class AllStatesResponse(
                                      artifactId: Long,
                                      userId: String,
                                      artifactRead: Option[Boolean],
                                      artifactInUserFeed: Option[Boolean],
                                      failureMsg: Option[String]) extends ExtResponses
  final case class CommandResponse(success: Boolean) extends ExtResponses
 
}


ArtifactStatePocAPI defines the case classes used in the API for making requests to the internal cluster, as well as for wrapping external responses before converting them to JSON.

com.lightbend.artifactstate.endpoint.JsonSupport

 
object JsonFormats  {
  // import the default encoders for primitive types (Int, String, Lists etc)
  import DefaultJsonProtocol._
 
  implicit val userJsonFormat = jsonFormat2(ArtifactAndUser)
  implicit val psResponse = jsonFormat4(ExtResponse)
  implicit val psResponseII = jsonFormat5(AllStatesResponse)
  implicit val cmdResponse = jsonFormat1(CommandResponse)
 
}


The JsonFormats object provides the implicit conversions for marshaling case classes to and from JSON.

Let’s take a look at the request response cycle for a query:

 
  def handleResponse(req: ArtifactAndUser, f: Future[ArtifactResponse]): Future[ExtResponse] = {
    f.map {
      case ArtifactReadByUser(artifactRead) =>
        ExtResponse(req.artifactId, req.userId, Some(artifactRead), None)
      case ArtifactInUserFeed(artifactInUserFeed) =>
        ExtResponse(req.artifactId, req.userId, Some(artifactInUserFeed), None)
      case _ =>
        ExtResponse(req.artifactId, req.userId, None, Some("Internal Query Error: this shouldn't happen."))
    }
  }
 
  def queryArtifactRead(req: ArtifactAndUser): Future[ExtResponse] = {
    val result = psCommandActor.ask { ref : ActorRef[ArtifactResponse] =>
      ShardingEnvelope(req.artifactId + req.userId, IsArtifactReadByUser(ref, req.artifactId, req.userId))
    }
    handleResponse(req, result)
}
 

In this example, the queryArtifactRead method is called by the routing DSL and is used to ask whether the particular artifact has been read by a user. The ShardingEnvelope wraps the request so that the cluster can locate the entity and route the request to the appropriate node. It's worth noting that ref is a temporary actor created by the ask pattern to capture the response; it's sent as the replyTo field of the IsArtifactReadByUser request. The result, which is a Future, is then passed to the utility function handleResponse, which maps it into an external response for the requesting client.
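One detail that isn't visible in the snippet: the typed ask pattern requires an implicit Timeout and the typed Scheduler in scope, plus an ExecutionContext for the Future transformations. A sketch of the plumbing assumed inside ArtifactStateRoutes (the timeout value is illustrative):

 import akka.actor.typed.Scheduler
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.util.Timeout
 import scala.concurrent.ExecutionContextExecutor
 import scala.concurrent.duration._

 // Plumbing required by psCommandActor.ask and the Future .map/.recover calls.
 implicit val timeout: Timeout = 3.seconds
 implicit val scheduler: Scheduler = system.scheduler
 implicit val ec: ExecutionContextExecutor = system.executionContext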

The request / response cycle for commands follows the same pattern as the query above:

 
  def handleCmdResponse(req: ArtifactAndUser, f: Future[ArtifactResponse]): Future[CommandResponse] = {
    f.map {
      case Okay(_) => CommandResponse(true)
      case _ =>
        system.log.error("Internal Command Error: this shouldn't happen.")
        CommandResponse(false)
    }.recover {
      case ex: Exception =>
        system.log.error(ex.getMessage, ex)
        CommandResponse(false)
    }
  }
 
  def cmdArtifactRead(req: ArtifactAndUser): Future[CommandResponse] = {
    val result = psCommandActor.ask { ref : ActorRef[ArtifactResponse] =>
      ShardingEnvelope(req.artifactId + req.userId, SetArtifactRead(ref, req.artifactId, req.userId))
    }
    handleCmdResponse(req, result)
  }
 

That covers our dive into our Akka Routing DSL implementation.

Commercial Resilience Enhancements

As we mentioned in the first installment, this project takes advantage of Lightbend’s Commercial Resilience Enhancements, which come along with a Lightbend subscription.

We’ve already taken a look at Lightbend Telemetry for monitoring in previous installments. Next, we’re going to take a closer look at the other enhancements that are included in our PoC.

Akka Split Brain Resolver

Akka Split Brain Resolver is probably one of the most important enhancements for Akka and should be considered mandatory when deploying Akka Cluster to production environments.

When operating an Akka Cluster you must consider how to handle network partitions (a.k.a. split brain scenarios) and machine crashes (including JVM and hardware failures). This is crucial for correct behavior if you use Cluster Singleton or Cluster Sharding, especially together with Akka Persistence because split brains will happen. Guaranteed.

The causes of split brain scenarios are numerous and difficult to handle properly, but ultimately some (or all) nodes need to be shut down to avoid corrupting your data. The strategies for deciding which nodes to shut down are complicated and error prone. Akka Split Brain Resolver provides a number of predefined strategies, so you can choose the one that best fits the characteristics of your system. For our PoC, we've chosen the easy-to-understand keep-majority strategy: the majority of nodes that can still see each other keep running, while the others are shut down. This is accomplished by the following configuration:

cluster-application-base.conf

akka {
  ...


  cluster {
    ...
    downing-provider-class = "com.lightbend.akka.sbr.SplitBrainResolverProvider"
    split-brain-resolver.active-strategy=keep-majority
    split-brain-resolver.keep-majority {
      # if the 'role' is defined the decision is based only on members with that 'role'
      role = "sharded"
    }
  }
}

If you're deploying to K8s, we recommend taking a look at the Kubernetes Lease (lease-majority) strategy for configuring Akka Split Brain Resolver. While this strategy requires some additional resources, which are beyond the scope of this article, it appears to be a very safe and reliable approach.

Akka Thread Starvation Detector

The Akka Thread Starvation Detector is a diagnostic tool that monitors the dispatchers of an ActorSystem and logs a warning if a dispatcher becomes unresponsive. The Starvation Detector starts automatically with the ActorSystem, but it can also be configured to watch other Akka execution contexts (ExecutionContext). For example, if you've set up another execution context to isolate blocking work, you can configure the Starvation Detector to monitor that one too. And since Akka dispatchers implement Scala's ExecutionContext, a dedicated Akka dispatcher can take the place of a hand-rolled Scala thread pool, with the Starvation Detector then configured to monitor it.
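As an illustration, here's a minimal sketch of looking up such a dedicated dispatcher; the dispatcher name "blocking-io-dispatcher" is our own assumption and would need to be defined in your configuration:

 import akka.actor.typed.{ ActorSystem, DispatcherSelector }
 import scala.concurrent.ExecutionContextExecutor

 // Assumes a "blocking-io-dispatcher" block is defined in application.conf;
 // blocking work runs here instead of on the default dispatcher.
 def blockingEc(system: ActorSystem[_]): ExecutionContextExecutor =
   system.dispatchers.lookup(DispatcherSelector.fromConfig("blocking-io-dispatcher"))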

It’s also worth noting that Lightbend Telemetry can also instrument Akka’s dispatchers.

Akka Configuration Checker

Akka comes with a massive number of configuration settings that can be tweaked. It can be difficult to know which knobs to turn and which to leave alone, and finding correct values and appropriate relations between different settings may seem like a black art. In many cases, incorrect configuration values contribute to terrible stability and bad performance.

The Config Checker is run automatically when the ActorSystem is started provided its dependency is included in your build. Then, simply check the Config Checker’s output in your Akka logs.

Akka Diagnostics Recorder

The Akka Diagnostics Recorder writes configuration and system information to a file that can be attached to your Lightbend support cases. It also registers an MBean in the "akka" namespace, which can be accessed from a JMX console such as JConsole; from JMX you can trigger thread dumps that are appended to the same file. This information helps us at Lightbend give you the best support possible.

By default, the file is saved in a directory called akka-diagnostics within the current directory. This can be changed with the following configuration:

 
akka.diagnostics.recorder.dir = "target/diagnostics/"
 

The report file name contains the actor system name and the remote address, if any.

For example, if you want to capture the Akka diagnostics file from a K8s node pod, you need a pod name. From your terminal window enter the following command:


$ kubectl get pods
NAME                            READY   STATUS    RESTARTS   AGE
cassandra-db-58cb5695fb-6jbxj   1/1     Running   1          7d2h
endpoint-6df5c4448b-n9cd7       1/1     Running   1          7d2h
node-54465bb85d-4jvdg           1/1     Running   1          7d2h
node-54465bb85d-6jz64           1/1     Running   1          7d2h
node-54465bb85d-z56nb           1/1     Running   1          7d2h

Next, pick one of the nodes, and use the following command to copy the akka-diagnostics directory to your own /tmp directory. For example, in your terminal window enter the following:


$ kubectl cp node-54465bb85d-4jvdg:/opt/docker/akka-diagnostics/ /tmp/akka-diagnostics/

Now, your /tmp/akka-diagnostics/ directory contains the Akka diagnostics file that can be attached to your corresponding support ticket with Lightbend.


$ ls -l /tmp/akka-diagnostics/
total 32
-rw-rw-r-- 1 michael michael 26345 Jan 29 10:40 Diagnostics-ArtifactStateCluster-172.17.0.9-2552.json
-rw-rw-r-- 1 michael michael   223 Jan 29 10:40 readme.txt

For more information on Akka Diagnostics Recorder, please refer to the documentation.

Conclusion

Building, testing, containerizing, deploying, and monitoring distributed microservices is time consuming and difficult. We hope the Lightbend tools highlighted in this series illustrate how you, too, can build highly scalable distributed applications faster, more easily, and correctly.

We started this series by walking you through building, testing, and running the PoC locally, with instrumentation and monitoring wired in from the very beginning using Lightbend Telemetry. Next, we walked you through deploying to Docker and ultimately Kubernetes (on Minikube), finishing with a deep dive into the source code. 

We hope you've learned something useful in this series, and if you'd like to set up a demo or speak to someone about using Akka Cluster in your organization, click below to get started in that direction:

SEE AKKA CLUSTER IN ACTION

 
