
How To Use Spark Streaming For Dynamically Controlled Streams

Boris Lublinsky Principal Architect, Lightbend, Inc.

Cloud-Native Design Techniques for Serving Machine Learning Models with Apache Spark

In the introductory post of this short series, How To Serve Machine Learning Models With Dynamically Controlled Streams, I described how dynamically controlled streams are a very powerful pattern for implementing streaming applications. While the pattern can be implemented using different streaming engines and frameworks, the right technology choice depends on many factors, including in-house knowledge of frameworks and engines, enterprise architectural standards, and other considerations.

In this post, I demonstrate with code how dynamically controlled streams can be implemented with Apache Spark, specifically Spark Structured Streaming, and describe the main properties of such an implementation. Other posts in this series cover my examples using Akka Streams, Kafka Streams, and Apache Flink.


Dynamically Controlled Streams With Spark Streaming

Spark Structured Streaming supports arbitrary stateful operations, which can be used to implement stateful (and dynamically controlled) streams. Unfortunately, there are several caveats to consider for such an implementation:

  • Spark provides two main methods for combining streams: stream-stream joins (introduced in Apache Spark 2.3) and union. The limitation of a stream-stream join is that it works on windows and is rarely applicable to streams with significantly different record frequencies. As a result, we are using union here.
  • The limitation of union is that only streams with identical schemas can be unioned together. As a result, our implementation had to introduce a case class, UnifiedDataModel, as a superset of the messages used: SensorData and TemperatureControl.
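As a rough illustration of the superset record described above, the following sketch shows one way such a type could be laid out. The field names follow the select(...) calls used later in this post; the field types and the UnifiedRecords helper are assumptions made for the example, not the definitions from the actual project.

```scala
// Field names follow the post; the field types are assumptions.
final case class SensorData(sensorID: Int, temperature: Double)
final case class TemperatureControl(sensorID: Int, desired: Double,
                                    upDelta: Double, downDelta: Double)

// Superset record: exactly one of the two payload fields is populated,
// the other one is left as null.
final case class UnifiedDataModel(sensorID: Int,
                                  sensorData: SensorData,
                                  temperatureControl: TemperatureControl)

// Hypothetical helper that wraps each native message into the unified record.
object UnifiedRecords {
  def fromSensor(s: SensorData): UnifiedDataModel =
    UnifiedDataModel(s.sensorID, s, null)

  def fromControl(c: TemperatureControl): UnifiedDataModel =
    UnifiedDataModel(c.sensorID, null, c)
}
```

With a layout like this, the processing function can tell the two kinds of messages apart by checking which payload field is null, which is what the `case null` branch in controlTemperature further down relies on.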

With this in place, the following code provides an implementation of the controller in Spark:

object SparkStructuredController {

 def main(args: Array[String]): Unit = {

   // Create context
   val sparkSession = SparkSession.builder
     .appName("SparkModelServer").master("local")
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .config("spark.sql.streaming.checkpointLocation", sparkConfig.checkpointingDir)
     .getOrCreate()

   sparkSession.sparkContext.setLogLevel("ERROR")
   import sparkSession.implicits._

   // Message parsing
   // In order to be able to union both streams, we are using a combined format here
   sparkSession.udf.register("deserializeData", (data: Array[Byte]) =>
     DataModelTransform.sensorFromByteArray(data))
   sparkSession.udf.register("deserializeControl", (data: Array[Byte]) =>
     DataModelTransform.controlFromByteArray(data))

   // Create data stream
   val sensorstream = sparkSession
     .readStream
     .format("kafka").option("kafka.bootstrap.servers", kafkaConfig.brokers)
     .option("subscribe", kafkaConfig.heateroutputtopic)
     .option(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.heateroutputgroup)
     .option(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     .option("startingOffsets", "earliest").option("failOnDataLoss", "false")
     .load().selectExpr("""deserializeData(value) AS data""")
     .select("data.sensorID", "data.sensorData", "data.temperatureControl")
     .as[UnifiedDataModel]

   // Create control stream
   val controlstream = sparkSession
     .readStream
     .format("kafka").option("kafka.bootstrap.servers", kafkaConfig.brokers)
     .option("subscribe", kafkaConfig.temperaturesettopic)
     .option(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.temperaturesetgroup)
     .option(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     .option("startingOffsets", "earliest").option("failOnDataLoss", "false")
     .load().selectExpr("""deserializeControl(value) AS data""")
     .select("data.sensorID", "data.sensorData", "data.temperatureControl")
     .as[UnifiedDataModel]

   val heatercontrolstream = controlstream.union(sensorstream)
     .filter(_.sensorID > 0)
     .groupByKey(_.sensorID)
     .mapGroupsWithState(GroupStateTimeout.NoTimeout())(controlTemperature)
     .as[Seq[InternalHeaterControl]]
     .withColumn("value", explode($"value")).select("value.sensorID", "value.action")
     .as[InternalHeaterControl].map(DataModelTransform.toByteArray(_))

   heatercontrolstream.writeStream
     .outputMode("update").format("kafka").option("kafka.bootstrap.servers", kafkaConfig.brokers)
     .option("topic", kafkaConfig.heaterinputtopic).trigger(Trigger.ProcessingTime("1 second")).start

   // Block until any of the streaming queries terminates
   sparkSession.streams.awaitAnyTermination()
 }
}

This implementation is fairly straightforward. First, two streams are created, reading from two Kafka topics, and then they are unioned together. The resulting stream, containing records from both, is then processed using mapGroupsWithState with the controlTemperature function presented below. The resulting heatercontrolstream is then written to Kafka.

def controlTemperature(key: Int, values: Iterator[UnifiedDataModel],
   state: GroupState[TemperatureControlWithLastValue]): Seq[InternalHeaterControl] = {
 val results = new ListBuffer[InternalHeaterControl]()
 values.foreach(value => {
   value.sensorData match {
     case null =>  // This is control
       println(s"New temperature settings ${value.temperatureControl}")
       // Keep the last submitted control value, if there is one
       val lastControl = if (state.exists) state.get.lastValue else -1
       state.update(TemperatureControlWithLastValue(lastControl, value.temperatureControl))
     case _ => // This is data
       if (state.exists) {
         val setting = state.get
         // 1 above the upper bound, 0 below the lower bound, -1 inside the band
         val action = (if (value.sensorData.temperature > (setting.temperatureControl.desired +
           setting.temperatureControl.upDelta)) 1
         else if (value.sensorData.temperature < (setting.temperatureControl.desired -
           setting.temperatureControl.downDelta)) 0 else -1)
         // Emit a command only when the action changes
         if ((action >= 0) && (setting.lastValue != action)) {
           println(s"sending new control $action for sensor ${value.sensorID}")
           results += InternalHeaterControl(value.sensorID, action)
           state.update(TemperatureControlWithLastValue(action, setting.temperatureControl))
         }
       }
   }
 })
 results.toList
}

The controlTemperature function (above) is very similar to the Akka Streams ControllerBehaviour. It receives a union of both streams and, depending on the message, processes it either as a temperatureControl or as sensorData. The state contains both the current control setting and the last submitted control value, which are used to calculate the current control value.
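To make the decision rule easier to follow in isolation, here is a minimal sketch of the same logic in plain Scala, with no Spark dependency. The Setting, ControlState, and HeaterLogic names are hypothetical stand-ins for the types used above, and the Double field types are assumptions.

```scala
// Hypothetical stand-ins for the post's types; field types are assumptions.
final case class Setting(desired: Double, upDelta: Double, downDelta: Double)
final case class ControlState(lastValue: Int, setting: Setting)

object HeaterLogic {
  // 1 above the upper bound, 0 below the lower bound, -1 inside the band.
  def action(temperature: Double, s: Setting): Int =
    if (temperature > s.desired + s.upDelta) 1
    else if (temperature < s.desired - s.downDelta) 0
    else -1

  // Returns the updated state and the command to emit, if any.
  // A command is emitted only when the action differs from the last one sent.
  def step(state: ControlState, temperature: Double): (ControlState, Option[Int]) = {
    val a = action(temperature, state.setting)
    if (a >= 0 && a != state.lastValue) (state.copy(lastValue = a), Some(a))
    else (state, None)
  }

  // Runs a sequence of readings through step and collects the emitted commands.
  def run(initial: ControlState, temperatures: Seq[Double]): Seq[Int] = {
    val (_, emitted) =
      temperatures.foldLeft((initial, Vector.empty[Int])) { case ((st, out), t) =>
        val (next, cmd) = step(st, t)
        (next, out ++ cmd.toList)
      }
    emitted
  }
}
```

For a setting of 20 degrees with deltas of 1, the readings 22.0, 23.0, 20.5, 18.0, 18.5 produce only two commands (1, then 0): repeated readings on the same side of the band, and readings inside the band, emit nothing.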

Although this implementation works, it suffers from several drawbacks:

  • Because a union operation is used here, continuous processing mode (introduced in Spark 2.3) does not work. As a result, we are forced to use micro-batch processing, which introduces additional delay in switching the heater control and, as a result, greater variation of the temperature.
  • Usage of union forces us to introduce additional intermediate types, which makes the code less readable.
  • Spark Streaming does not support Scala Option/Try well, so it is necessary to introduce an alternative mechanism for filtering out failed data transformations.
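The last point can be illustrated with a small sketch. The listings in this post filter on sensorID, which suggests that a failed transformation is represented by a record with a negative ID rather than by an Option or a Try. The code below shows that sentinel pattern in plain Scala; the Reading type and the "sensorID,temperature" text format are hypothetical, since the real messages are binary.

```scala
import scala.util.Try

// Hypothetical record type used only for this illustration.
final case class Reading(sensorID: Int, temperature: Double)

object SentinelParsing {
  // Sentinel record standing in for a failed parse.
  val Invalid: Reading = Reading(-1, 0.0)

  // Hypothetical text format "sensorID,temperature".
  // Any parsing failure is converted into the sentinel instead of a Failure.
  def parse(line: String): Reading =
    Try {
      val Array(id, temp) = line.split(",")
      Reading(id.trim.toInt, temp.trim.toDouble)
    }.getOrElse(Invalid)

  // Downstream code drops failed records with a plain field comparison.
  def validOnly(lines: Seq[String]): Seq[Reading] =
    lines.map(parse).filter(_.sensorID >= 0)
}
```

The trade-off is that the sentinel value has to be chosen so that it can never collide with a legitimate record, and every consumer has to remember to filter it out.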

We will take a look at a better Spark Structured Streaming implementation below.

An Alternative Implementation Of Spark Structured Streaming 1

The idea behind the alternative implementation is that Spark can run multiple queries in parallel. Based on this, the implementation runs two queries: one for sensor data and one for control settings. To coordinate their execution, we use a map, which is updated by the control settings query and read by the sensor processing query. For new settings to take effect, we restart the sensor processing query every time we get new control settings. Finally, to simplify management of the sensor processing query, the control settings query is implemented using Spark Streaming rather than Structured Streaming. The code is presented below:

object SparkStructuredStateControllerl {

 def main(args: Array[String]): Unit = {

   // Create context
   val sparkSession = SparkSession.builder.appName("SparkModelServer").master("local[3]")
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .config("spark.sql.streaming.checkpointLocation", sparkConfig.checkpointingDir).getOrCreate()

   val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(1))
   sparkSession.sparkContext.setLogLevel("ERROR")
   import sparkSession.implicits._

   sparkSession.udf.register("deserializeData", (data: Array[Byte]) =>
     DataModelTransform.sensorFromByteArray(data))
   // Current state of temperature settings
   val currenSettings = mutable.Map[Int, TemperatureControl]()

   // Create broadcast variable for the sink definition
   val temperatureControlProcessor = sparkSession.sparkContext.broadcast(new TemperatureController)

   // Create data stream
   val sensorstream = sparkSession.readStream.format("kafka")
     .option("kafka.bootstrap.servers", kafkaConfig.brokers)
     .option("subscribe", kafkaConfig.heateroutputtopic)
     .option(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.heateroutputgroup)
     .option(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     .option("startingOffsets", "earliest").option("failOnDataLoss", "false")
     .load().selectExpr("""deserializeData(value) AS data""").select("data.sensorID", "data.temperature")
     .as[SensorData].filter(_.sensorID >= 0).map(data => {
       temperatureControlProcessor.value.control(data, currenSettings.get(data.sensorID))
     }).as[InternalHeaterControl].filter(_.sensorID >= 0)
     .map(DataModelTransform.toByteArray(_))

   var sensorQuery = sensorstream.writeStream.outputMode("update").format("kafka")
     .option("kafka.bootstrap.servers", kafkaConfig.brokers).option("topic", kafkaConfig.heaterinputtopic)
     .trigger(Trigger.Continuous("5 second")).start

   // Create settings kafka stream
   val kafkaParams = KafkaSupport.getKafkaConsumerConfig(kafkaConfig.brokers)
   val modelsStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](ssc,PreferConsistent,
     Subscribe[Array[Byte], Array[Byte]](Set(kafkaConfig.temperaturesettopic),kafkaParams))

   modelsStream.foreachRDD( rdd =>
     if (!rdd.isEmpty()) {
       val settings = rdd.map(_.value).collect
         .map(DataTransformer.controlFromByteArray(_)).filter(_.isSuccess).map(_.get)
       val newSettings = settings.map(setting => {
         println(s"New temperature settings $setting")
         // Update state with the new model
         (setting.sensorID -> setting)
       }).toMap

       // Stop currently running data stream
       sensorQuery.stop

       // Merge maps
       newSettings.foreach{ case (name, value) => { currenSettings(name) = value }}

       // Restart data stream
       sensorQuery = sensorstream.writeStream.outputMode("update").format("kafka")
         .option("kafka.bootstrap.servers", kafkaConfig.brokers).option("topic", kafkaConfig.heaterinputtopic)
         .trigger(Trigger.Continuous("5 second")).start
     }
   )

   // Execute
   ssc.start()
   ssc.awaitTermination()
 }
}

Because this implementation has no place that explicitly holds state, we have implemented a separate class for calculating the control signal, in order to avoid sending a lot of duplicate messages:

class TemperatureController {
 // Last control value sent; -1 means nothing has been sent yet
 var previousControl = -1

 def control(data: SensorData, controlSetting: Option[TemperatureControl]): InternalHeaterControl = {
   controlSetting match {
     case Some(setting) =>
       (if (data.temperature > (setting.desired + setting.upDelta)) 1
       else if (data.temperature < (setting.desired - setting.downDelta)) 0 else -1) match {
         case action if (action >= 0) =>
           if (action != previousControl) {
             println(s"sending new control $action for sensor ${data.sensorID}")
             previousControl = action
             InternalHeaterControl(data.sensorID, action)
           }
           // Same action as before: return a record that is filtered out downstream
           else InternalHeaterControl(-1, -1)
         case _ => InternalHeaterControl(-1, -1)
       }
     // No settings for this sensor yet
     case _ => InternalHeaterControl(-1, -1)
   }
 }
}

Because the sensor stream in this case is very simple, we can use continuous execution, thus implementing real-time processing. Additionally, because we are processing each stream independently, we can use each stream's native records for processing, without introducing combined records.

In this implementation we are using the .collect method on the RDD, which is commonly considered an anti-pattern in Spark development. But in this case, where the number of records is relatively small (<1000, for example), it is OK to use collect.
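The stop, merge, and restart sequence in the foreachRDD block above can be modelled in plain Scala. The sketch below rests on an assumption that this post does not spell out: that a running query works from a copy of the settings taken when it was started, so later changes to the map are only seen by a newly started query. RunningQuery and RestartModel are hypothetical names, and settings are reduced to a single desired temperature per sensor.

```scala
import scala.collection.mutable

object RestartModel {
  // Stand-in for a running query. Assumption: it copies the settings once,
  // at the moment it is started, and never sees later updates.
  final class RunningQuery(settings: collection.Map[Int, Double]) {
    private val snapshot: Map[Int, Double] = settings.toMap
    def desiredFor(sensorID: Int): Option[Double] = snapshot.get(sensorID)
  }

  // Shared map updated by the control side and read when a query starts.
  val currentSettings = mutable.Map[Int, Double]()
  var query = new RunningQuery(currentSettings)

  // Mirrors the sequence from the post: drop the old query, merge the maps,
  // then start a new query that picks up the merged settings.
  def applyNewSettings(newSettings: Map[Int, Double]): Unit = {
    newSettings.foreach { case (id, value) => currentSettings(id) = value }
    query = new RunningQuery(currentSettings)
  }
}
```

In this model, writing to currentSettings directly has no effect on the query that is already running; only applyNewSettings, which replaces the query, makes the new values visible.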

Scalability, Fault Tolerance, And Load Balancing

Similar to Flink, the main components of Spark Streaming fault tolerance are the fault tolerance of state (including RDDs) and the current position in the input stream (for example, the Kafka offset). Spark Streaming achieves fault tolerance by checkpointing state and stream positions. Checkpoints allow Spark Streaming to recover state and stream positions, providing Spark Streaming applications with failure-free execution semantics.

When it comes to scalability, Spark Streaming can leverage both the scalability of Kafka listeners, based on the scalability (through partitioning) of Kafka, and the scalability of Spark Streaming itself, based on parallelism.


Learn More About Machine Learning

Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!


 


1 This approach was suggested by Gerard Maas (@maasg) of Lightbend