In the introductory post of this short series, How To Serve Machine Learning Models With Dynamically Controlled Streams, I described why dynamically controlled streams are a powerful pattern for implementing streaming applications. While the pattern can be implemented with different streaming engines and frameworks, the right technology choice depends on many factors, including in-house knowledge of the frameworks/engines, enterprise architectural standards, and other considerations.
In this post, I demonstrate in code how dynamically controlled streams can be implemented with Kafka Streams, and discuss the main properties of such an implementation. You can also click through to my other examples leveraging Akka Streams, Apache Flink, and Apache Spark.
Kafka Streams provides the State Store abstraction, which is the preferred way to implement stateful (and dynamically controlled) streams in a Kafka Streams application. For our implementation we use a single key/value store containing a TemperatureControl record for every SensorID known to the system.
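To make the rest of the code easier to follow, here is a rough sketch of the record types the store and the downstream processing work with. These definitions are only inferred from how the fields are used later in this post; the actual classes in the project may be defined differently.
// Sketch only: control record stored per sensor. The sensorID is the store key; desired,
// upDelta and downDelta define the temperature band used to decide heater commands.
case class TemperatureControl(sensorID: Int, desired: Double, upDelta: Double, downDelta: Double)
// Sketch only: optional heater command produced by the sensor stream processing
// (None means no command needs to be sent).
case class MayBeHeaterControl(sensorID: Option[Int], command: Int)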
With this in place, the basic Kafka Streams implementation of a dynamically controlled stream looks as follows:
object KafkaTemperatureController {
private val port = 8888 // Port for queryable state
implicit val heaterControlSerde : Serde[HeaterControl] = new HeaterControlSerde
def main(args: Array[String]): Unit = {
val streamsConfiguration = new Properties
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
// against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
"dynamically-controlled-streams")
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, kafkaConfig.heatersourcegroup)
// Where to find Kafka broker(s).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.brokers)
// Provide the details of our embedded http service that we'll use to connect to this streams
// instance and discover locations of stores.
streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "127.0.0.1:" + port)
// Default serdes
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArray.getClass)
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArray.getClass)
// Store definition
val logConfig = new HashMap[String, String]
val storeSupplier = Stores.inMemoryKeyValueStore("TemperatureController")
val storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.Integer,
new TemperatureControlSerde).withLoggingEnabled(logConfig)
// Create Stream builder
val builder = new StreamsBuilder
// Data input streams
val sensor = builder.stream[Array[Byte], Array[Byte]](kafkaConfig.heateroutputtopic)
val control = builder.stream[Array[Byte], Array[Byte]](kafkaConfig.temperaturesettopic)
// DataStore
builder.addStateStore(storeBuilder)
// Control stream processing
control
.mapValues(value => DataTransformer.controlFromByteArray(value))
.filter((key, value) => value.isSuccess)
.process(() => new TemperatureControlProcessor, "TemperatureController")
// Sensor stream processing
sensor
.mapValues(value => DataTransformer.sensorFromByteArray(value))
.filter((key, value) => value.isSuccess)
.transform(new SensorDataTransformer, "TemperatureController")
.mapValues(value => {
value.sensorID match {
case Some(result) =>
println(s"sending new control ${value.command} for sensor $result")
Some(HeaterControl(value.command, HeaterCommand.fromValue(value.command)))
case _ =>
None
}
})
.filter((key, value) => value.isDefined).mapValues(v => DataTransformer.toByteArray(v.get))
.to(kafkaConfig.heaterinputtopic)
// Create and build topology
val topology = builder.build
println(topology.describe)
// Create streams
val streams = new KafkaStreams(topology, streamsConfiguration)
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
System.out.println("Uncaught exception on thread " + t + " " + e.toString)
}
})
// Start streams
streams.start()
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
sys.addShutdownHook{
try {
streams.close()
} catch {
case NonFatal(e) => println(s"During streams.close(), received: $e")
}
} // end of shutdown hook
} // end of main
} // end of object KafkaTemperatureController
This class has three main parts: configuration of the Kafka Streams application, definition of the state store, and construction and execution of the processing topology.
As mentioned above, we use two processors here (compare them to the actors used in the Akka Streams implementation).
class TemperatureControlProcessor extends AbstractProcessor[Array[Byte], Try[TemperatureControl]] {
  private var controlStore: KeyValueStore[Integer, TemperatureControl] = null
  // Store (or update) the control record for the sensor it belongs to
  override def process(key: Array[Byte], temperatureControl: Try[TemperatureControl]): Unit = {
    val control = temperatureControl.get
    controlStore.put(control.sensorID, control)
  }
  // Obtain a reference to the state store shared with the sensor stream processing
  override def init(context: ProcessorContext): Unit = {
    controlStore = context.getStateStore("TemperatureController").asInstanceOf[KeyValueStore[Integer, TemperatureControl]]
    Objects.requireNonNull(controlStore, "State store can't be null")
  }
}
The temperature control processor presented above receives a TemperatureControl message and inserts it into the state store, which is obtained in the init method. The sensor data transformer (below) is responsible for processing SensorData messages. The transformation first tries to get the control information for the sensor from the store (also obtained in its init method). If no such information exists, no control signal is returned. Otherwise, an output signal (heater control) is calculated and returned.
class SensorDataTransformer extends Transformer[Array[Byte], Try[SensorData], (Array[Byte], MayBeHeaterControl)] {
  private var controlStore: KeyValueStore[Integer, TemperatureControl] = null
  // Previously issued command per sensor, kept in memory to avoid resubmitting the same command
  private val previousCommands = collection.mutable.Map[Int, Int]()
  override def transform(key: Array[Byte], dataRecord: Try[SensorData]): (Array[Byte], MayBeHeaterControl) = {
    val sensor = dataRecord.get
    controlStore.get(sensor.sensorID) match {
      case setting if setting != null => // We are controlling this sensor
        // 1 - heater on, 0 - heater off, -1 - within the control band, no action
        (if (sensor.temperature > (setting.desired + setting.upDelta)) 1
        else if (sensor.temperature < (setting.desired - setting.downDelta)) 0 else -1) match {
          case action if action < 0 => (key, MayBeHeaterControl(None, 0))
          case action =>
            val previousCommand = previousCommands.getOrElse(sensor.sensorID, -1)
            if (previousCommand != action) {
              previousCommands += (sensor.sensorID -> action)
              (key, MayBeHeaterControl(Some(sensor.sensorID), action))
            }
            else (key, MayBeHeaterControl(None, 0))
        }
      case _ => // No control information for this sensor
        (key, MayBeHeaterControl(None, 0))
    }
  }
  override def init(context: ProcessorContext): Unit = {
    controlStore = context.getStateStore("TemperatureController")
      .asInstanceOf[KeyValueStore[Integer, TemperatureControl]]
    Objects.requireNonNull(controlStore, "State store can't be null")
  }
  override def close(): Unit = {}
}
Similar to the Akka Streams implementation, we try to minimize the number of outputs by comparing the current control value with the previously submitted one. This requires the implementation to remember the previous value of the control signal. Because this information is not required for correct functioning, we keep it in memory instead of using an additional state store, which makes the overall implementation more lightweight.
Unlike Akka Streams, Kafka Streams does not implement reactive stream processing. This means that the only way to build a resilient Kafka Streams application is through external supervision of the Kafka Streams processes (leveraging, for example, Kubernetes deployments). Because our implementation uses a state store, which is continuously backed up to a Kafka topic behind the scenes, an application restart does not lead to data loss. This increases the overall resilience of the Kafka Streams application, but it can add latency to restarts: although a compacted changelog topic is used to back up the store, the restore operation can take a noticeable amount of time.
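One way to reduce this restore latency is to keep warm copies of the store on other application instances. A minimal sketch, assuming the standard num.standby.replicas Kafka Streams setting (the value 1 is just an example):
// Sketch only: keep one warm replica of each state store on another instance,
// so a failover does not have to replay the entire changelog topic first.
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1")
Standby replicas only help when more than one instance of the application is running: they allow another instance to take over a partition without first rebuilding the store from scratch.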
When it comes to scalability, Kafka Streams applications leverage the same approach as Akka Streams, which is based on the scalability (through partitioning) of Kafka itself. The more partitions we create for a given topic, the further we can scale the Kafka Streams application by running more instances.
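Scaling can happen both inside a single instance and across instances. A minimal sketch, assuming the standard num.stream.threads setting; additional instances simply reuse the same application.id so that Kafka assigns each of them a share of the input partitions:
// Sketch only: scale up within one JVM by running several stream threads,
// each processing a subset of the input topic partitions.
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "4")
// Scaling out is then just a matter of starting more copies of this application with the same
// APPLICATION_ID_CONFIG ("dynamically-controlled-streams"); Kafka rebalances the partitions,
// and with them the state store shards, across all running instances.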
Both this and the previous post discussed dynamically controlled stream implementations using streaming libraries - Akka Streams and Kafka Streams. In the next two posts we will examine how the pattern can be implemented using streaming engines, starting with Apache Flink.
Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!