In the introductory post of this short series, How To Serve Machine Learning Models With Dynamically Controlled Streams, I described why dynamically controlled streams are a powerful pattern for implementing streaming applications. While the pattern can be implemented using different streaming engines and frameworks, the right technology choice depends on many factors, including in-house knowledge of frameworks/engines, enterprise architectural standards, and other considerations.
In this post, I demonstrate with code how dynamically controlled streams can be implemented leveraging Akka Streams, and the main properties of such an implementation. You can also jump to my other examples for the technology you are using: Kafka Streams, Apache Flink, and Apache Spark.
When implementing dynamically controlled streams using Akka Streams, the first thing to consider is that Akka Streams is a stateless stream processing framework, while a dynamically controlled streams implementation requires state. Fortunately, Akka Streams provides seamless integration with actors, which is the preferred way1 to implement stateful (and dynamically controlled) streams. For our simple example, we can use two actors: a controller actor, one per sensor, which keeps the current temperature setting and computes heater control commands from sensor readings, and a controller manager actor, which routes messages to the individual controllers and manages their lifecycle.
For our implementation we decided to use Akka Typed - the latest, type-safe version of Akka2. Akka Typed expresses actors and their interactions using behaviors and addresses. Messages can be sent to an address, and behind this façade there is a behavior that receives the message and acts upon it.
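To make the behavior/address terminology concrete, here is a minimal, self-contained sketch (not part of the temperature-control code; all names are illustrative) of a typed actor defined as a behavior and addressed through a typed ActorRef:
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object GreeterExample {
  // The message protocol defines the actor's type
  final case class Greet(name: String, replyTo: ActorRef[Greeted])
  final case class Greeted(message: String)

  // A behavior: a function from an incoming message to the behavior for the next message
  val greeter: Behavior[Greet] = Behaviors.receiveMessage { msg =>
    msg.replyTo ! Greeted(s"Hello, ${msg.name}")
    Behaviors.same
  }

  def main(args: Array[String]): Unit = {
    // The guardian behavior spawns the greeter; the ActorRef returned by spawn is its address
    ActorSystem(Behaviors.setup[Greeted] { context =>
      val greeterRef: ActorRef[Greet] = context.spawn(greeter, "greeter")
      greeterRef ! Greet("world", context.self)
      Behaviors.receiveMessage { greeted =>
        println(greeted.message)
        Behaviors.stopped
      }
    }, "GreeterExample")
  }
}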
The implementation starts by defining the messages used by the actors:
// Controller
trait ControllerActorTyped
case class SensorDataRequest(reply: ActorRef[MayBeHeaterControl], record : SensorData) extends ControllerActorTyped with ControllerManagerActorTyped
case class TemperatureSetting(reply: ActorRef[Done], record : TemperatureControl) extends ControllerActorTyped with ControllerManagerActorTyped
// Controller manager
trait ControllerManagerActorTyped
// Control
case class MayBeHeaterControl(sensorID : Option[Int], command : Int)
Here we define two traits, ControllerActorTyped and ControllerManagerActorTyped, that define the identities (and thus the types) of our actors, along with the actual messages, which extend those traits and are therefore eligible as messages for those actors. The case class MayBeHeaterControl is an additional message used for replies. With the messages in place, the implementation of the controller actor's behaviour (we are using the object-oriented style of actor implementation) looks as follows:
class ControllerBehaviour(context: ActorContext[ControllerActorTyped], sensorID: Int) extends AbstractBehavior[ControllerActorTyped] {

  println(s"Creating controller for sensor $sensorID")
  private var currentSetting: Option[TemperatureControl] = None
  private var previousCommand = -1

  override def onMessage(msg: ControllerActorTyped): Behavior[ControllerActorTyped] = {
    msg match {
      case setting: TemperatureSetting => // change temperature settings
        println(s"New temperature settings ${setting.record}")
        currentSetting = Some(setting.record)
        setting.reply ! Done
      case sensor: SensorDataRequest => // process sensor reading
        currentSetting match {
          case Some(setting) => // We are controlling
            (if (sensor.record.temperature > (setting.desired + setting.upDelta)) 1
             else if (sensor.record.temperature < (setting.desired - setting.downDelta)) 0
             else -1) match {
              case action if action < 0 => // temperature is within the desired range - nothing to do
                sensor.reply ! MayBeHeaterControl(None, 0)
              case action =>
                if (previousCommand != action) { // only send the command if it changed
                  previousCommand = action
                  sensor.reply ! MayBeHeaterControl(Some(sensor.record.sensorID), action)
                }
                else sensor.reply ! MayBeHeaterControl(None, 0)
            }
          case _ => // No control
            sensor.reply ! MayBeHeaterControl(None, 0)
        }
    }
    this
  }
}
The implementation is quite similar to the untyped one, with the following main differences: the behaviour class extends AbstractBehavior parameterized with the type of message it accepts (rather than extending Actor), messages are handled in onMessage (rather than receive), and the handler explicitly returns the behaviour (here this) to be used for the next message.
This implementation also uses an optimization to minimize outgoing traffic: it only submits a heater control command if it has changed, thus avoiding sending the same (redundant) information every time a sensor reading is received.
The implementation of the controller manager actor's behaviour is presented below:
class ControllerManagerBehavior(context: ActorContext[ControllerManagerActorTyped]) extends AbstractBehavior[ControllerManagerActorTyped] {

  // Get (maybe) controller actor based on sensorID
  private def getSensorActorMayBe(sensorID: Int): Option[ActorRef[ControllerActorTyped]] = {
    val res = context.child(sensorID.toString)
    if (res.isDefined)
      Some(res.get.asInstanceOf[ActorRef[ControllerActorTyped]])
    else
      None
  }

  // Get (or create) controller actor based on sensorID
  private def getSensorActor(sensorID: Int): ActorRef[ControllerActorTyped] = {
    getSensorActorMayBe(sensorID) match {
      case Some(actorRef) => actorRef
      case _ => context.spawn(Behaviors.setup[ControllerActorTyped](context =>
        new ControllerBehaviour(context, sensorID)), sensorID.toString)
    }
  }

  override def onMessage(msg: ControllerManagerActorTyped): Behavior[ControllerManagerActorTyped] = {
    msg match {
      case setting: TemperatureSetting => // change temperature settings
        getSensorActor(setting.record.sensorID) tell setting
      case sensor: SensorDataRequest => // process sensor reading
        getSensorActorMayBe(sensor.record.sensorID) match {
          case Some(actorRef) => actorRef tell sensor
          case _ => sensor.reply ! MayBeHeaterControl(None, 0)
        }
    }
    this
  }
}
As mentioned before, in addition to being a router, this actor is also responsible for the lifecycle of the controller actors. As you can see from the code, creation of a typed actor is quite different from creation of an untyped one. Another difference is that context.child() returns a typed actor reference3, which means that generic operations on it have to be done carefully. Finally, the AkkaTemperatureController object implements the dynamically controlled stream itself, wiring the actors described above into Akka Streams:
object AkkaTemperatureController {

  // Initialization
  implicit val controllerManager = ActorSystem(
    Behaviors.setup[ControllerManagerActorTyped](
      context => new ControllerManagerBehavior(context)), "ControllerManager")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = controllerManager.executionContext
  implicit val askTimeout = Timeout(30.seconds)

  // Sources
  val sensorSettings = ConsumerSettings(controllerManager.toUntyped, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(kafkaConfig.brokers)
    .withGroupId(kafkaConfig.heateroutputgroup)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  val controlSettings = ConsumerSettings(controllerManager.toUntyped, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(kafkaConfig.brokers)
    .withGroupId(kafkaConfig.temperaturesetgroup)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // Sink
  val heaterSettings =
    ProducerSettings(controllerManager.settings.config.getConfig("akka.kafka.producer"), new ByteArraySerializer, new ByteArraySerializer)
      .withBootstrapServers(kafkaConfig.brokers)

  def main(args: Array[String]): Unit = {

    // Control stream processing
    Consumer.atMostOnceSource(controlSettings, Subscriptions.topics(kafkaConfig.temperaturesettopic))
      .map(record => DataTransformer.controlFromByteArray(record.value)).collect { case Success(a) => a }
      .via(ActorFlow.ask(1)(controllerManager)((elem, replyTo: ActorRef[Done]) =>
        new TemperatureSetting(replyTo, elem)))
      .runWith(Sink.ignore) // run the stream, we do not read the results directly

    // Sensor stream processing
    Consumer.atMostOnceSource(sensorSettings, Subscriptions.topics(kafkaConfig.heateroutputtopic))
      .map(record => DataTransformer.sensorFromByteArray(record.value)).collect { case Success(a) => a }
      .via(ActorFlow.ask(1)(controllerManager)((elem, replyTo: ActorRef[MayBeHeaterControl]) =>
        new SensorDataRequest(replyTo, elem)))
      .map { result =>
        result.sensorID match {
          case Some(value) =>
            println(s"sending new control ${result.command} for sensor $value")
            Some(HeaterControl(value, HeaterCommand.fromValue(result.command)))
          case _ =>
            None
        }
      }
      .filter(_.isDefined)
      .map(value => new ProducerRecord[Array[Byte], Array[Byte]](kafkaConfig.heaterinputtopic, DataTransformer.toByteArray(value.get)))
      .runWith(Producer.plainSink(heaterSettings))
  }
}
Here we are using Akka Streams Typed, and as a result ActorSystem creation looks slightly different compared to the untyped case: creating the ActorSystem also creates the controller manager behaviour as its top-level behaviour.
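For comparison, an untyped application would typically create the system and the top-level actor in two separate steps. The following is a minimal sketch only (ControllerManagerActor here is a trivial illustrative stand-in, not the behaviour defined above):
import akka.actor.{Actor, ActorSystem, Props}

object UntypedCreationExample {
  // A trivial untyped actor standing in for an untyped controller manager
  class ControllerManagerActor extends Actor {
    def receive: Receive = { case msg => println(s"got $msg") }
  }

  def main(args: Array[String]): Unit = {
    // Untyped style: the system is created first, the top-level actor is created afterwards;
    // in the typed case, the ActorSystem constructor itself takes the guardian behaviour
    val system = ActorSystem("ControllerManager")
    val controllerManager = system.actorOf(Props(new ControllerManagerActor), "controllerManager")
    controllerManager ! "test"
  }
}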
As we can see from the code above, there are two independent stream processing pipelines here: a control stream, which reads temperature settings from Kafka and forwards them to the controller manager (and through it to the individual controllers), and a sensor stream, which reads sensor data from Kafka, asks the controller manager for a (possible) heater control command, and writes the resulting commands back to Kafka.
An important feature of Akka Streams is that it implements Reactive Streams processing. One of the most important features of reactive systems is resiliency, or fault tolerance, which in Akka is implemented through supervision. Supervision describes a dependency relationship between actors: the supervisor delegates tasks to subordinates and therefore must respond to their failures. When a subordinate detects a failure (i.e. throws an exception), it suspends itself and all of its subordinates and sends a message to its supervisor, signaling failure.
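As an illustration (a sketch only, not part of the project code), in Akka Typed a supervision strategy can be attached directly to a behaviour when it is spawned. A hypothetical variant of the controller manager's getSensorActor method could restart a failed controller actor, assuming the additional import akka.actor.typed.SupervisorStrategy:
// Hypothetical variant of getSensorActor that restarts a failed controller actor on any exception
private def getSupervisedSensorActor(sensorID: Int): ActorRef[ControllerActorTyped] =
  context.spawn(
    Behaviors.supervise(
      Behaviors.setup[ControllerActorTyped](context => new ControllerBehaviour(context, sensorID))
    ).onFailure[Exception](SupervisorStrategy.restart),
    sensorID.toString)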
For a complete example of a supervision implementation, consult the Akka Typed documentation. Supervision works for the actors, but not for the Akka Streams part of the implementation, which supports a few alternative recovery strategies.
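As an illustration of such a strategy (a minimal, self-contained sketch that is not part of the project code), a stream can be told to drop a failing element and resume instead of failing as a whole:
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}

object StreamRecoveryExample {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("StreamRecoveryExample")
    implicit val materializer = ActorMaterializer()

    // Resume (skip the element) on any failure instead of failing the stream
    val decider: Supervision.Decider = _ => Supervision.Resume

    Source(List("1", "2", "oops", "4"))
      .map(_.toInt)                       // throws NumberFormatException for "oops"
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.foreach(println))     // prints 1, 2, 4
  }
}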
In addition to this, if deployed on Kubernetes, usage of Deployments can further improve resiliency: Kubernetes will watch pod execution and restart the pod if it fails. Beyond the resiliency of the code itself, it is also necessary to ensure that, in the case of a restart, the application restores its state. Because the state is kept in the actors, adding Akka Persistence can easily solve the problem. Alternatively, a simplified version of state management can be implemented similarly to this implementation.
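As a rough illustration of the Akka Persistence route (a sketch only; it uses the EventSourcedBehavior API from Akka Persistence Typed, which belongs to a newer Akka version than the one used in this post, and all names are illustrative rather than part of the project code), a persistent controller could store each accepted temperature setting as an event and rebuild its state on restart:
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

object PersistentControllerSketch {
  // Commands, events, and state for a single sensor's controller (illustrative)
  sealed trait Command
  final case class SetTemperature(desired: Double, upDelta: Double, downDelta: Double) extends Command

  sealed trait Event
  final case class TemperatureSet(desired: Double, upDelta: Double, downDelta: Double) extends Event

  final case class State(setting: Option[TemperatureSet])

  def apply(sensorID: Int): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId(s"controller-$sensorID"),
      emptyState = State(None),
      commandHandler = (_, cmd) => cmd match {
        case SetTemperature(d, up, down) => Effect.persist(TemperatureSet(d, up, down))
      },
      eventHandler = (_, evt) => evt match {
        case e: TemperatureSet => State(Some(e)) // state is rebuilt from stored events on restart
      })
}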
An additional consideration for a dynamically controlled stream is scalability, especially since the Akka Streams implementation runs in a single JVM. When Kafka is used as the input, the implementation above can be scaled by partitioning the input topics (keyed by sensorID) and running multiple instances of the application in the same consumer group, so that Kafka distributes the partitions across the instances.
Assuming that all the sensors send their readings with the same frequency and the key (sensorID) distribution is even4, all the instances will receive about the same amount of messages. This means that by increasing the number of partitions on the corresponding Kafka topics and the number of implementation instances, it is possible to easily scale the implementation described above5.
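To make the partitioning assumption concrete, here is a sketch (not part of the project code; the topic name, broker address, and serialization details are illustrative) of producing sensor readings keyed by sensorID, so that all readings for a given sensor, and therefore its controller state, land on the same partition and the same application instance:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

object KeyedSensorProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // illustrative broker address

    val producer = new KafkaProducer[String, Array[Byte]](props, new StringSerializer, new ByteArraySerializer)

    val sensorID = 42
    val payload: Array[Byte] = Array[Byte](/* serialized SensorData would go here */)

    // Using sensorID as the record key: Kafka's default partitioner hashes the key,
    // so all readings for this sensor always go to the same partition
    producer.send(new ProducerRecord("heater_output_topic", sensorID.toString, payload))
    producer.close()
  }
}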
In the next post we will examine how the pattern can be implemented using Kafka Streams.
Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!
1 See our post on Managing Streaming State & Queryable State. ↩
2 This library is still under heavy development and may change. The code presented here is based on the version 2.5.21. ↩
3 As we mentioned above, the type of the actor is defined by the message that it accepts. ↩
4 If the key distribution is skewed, it can be improved by implementing a custom partitioner. ↩
5 An important prerequisite for scaling a dynamically controlled stream using Kafka partitions is that state identities (the actors' identities in this case) are the same as message identities (message keys). If this is the case, the state will migrate (assuming that it is persistent) along with the messages to new servers. If this is not the case, it is necessary to use an Akka cluster containing all the actors, which makes the overall implementation a little more complex and leads to additional latencies due to remote access to the actors. ↩