Akka Serverless - Keep the data. Lose the database. Learn More.
machine-learning kafka-streams

How To Use Kafka Streams For Dynamically Controlled Streams

Boris Lublinsky Principal Architect, Lightbend, Inc.

Cloud-Native Design Techniques for Serving Machine Learning Models with Kafka Streams

In the introductory post of this short series, How To Serve Machine Learning Models With Dynamically Controlled Streams, I described how dynamically controlled streams is a very powerful pattern for implementing streaming applications. While this can be implemented using different streaming engines and frameworks, making the right technology choice is based on many factors, including internal knowledge of frameworks/engines, enterprise architectural standards, and other considerations.

In this post, I demonstrate with code how dynamically controlled streams can be implemented leveraging Kafka Streams, and the main properties of such implementation. You can also click on the technology you’re using for my other examples leveraging Akka Streams, Apache Flink, and Apache Spark.

Dynamically Controlled Streams With Kafka Streams

Kafka streams provides State Store, which is a preferred way to implement stateful (and dynamically controlled) streams in the case of Kafka Streams implementation. For our implementation we are going to use a single (key/value) store containing an instance of TemperatureControl record for every SensorID known to the system.

With this, the basic Kafka Streams implementation for dynamically controlled stream looks like follows:

object KafkaTemperatureController {

 private val port = 8888 // Port for queryable state
 implicit val heaterControlSerde : Serde[HeaterControl] = new HeaterControlSerde

 def main(args: Array[String]): Unit = {

   val streamsConfiguration = new Properties
   // Give the Streams application a unique name.  The name must be unique in the Kafka cluster
   // against which the application is run.
   streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, kafkaConfig.heatersourcegroup)
   // Where to find Kafka broker(s).
   streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.brokers)
   // Provide the details of our embedded http service that we'll use to connect to this streams
   // instance and discover locations of stores.
   streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "" + port)
   // Default serdes
   // Store definition
   val logConfig = new HashMap[String, String]
   val storeSupplier = Stores.inMemoryKeyValueStore("TemperatureController")
   val storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.Integer, 
