How To Serve Machine Learning Models With Dynamically Controlled Streams

Boris Lublinsky Principal Architect, Lightbend, Inc.

Design Techniques for Serving Machine Learning Models in Cloud-Native Applications

In this and related blog posts, I will explain how dynamically controlled streams can be used to implement streaming applications. The pattern can be implemented with different streaming engines and frameworks; the right technology choice depends on many factors, including in-house knowledge of the frameworks and engines, enterprise architectural standards, and other considerations.

Lightbend Platform supports integration with various streaming engines, and its module Lightbend Pipelines orchestrates and simplifies operations of complex streaming data applications. In the following posts, I will demonstrate how to implement dynamically controlled streams with four technologies: Akka Streams, Kafka Streams, Apache Flink, and Spark Streaming.

I suggest you read this post before continuing to others, as we will lay the groundwork for better understanding more specific examples with the technologies listed above.


Real Life Needs For Stateful, Dynamic Streams

A common problem in building streaming applications is integrating stream processing systems with other data that exists in an enterprise in the form of enterprise services. Examples of real-life systems with such requirements include, but are not limited to:

  • Algorithmic trading - in addition to the price streams, requires information about the stocks themselves and price prediction models.
  • Monitoring a production line - in addition to the production line events, requires information about production requests, product requirements, etc.
  • Supply chain optimization - in addition to information about product requirements, requires information about suppliers and their catalogs.
  • Intrusion detection - in addition to network information, requires intrusion models.
  • Transport management systems - in addition to information about vehicles, requires information about traffic in the area.
  • Sports analytics - in addition to information about scores, might require, for example, information about team history.
  • Predictive maintenance - in addition to information from product sensors, requires predictive models, temperature conditions, etc.

This additional data can be either static (or semi-static) or dynamic. Static data can simply be “cached” inside the stream processing (see footnote 1). Here we will consider only the case of dynamic, read-only data.

Traditional approaches to this problem use RPC-based service invocations from within stream processing. The advantages of such an approach are:

  • Simple integration with existing technologies and organizational processes
  • Easier to understand if you come from a non-streaming world

On the other hand, such an approach can suffer from the following drawbacks:

  • Worse latency, due to remote calls instead of local function calls
  • Coupling the availability, scalability, and latency/throughput of your streaming application with the SLAs of the RPC interface

An alternative that we discuss here is to use stateful stream processing, with the ability to update the state dynamically as the state of the services changes – i.e., dynamically controlled streams.

Overall architecture

The overall architecture is based on the Log-centric execution architecture introduced by Jay Kreps.

Although this architecture looks very similar to the enterprise service bus (ESB) pattern, it is really very different (due to the difference between queue and log). For example, “an enterprise service bus represents an environment designed to foster sophisticated interconnectivity between services. It establishes an intermediate layer of processing that can help overcome common problems associated with reliability, scalability, and communications disparity”. The log represents a centralized subsystem with a well defined API for adding and retrieving data.

This pattern is similar to the Whiteboard pattern and provides similar advantages:

  • Adding new producers and consumers is easy and does not require any changes to the working system
  • Any topics (log is organized in topics) can be consumed by any number of consumers in any way.
  • Every consumer can manage its own position in the topic that it wants to consume. This is a foundation for replaying log.
  • Implementation, debugging, and maintenance are simplified because every service reuses the log (standardization of both access APIs and messaging semantics). Implementing the same service using a direct connection (even through a queue) is harder, and the result cannot be reused.
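The consumer-managed position described in the list above can be illustrated with a minimal in-memory sketch. `TopicLog` and `LogConsumer` are hypothetical names invented for this illustration; they are not part of Kafka or of the code accompanying this post, and a real log would also handle persistence and partitioning.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a single topic of a log (not a Kafka API).
final class TopicLog[A] {
  private val records = mutable.ArrayBuffer.empty[A]

  // Producers only ever append; the returned value is the offset of the record.
  def append(record: A): Long = {
    records += record
    records.size - 1L
  }

  // Reading never removes anything, so any number of consumers can read the same data.
  def readFrom(offset: Long): Vector[A] = records.drop(offset.toInt).toVector
}

// Each consumer owns its position in the topic.
final class LogConsumer[A](log: TopicLog[A]) {
  private var position = 0L

  // Returns everything appended since the last poll and advances the position.
  def poll(): Vector[A] = {
    val batch = log.readFrom(position)
    position += batch.size
    batch
  }

  // Replay amounts to moving the position back.
  def seek(offset: Long): Unit = { position = offset }
}
```

Because the log keeps no per-consumer bookkeeping, adding a second `LogConsumer` requires no change to the producer or to the first consumer, and calling `seek(0)` replays the topic from the beginning.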

In the case of a log-centric architecture, there is no interconnectivity between services. Any data changes are added to the log and are available to any consumers that have access to the log.

The log just stores data sequentially (time sequence) without any attempts to manage producers and consumers of data. Additionally logs allow replay of messages in cases where some of the consumers have to be replaced.

With the log-centric execution architecture in place, a dynamically controlled streams architecture can be a better alternative to RPC-based service invocation from within a stream.

This architecture requires stateful stream processing for the main data stream, with the state being updatable by a second stream, the state update stream. Both streams are read from the centralized data log, which contains all of the incoming data and the updates from all of the services.
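Independent of any particular engine, the idea can be sketched in plain Scala as a fold over a merged sequence of events. The event types, the threshold rule, and all names below are assumptions made for illustration only; the engine-specific posts use the engines' own state and stream-joining facilities.

```scala
object ControlledStreamSketch {
  // Hypothetical event types: both streams are read from the same log.
  sealed trait Event
  final case class Data(key: Int, value: Double) extends Event
  final case class Control(key: Int, threshold: Double) extends Event

  // The state is the latest control record per key.
  type State = Map[Int, Control]

  // One processing step: control records update the state and emit nothing,
  // data records are evaluated against the local state without any remote call.
  def step(state: State, event: Event): (State, Option[String]) = event match {
    case c: Control => (state.updated(c.key, c), None)
    case Data(key, value) =>
      val result = state.get(key).map { c =>
        if (value > c.threshold) s"$key: above" else s"$key: within"
      }
      (state, result)
  }

  // Runs the step function over a merged sequence of data and control events.
  def run(events: Seq[Event]): Vector[String] =
    events.foldLeft((Map.empty[Int, Control], Vector.empty[String])) {
      case ((state, out), event) =>
        val (next, emitted) = step(state, event)
        (next, out ++ emitted.toList)
    }._2
}
```

In this sketch a data record that arrives before any control record for its key produces no output; a real implementation has to decide whether to drop, buffer, or apply a default in that situation.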

Pattern Definition

An enterprise has multiple services that are being built independently, with different languages and platforms, that need to be used in stream processing.

How can I use these services inside stream processing?

Have each service produce data (as it changes) to the “control” stream for the stream processing to consume and use as a “state” that can be used by the data stream to produce results.

There are several important considerations to keep in mind when applying the dynamically controlled streams pattern:

  • It is typically assumed that the control stream is much “slower” than the data stream. For example, the data stream may deliver records every few seconds or minutes, while the control stream changes every few hours, days, or months.
  • It is also assumed that the volume of the state data is “manageable” by a streaming engine, i.e., it is not on a hundred-TB scale.
  • In order to implement reliable stream processing, data schemas and data schema validation have to be implemented as part of the stream processing.
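As a rough illustration of the last point, the sketch below wraps an arbitrary parser so that records that cannot be parsed, or that fail a validity check, are reported and dropped rather than failing the stream. `parseValid` and its parameters are hypothetical names introduced here; in practice the `parse` function would typically be the `parseFrom` method of a generated protobuf class.

```scala
import scala.util.{Failure, Success, Try}

object RecordValidation {
  // Wraps any parser so that malformed or invalid records are reported
  // and dropped instead of failing the whole stream.
  def parseValid[A](bytes: Array[Byte])(parse: Array[Byte] => A)(isValid: A => Boolean): Option[A] =
    Try(parse(bytes)) match {
      case Success(record) if isValid(record) => Some(record)
      case Success(record) => println(s"Dropping invalid record $record"); None
      case Failure(e) => println(s"Dropping unparseable record: $e"); None
    }
}
```

Returning an `Option` lets the caller filter bad records out of either stream with a single `flatMap`; whether dropped records should also be routed to a separate error topic is a design decision this sketch leaves open.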

All of the modern stream processing frameworks and engines, including Akka Streams, Kafka Streams, Flink, and Spark, can be used to implement dynamically controlled streams. Here we will show very simplistic implementations (the complete code is available here), dropping many required details in order to show the base approaches and some tradeoffs. But before describing the actual implementation, let’s first describe a simple example that we are going to use throughout.

Air Conditioning: A Motivating Example (In Summer)

Throughout this post we will use a very simple example - temperature control. In the simplest case, the implementation looks as follows:

  • There is a constant stream of temperature measurements from the sensor.
  • The thermostat settings are defined as a desired temperature Td.
  • When the temperature falls below Td - ∆t, a signal is sent to the heater to start.
  • When the temperature goes above Td + ∆t a signal is sent to the heater to stop.
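The rules above form a simple hysteresis band around Td. A minimal sketch of the decision logic, using hypothetical names (`Setting`, `decide`) that do not come from the accompanying code, could look like this:

```scala
object ThermostatSketch {
  sealed trait HeaterCommand
  case object On extends HeaterCommand
  case object Off extends HeaterCommand

  // Hypothetical thermostat setting: desired temperature Td and tolerance ∆t.
  final case class Setting(desired: Double, delta: Double)

  // A command is produced only when a threshold is crossed;
  // inside the band [Td - ∆t, Td + ∆t] nothing is sent.
  def decide(setting: Setting, temperature: Double): Option[HeaterCommand] =
    if (temperature < setting.desired - setting.delta) Some(On)
    else if (temperature > setting.desired + setting.delta) Some(Off)
    else None
}
```

In the dynamically controlled streams pattern, `setting` is the state delivered by the control stream and `temperature` comes from the data stream.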

In the sections below we will show how to implement this example, based on the dynamically controlled streams pattern, leveraging different streaming frameworks and engines.

We assume here a very simple “heater model”, where the temperature increases by 1 degree every N (configurable) minutes when the heater is on and decreases by 1 degree every M (configurable) minutes when it is off. The temperature sensor provides a reading every minute, and thermostat settings, including both Td and ∆t, can arrive at any point.
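One step of this model can be written as a pure function, which makes the arithmetic easy to check in isolation. The function and parameter names below are assumptions chosen for illustration.

```scala
import scala.concurrent.duration._

object HeaterModelSketch {
  // One simulation step. upRate and downRate are the times needed to change the
  // temperature by 1 degree, so each sensor interval contributes interval / rate degrees.
  def nextTemperature(current: Double, heaterOn: Boolean, interval: FiniteDuration,
                      upRate: FiniteDuration, downRate: FiniteDuration): Double =
    if (heaterOn) current + interval.toMillis.toDouble / upRate.toMillis.toDouble
    else current - interval.toMillis.toDouble / downRate.toMillis.toDouble
}
```

For example, with a one-minute sensor interval and N = 4 minutes, each reading taken while the heater is on is 0.25 degrees higher than the previous one.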

The Model Implementation

For all of the examples below we first need to implement a heater model, based on our example definition. The implementation depends on several messages, which can be defined (using protocol buffers) as follows:

syntax = "proto3";

// Heater control
enum HeaterCommand{
   On = 0;
   Off = 1;
}

// Heater Control Message.
message HeaterControl {
   int32 sensorID = 1;
   HeaterCommand command = 2;
}

// Temperature Control Message
message TemperatureControl {
   int32 sensorID = 1;
   double desired = 2;
   double upDelta = 3;
   double downDelta = 4;
}

// Sensor data.
message SensorData {
   int32 sensorID = 1;
   double temperature = 2;
}

Because our implementation can support multiple sensors, sensor ID is included with each message.

The code below provides a fairly simple implementation. It assumes that temperature grows (or decreases) linearly, based on the value of control.

import java.io.ByteArrayOutputStream
import scala.concurrent.duration.Duration

// KafkaLocalServer, MessageSender, MessageListener, the configuration values, and the
// generated protobuf classes come from the accompanying project.
object Heater {

  // Current heater command (0 = On, 1 = Off). It is written by the listener thread
  // and read by the main loop, hence @volatile.
  @volatile var heaterOperation = 1
  val sensorID = 12345

  def main(args: Array[String]): Unit = {

    val brokers = kafkaConfig.brokers
    val temperatureUp = Duration(temperatureUpRate.rate)
    val temperatureDown = Duration(temperatureDownRate.rate)
    val timeInterval = Duration(sensorPublishInterval.interval)

    // Create kafka broker
    val kafka = KafkaLocalServer(true)
    kafka.start()
    println("Kafka Cluster created")

    // Create sender
    val sender = MessageSender[Array[Byte], Array[Byte]](brokers)

    // Start listener for control signal
    val listener = MessageListener(brokers, kafkaConfig.heaterinputtopic,
      kafkaConfig.heatersourcegroup, new ControlProcessor())
    listener.start()

    // Loop producing temperature reading
    val bos = new ByteArrayOutputStream
    var temperature = 42.0
    while (true) {
      // Calculate output temperature
      heaterOperation match {
        case 0 => // Heater is on - increase temperature
          temperature = temperature + timeInterval.toMillis.toDouble / temperatureUp.toMillis.toDouble
        case _ => // Heater is off - decrease temperature
          temperature = temperature - timeInterval.toMillis.toDouble / temperatureDown.toMillis.toDouble
      }
      // Send it to Kafka
      val sd = new SensorData(sensorID, temperature)
      bos.reset()
      sd.writeTo(bos)
      sender.writeValue(kafkaConfig.heateroutputtopic, bos.toByteArray)
      println(s"Send sensor data $sd")
      // Pause
      pause(timeInterval)
    }
  }

  def pause(timeInterval: Duration): Unit = Thread.sleep(timeInterval.toMillis)
}

This simple implementation also listens on the control input, leveraging the following class:

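import org.apache.kafka.clients.consumer.ConsumerRecord

class ControlProcessor extends RecordProcessorTrait[Array[Byte], Array[Byte]] {

  import Heater._

  override def processRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
    try {
      // Parse the control message and store its command (0 = On, 1 = Off)
      heaterOperation = HeaterControl.parseFrom(record.value()).command.index
      println(s"Updated heater control to $heaterOperation")
    }
    catch {
      // A malformed message is reported and ignored; the previous command stays in effect
      case t: Throwable => println(s"Error reading heater control $t")
    }
  }
}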

The processor listens on a predefined topic and updates the heater operation with the value of the received control.

Finally, a little method, below, generates new control settings at a fixed interval (ten minutes, as set by controlInterval in the code):

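import java.io.ByteArrayOutputStream
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
// Assumption: the global execution context is used to run the Future
import scala.concurrent.ExecutionContext.Implicits.global

def controltemperature(sender: MessageSender[Array[Byte], Array[Byte]]): Future[Unit] = Future {
  var desired = 45.0
  val controlInterval = 10.minute
  val generator = new Random
  val bos = new ByteArrayOutputStream
  while (true) {
    // Publish the current setting: desired temperature with up and down deltas of 1 degree
    val temperatureControl = TemperatureControl(sensorID, desired, 1.0, 1.0)
    bos.reset()
    temperatureControl.writeTo(bos)
    sender.writeValue(kafkaConfig.temperaturesettopic, bos.toByteArray)
    println(s"Send new temperature control $temperatureControl")
    // Shift the desired temperature by a random amount between -5 and +4 degrees
    desired = desired + (generator.nextInt(10) - 5)
    pause(controlInterval)
  }
}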

With this in place, we can proceed to the controlled streams implementations. Just select the technology you’re interested in using below:

  • Akka Streams
  • Kafka Streams
  • Apache Flink
  • Spark Streaming


1 Although it sounds simple, data caching can be quite complex, depending on the size of the data.