
How To Use Akka Streams For Dynamically Controlled Streams

Boris Lublinsky Principal Architect, Lightbend, Inc.

Cloud-Native Design Techniques for Serving Machine Learning Models with Akka Streams

In the introductory post of this short series, How To Serve Machine Learning Models With Dynamically Controlled Streams, I described why dynamically controlled streams are a very powerful pattern for implementing streaming applications. The pattern can be implemented with different streaming engines and frameworks, and the right technology choice depends on many factors, including in-house knowledge of the frameworks and engines, enterprise architectural standards, and other considerations.

In this post, I demonstrate with code how dynamically controlled streams can be implemented with Akka Streams, and I discuss the main properties of such an implementation. The other posts in this series show the same example implemented with Kafka Streams, Apache Flink, and Apache Spark.


Dynamically Controlled Streams With Akka Streams

When implementing dynamically controlled streams using Akka Streams, the first thing to consider is that Akka Streams is a stateless stream processing framework, while a dynamically controlled stream requires state. Fortunately, Akka Streams integrates seamlessly with actors, which are the preferred way [1] to implement stateful (and dynamically controlled) streams. For our simple example, we can use two actors:

  1. Controller actor (one per sensor ID), which is responsible for both storing the stream state, expressed as a desired temperature, and calculating the heater control from the desired temperature and the current temperature sensor reading. Using a separate actor per sensor ID simplifies the implementation and improves overall scalability, because many actors can process sensor data simultaneously.
  2. Controller manager actor - a router that manages the lifecycle of the controller actors and delivers each request to the controller actor with the corresponding ID.

For our implementation we decided to use Akka Typed, the latest, type-safe version of Akka [2]. Akka Typed expresses actors and their interactions using behaviors and addresses. Messages are sent to an address, and behind this façade there is a behavior that receives the message and acts upon it.

An implementation starts from defining messages used by actors:

import akka.Done
import akka.actor.typed.ActorRef

// Controller
trait ControllerActorTyped
case class SensorDataRequest(reply: ActorRef[MayBeHeaterControl], record: SensorData) extends ControllerActorTyped with ControllerManagerActorTyped
case class TemperatureSetting(reply: ActorRef[Done], record: TemperatureControl) extends ControllerActorTyped with ControllerManagerActorTyped

// Controller manager
trait ControllerManagerActorTyped

// Control: sensorID is None when no command should be sent to the heater
case class MayBeHeaterControl(sensorID: Option[Int], command: Int)

Here we define two traits, ControllerActorTyped and ControllerManagerActorTyped, which identify the message protocols of our two actors, and the actual messages, which extend those traits and are therefore eligible as messages for the corresponding actors. Both messages extend both traits because the manager forwards them unchanged to the controllers. The case class MayBeHeaterControl is the reply sent back by the actors. With the messages in place, the implementation of the controller actor's behavior looks as follows (we are using the object-oriented style of actor implementation):

class ControllerBehaviour(context: ActorContext[ControllerActorTyped], sensorID: Int) extends AbstractBehavior[ControllerActorTyped] {

 println(s"Creating controller for sensor $sensorID")

 private var currentSetting: Option[TemperatureControl] = None
 private var previousCommand = -1 // last command sent to the heater; -1 means none yet

 override def onMessage(msg: ControllerActorTyped): Behavior[ControllerActorTyped] = {
   msg match {
     case setting: TemperatureSetting => // change temperature settings
       println(s"New temperature settings ${setting.record}")
       currentSetting = Some(setting.record)
       setting.reply ! Done
     case sensor: SensorDataRequest =>   // process sensor reading
       currentSetting match {
         case Some(setting) => // We are controlling
           // 1 above the allowed band, 0 below it, -1 inside it (no action)
           val action =
             if (sensor.record.temperature > (setting.desired + setting.upDelta)) 1
             else if (sensor.record.temperature < (setting.desired - setting.downDelta)) 0
             else -1
           if (action >= 0 && previousCommand != action) { // the command changed, send it
             previousCommand = action
             sensor.reply ! MayBeHeaterControl(Some(sensor.record.sensorID), action)
           }
           else sensor.reply ! MayBeHeaterControl(None, 0)
         case _ => // No control
           sensor.reply ! MayBeHeaterControl(None, 0)
       }
   }
   this
 }
}

The implementation is quite similar to an untyped one, with the following main differences:

  • Instead of extending Actor, in typed Akka we implement a behavior by extending AbstractBehavior[type]
  • The message is strongly typed within the onMessage method
  • There is no sender - instead, a reply-to reference (the reply field here) has to be explicitly defined in the message itself

This implementation also uses an optimization to minimize outgoing traffic: it submits a heater control command only if the command has changed, thus avoiding sending the same (redundant) information every time a sensor reading is received.
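
The decision logic and the change-only optimization can be isolated from the actor, which makes them easy to test. The following is a minimal sketch rather than the post's implementation: Setting, HeaterLogic, decide, and dedupe are hypothetical names, temperatures are assumed to be Double values, and the command codes (1 above the band, 0 below it, -1 inside it) are copied from the controller above.

```scala
// Hypothetical stand-in for TemperatureControl; field names follow the controller code.
final case class Setting(desired: Double, upDelta: Double, downDelta: Double)

object HeaterLogic {
  // Returns 1 above the allowed band, 0 below it, and -1 (no action) inside it.
  def decide(temperature: Double, s: Setting): Int =
    if (temperature > s.desired + s.upDelta) 1
    else if (temperature < s.desired - s.downDelta) 0
    else -1

  // Emits a command only when it differs from the previously emitted one.
  // Returns (new previous command, optional command to send).
  def dedupe(previous: Int, action: Int): (Int, Option[Int]) =
    if (action < 0 || action == previous) (previous, None)
    else (action, Some(action))
}
```

Keeping the previous command outside the function mirrors the previousCommand field of the actor: the actor remains the only place that holds mutable state.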

The implementation of the controller manager actor's behavior is shown below:

class ControllerManagerBehavior(context: ActorContext[ControllerManagerActorTyped]) extends AbstractBehavior[ControllerManagerActorTyped] {

 // Get (maybe) controller actor based on sensorID
 private def getSensorActorMayBe(sensorID: Int): Option[ActorRef[ControllerActorTyped]] =
   context.child(sensorID.toString).map(_.asInstanceOf[ActorRef[ControllerActorTyped]])

 // Get (or create) controller actor based on sensorID
 private def getSensorActor(sensorID: Int): ActorRef[ControllerActorTyped] = {
   getSensorActorMayBe(sensorID) match {
     case Some(actorRef) => actorRef
     case _ => context.spawn(Behaviors.setup[ControllerActorTyped](context =>
       new ControllerBehaviour(context, sensorID)), sensorID.toString)
   }
 }

 override def onMessage(msg: ControllerManagerActorTyped): Behavior[ControllerManagerActorTyped] = {
   msg match {
     case setting: TemperatureSetting => // change temperature settings
       getSensorActor(setting.record.sensorID) tell setting
     case sensor: SensorDataRequest =>   // process sensor reading
       getSensorActorMayBe(sensor.record.sensorID) match {
         case Some(actorRef) => actorRef tell sensor
         case _ => sensor.reply ! MayBeHeaterControl(None, 0) // no controller yet, nothing to do
       }
   }
   this
 }
}

As mentioned before, in addition to acting as a router, this actor is responsible for the lifecycle of the controller actors. As the code shows, a typed actor is created quite differently from an untyped one: it is spawned from a behavior using context.spawn. Another difference is that context.child() returns a reference that does not carry the child's message type, so it has to be cast before messages can be sent to it, and such casts have to be done carefully. Finally, the AkkaTemperatureController object combines Akka Streams with the actors described above to implement the dynamically controlled stream:

object AkkaTemperatureController {

 // Initialization
 implicit val controllerManager = ActorSystem(
   Behaviors.setup[ControllerManagerActorTyped](
     context => new ControllerManagerBehavior(context)), "ControllerManager")
 implicit val materializer = ActorMaterializer()
 implicit val executionContext = controllerManager.executionContext
 implicit val askTimeout = Timeout(30.seconds)

 // Sources
 val sensorSettings = ConsumerSettings(controllerManager.toUntyped, new ByteArrayDeserializer, new ByteArrayDeserializer)
   .withBootstrapServers(kafkaConfig.brokers)
   .withGroupId(kafkaConfig.heateroutputgroup)
   .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

 val controlSettings = ConsumerSettings(controllerManager.toUntyped, new ByteArrayDeserializer, new ByteArrayDeserializer)
   .withBootstrapServers(kafkaConfig.brokers)
   .withGroupId(kafkaConfig.temperaturesetgroup)
   .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

 // Sink
 val heaterSettings =
   ProducerSettings(controllerManager.settings.config.getConfig("akka.kafka.producer"), new ByteArraySerializer, new ByteArraySerializer)
     .withBootstrapServers(kafkaConfig.brokers)

 def main(args: Array[String]): Unit = {

   // Control stream processing
   Consumer.atMostOnceSource(controlSettings, Subscriptions.topics(kafkaConfig.temperaturesettopic))
     .map(record => DataTransformer.controlFromByteArray(record.value)).collect { case Success(a) => a }
     .via(ActorFlow.ask(1)(controllerManager)((elem, replyTo: ActorRef[Done]) =>
       new TemperatureSetting(replyTo, elem)))
     .runWith(Sink.ignore) // run the stream, we do not read the results directly

   // Sensor stream processing
   Consumer.atMostOnceSource(sensorSettings, Subscriptions.topics(kafkaConfig.heateroutputtopic))
     .map(record => DataTransformer.sensorFromByteArray(record.value)).collect { case Success(a) => a }
     .via(ActorFlow.ask(1)(controllerManager)((elem, replyTo: ActorRef[MayBeHeaterControl]) =>
       new SensorDataRequest(replyTo, elem)))
     .map(result => {
       result.sensorID match {
         case Some(value) =>
           println(s"sending new control ${result.command} for sensor $value")
           Some(HeaterControl(value, HeaterCommand.fromValue(result.command)))
         case _ =>
           None
       }
     })
     .filter(_.isDefined)
     .map(value => new ProducerRecord[Array[Byte], Array[Byte]](kafkaConfig.heaterinputtopic, DataTransformer.toByteArray(value.get)))
     .runWith(Producer.plainSink(heaterSettings))
 }
}

Here we are using the typed flavor of Akka, and as a result the ActorSystem creation looks slightly different from the untyped case. Creating the ActorSystem also creates the controller manager behavior, which serves as the system's top-level (guardian) actor.

As the code above shows, there are two independent stream processing pipelines:

  • The control pipeline listens to the temperature settings stream (from Kafka), transforms each protobuf message into its Scala representation, and updates the state of the corresponding controller actor. In this implementation the message could have been one-way (we do not need the reply), but for the stream graph to operate properly, the interaction has to be request/reply. We use the special Akka message type Done to signify that we do not need the content of the reply message. The other thing specific to Akka Typed is that replyTo is declared with the type of the reply message [3].
  • The sensor pipeline listens to the sensor stream (from Kafka), transforms each protobuf message into its Scala representation, and uses the state of the corresponding controller actor to calculate the control value. If a value is produced, it is written to an output Kafka topic to be delivered to the heater.
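
The sensor pipeline turns each reply into an Option, drops the empty ones, and unwraps the rest in three separate steps. The sketch below, on plain Scala collections and with a hypothetical Reply type standing in for MayBeHeaterControl, shows that a single collect with a partial function expresses the same filtering. Akka Streams offers a collect operator as well, which the pipelines above already use to keep only Success values.

```scala
// Hypothetical simplification of MayBeHeaterControl.
final case class Reply(sensorID: Option[Int], command: Int)

object ReplyFiltering {
  // Three-step form, mirroring map / filter(_.isDefined) / map(_.get).
  def threeSteps(replies: List[Reply]): List[(Int, Int)] =
    replies
      .map(r => r.sensorID.map(id => (id, r.command)))
      .filter(_.isDefined)
      .map(_.get)

  // Single-step form: the partial function both selects and transforms.
  def collected(replies: List[Reply]): List[(Int, Int)] =
    replies.collect { case Reply(Some(id), command) => (id, command) }
}
```

The single-step form also avoids calling get on an Option, so there is no place where an empty value could slip through by mistake.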

Scalability And Resilience Against Failures

An important feature of Akka Streams is that it implements Reactive Streams processing. One of the most important properties of reactive systems is resiliency, or fault tolerance. In Akka it is implemented through supervision, which describes a dependency relationship between actors: the supervisor delegates tasks to subordinates and therefore must respond to their failures. When a subordinate detects a failure (that is, throws an exception), it suspends itself and all its subordinates and sends a message to its supervisor, signaling the failure.

For a complete example of a supervision implementation, consult the Akka Typed documentation. Actor supervision applies to the actors but not to the Akka Streams part of the implementation, which supports a few alternative recovery strategies of its own.
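
One of the stream-level recovery strategies is a supervision strategy attached as a stream attribute. The sketch below is an illustration under assumptions, not part of the implementation above: it assumes Akka Streams is on the classpath and uses the 2.5-era ActorMaterializer; ResumeOnFailure and parseAll are hypothetical names. With Supervision.resumingDecider, an element that makes a stage throw is dropped and the stream continues.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ResumeOnFailure {
  // Parses readings, skipping malformed ones instead of failing the whole stream.
  def parseAll(raw: List[String]): Seq[Double] = {
    implicit val system: ActorSystem = ActorSystem("resume-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val parsed = Source(raw)
        .map(_.toDouble) // throws NumberFormatException on malformed input
        .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
        .runWith(Sink.seq)
      Await.result(parsed, 5.seconds)
    } finally system.terminate()
  }
}
```

Without the attribute, the default behavior is to stop the stream on the first exception, which for a long-running Kafka pipeline would end all processing.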

In addition, if the application is deployed on Kubernetes, using Deployments can further improve resiliency: Kubernetes watches the pod and restarts it if it fails. Beyond the resiliency of the code itself, it is also necessary to ensure that after a restart the application restores its state. Because the state is kept in the actors, adding Akka Persistence can solve this problem. Alternatively, a simplified version of state management can be implemented similarly to this implementation.
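
Akka Persistence is based on event sourcing: state changes are stored as events and replayed after a restart. The sketch below models only that idea in plain Scala and does not use the Akka Persistence API; SettingChanged, StateRecovery, and the Map-based state are hypothetical simplifications of the controller state.

```scala
// Hypothetical event: a sensor received a new desired temperature.
final case class SettingChanged(sensorID: Int, desired: Double)

object StateRecovery {
  type State = Map[Int, Double] // sensorID -> desired temperature

  // Applying an event is a pure function, so replaying a journal is deterministic.
  def applyEvent(state: State, event: SettingChanged): State =
    state + (event.sensorID -> event.desired)

  // Rebuilds the state after a restart from the stored event journal.
  def recover(journal: Seq[SettingChanged]): State =
    journal.foldLeft(Map.empty[Int, Double])(applyEvent)
}
```

Replaying the journal in order means the latest setting for each sensor wins, which is the state the controllers held before the restart.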

An additional consideration for dynamically controlled streams is scalability, especially since the Akka Streams implementation runs in a single JVM. When Kafka is used as the input, the implementation above can be scaled as follows:

  • Modify the messages described above to move sensorID out of the message value and into the message key.
  • This makes it possible to partition both the sensor and the control topics, and thus to run as many instances of the implementation as there are partitions in the topics.
  • If the standard Kafka partitioner is used, all of the messages for a given key are always delivered to the same partition, which guarantees that they are processed by the same instance in the order they were sent.
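
The effect of keying messages by sensorID can be sketched in plain Scala. This is a simplified model under a stated assumption: Kafka's default partitioner hashes the serialized key with murmur2, whereas the hypothetical partitionFor below uses hashCode, which is enough to show that equal keys always map to the same partition.

```scala
object KeyPartitioning {
  // Simplified stand-in for a hash partitioner (not Kafka's actual murmur2 hashing).
  def partitionFor(sensorID: Int, partitions: Int): Int =
    Math.floorMod(sensorID.hashCode, partitions)

  // Groups keyed messages by partition, preserving arrival order within each partition.
  def assign[A](messages: Seq[(Int, A)], partitions: Int): Map[Int, Seq[(Int, A)]] =
    messages.groupBy { case (key, _) => partitionFor(key, partitions) }
}
```

Because the settings and the readings of one sensor share a key, they land in the same partition number of their respective topics, provided both topics have the same number of partitions; whether one instance then consumes both depends on how partitions are assigned to consumers.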

Assuming that all the sensors send their information with the same frequency and that the key (sensorID) distribution is even [4], all the instances will receive about the same number of messages. This means that the implementation described above can be scaled by increasing the number of partitions of the corresponding Kafka topics together with the number of implementation instances [5].

In the next post we will examine how the pattern can be implemented using Kafka Streams.


Learn More About Machine Learning

Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!


[1] See our post on Managing Streaming State & Queryable State.

[2] This library is still under heavy development and may change. The code presented here is based on version 2.5.21.

[3] As we mentioned above, the type of an actor is defined by the messages that it accepts.

[4] If the key distribution is skewed, it can be improved by implementing a custom partitioner.

[5] An important prerequisite for scaling a dynamically controlled stream using Kafka partitions is that state identities (the actors' identities in this case) are the same as message identities (message keys). If this is the case, the state will migrate (assuming that it is persistent) along with the messages to new servers. If this is not the case, it is necessary to use an Akka cluster containing all the actors. This makes the overall implementation a little more complex and leads to additional latency due to remote access to the actors.