new TemperatureControlSerde).withLoggingEnabled(logConfig)
   // Create Stream builder
   val builder = new StreamsBuilder
   // Data input streams
   val sensor  = builder.stream[Array[Byte], Array[Byte]](kafkaConfig.heateroutputtopic)
   val control  = builder.stream[Array[Byte], Array[Byte]](kafkaConfig.temperaturesettopic)

   // DataStore

   // Control stream processing
     .mapValues(value => DataTransformer.controlFromByteArray(value))
     .filter((key, value) => value.isSuccess)
     .process(() => new TemperatureControlProcessor, "TemperatureController")

   // Sensor stream processing
     .mapValues(value => DataTransformer.sensorFromByteArray(value))
     .filter((key, value) => value.isSuccess)
     .transform(new SensorDataTransformer, "TemperatureController")
     .mapValues(value => {
       value.sensorID match {
         case Some(result) =>
           println(s"sending new control ${value.command} for sensor $result")
           Some(HeaterControl(value.command, HeaterCommand.fromValue(value.command)))
         case _ =>
     .filter((key, value) => value.isDefined).mapValues(v => DataTransformer.toByteArray(v.get))

   // Create and build topology
   val topology = builder.build

   // Create streams
   val streams = new KafkaStreams(topology, streamsConfiguration)
   streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
     override def uncaughtException(t: Thread, e: Throwable): Unit = {
       System.out.println("Uncaught exception on thread " + t + " " + e.toString)

   // Start streams

   // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
     try {
     } catch {
       case NonFatal(e) => println(s"During streams.close(), received: $e")

This class has three main parts:

  • Configuration - population of streams configuration
  • Creation of kafka streams topology1. This is leveraging StreamsBuilder APIs. Here we first define and add to the pipeline our data store (Table) and two input streams - sensor and control streams. Here we are leveraging Kafka streams DSL for Scala to build this topology. We also leverage here Kafka streams DSL to invoke processor's APIs (process and transform).
  • Execution of the topology

As mentioned above, we are using here two processors (compare to actors used for Akka streams implementation).

class TemperatureControlProcessor extends AbstractProcessor[Array[Byte], Try[TemperatureControl]] {

 private var controlStore: KeyValueStore[Integer, TemperatureControl] = null

 override def process (key: Array[Byte], temperatureControl: Try[TemperatureControl]): Unit = {
   val control = temperatureControl.get
   controlStore.put(control.sensorID, control)

 override def init(context: ProcessorContext): Unit = {
   controlStore = context.getStateStore("TemperatureController").asInstanceOf[KeyValueStore[Integer, TemperatureControl]]
   Objects.requireNonNull(controlStore, "State store can't be null")

Temperature control processor presented above gets TemperatureControl message and inserts it into table, which is configured in the init method. Sensor Data transformer (below) is responsible for processing SensorData message. This transformation first tries to get a control information from the table (set by the init method). If this information does not exist, no control information is returned. Otherwise we calculate an output signal (heater control) and return it.

class SensorDataTransformer extends Transformer[Array[Byte], Try[SensorData], (Array[Byte], MayBeHeaterControl)]{

 private var controlStore: KeyValueStore[Integer, TemperatureControl] = null
 private val previousCommands = Map[Int, Int]()

 override def transform(key: Array[Byte], dataRecord: Try[SensorData]): (Array[Byte], MayBeHeaterControl) = {

   val sensor = dataRecord.get
   controlStore.get(sensor.sensorID) match {
     case setting if setting != null => // We are controlling
      (if(sensor.temperature > (setting.desired + setting.upDelta)) 1
         else if(sensor.temperature < (setting.desired - setting.downDelta)) 0 else -1) match {
         case action if(action < 0) => (key, MayBeHeaterControl(None, 0))
         case action =>
           val previousCommand = previousCommands.getOrElse(sensor.sensorID, -1)
           if(previousCommand != action) {
             previousCommands += (sensor.sensorID -> action)
             (key, MayBeHeaterControl(Some(sensor.sensorID), action))
           else (key, MayBeHeaterControl(None, 0))
     case _ => // No control
      (key, MayBeHeaterControl(None, 0))

 override def init(context: ProcessorContext): Unit = {
   controlStore = context.getStateStore("TemperatureController")
.asInstanceOf[KeyValueStore[Integer, TemperatureControl]]
   Objects.requireNonNull(controlStore, "State store can't be null")

 override def close(): Unit = {}

Similar to the Akka Streams implementation we are trying here to minimize the amount of outputs, by comparing current control value with the previously submitted. This requires implementation to remember previous value of control signal. Because this information is not required for proper functioning, instead of using additional table, we just keep this information in memory, thus making overall implementation more lightweight.

Scalability, Resilience, And Load Balancing

Unlike Akka Streams, Kafka streams does not implement Reactive stream processing. This means that the only way to implement resilient Kafka Stream application is through external supervision of Kafka Stream processes (leveraging, for example, Kubernetes deployments). Because in our implementation we are leveraging State store, which is continuously backed up to a Kafka topic behind the scenes, application restart does not lead to a data loss. This increases overall resilience of the Kafka streams application, but it might add latency to application restart. Although a compacted changelog topic is used to back up the store, restore operation can take a noticeable time.

When it comes to scalability of Kafka streams applications it leverages the same approach as Akka Streams, which is based on scalability (through partitioning) of Kafka. The more partitions we can create for a given topic, the better we can scale (run more instances) Kafka Streams application2.

Both this and the previous post discussed dynamically controlled streams implementations using streaming libraries - Akka Streams and Kafka Streams. In the next two posts we will examine how the pattern can be implemented using streaming engines, starting with Apache Flink.

Learn More About Machine Learning

Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!



1 Compare to Akka Streams topology, shown above

2 Similar to the case of Akka streams it is necessary to ensure that state identity (table key, in this case) is the same as the message identity (message key), which will allow state to migrate along with the messages.


The Total Economic Impact™
Of Lightbend Akka Platform

  • 139% ROI
  • 50% to 75% faster time-to-market
  • 20x increase in developer throughput
  • <6 months Akka Platform pays for itself