
How To Use Apache Flink For Dynamically Controlled Streams

Boris Lublinsky Principal Architect, Lightbend, Inc.

Cloud-Native Design Techniques for Serving Machine Learning Models with Apache Flink

In the introductory post of this short series, How To Serve Machine Learning Models With Dynamically Controlled Streams, I described dynamically controlled streams, a powerful pattern for implementing streaming applications. While the pattern can be implemented with different streaming engines and frameworks, the right technology choice depends on many factors, including in-house knowledge of the frameworks and engines, enterprise architectural standards, and other considerations.

In this post, I demonstrate with code how dynamically controlled streams can be implemented with Apache Flink, and describe the main properties of such an implementation. Other posts in this series cover the same pattern with Akka Streams, Kafka Streams, and Apache Spark.

Dynamically Controlled Streams With Apache Flink

Flink provides native support for stateful stream processing, including managed state and dynamically controlled streams. A basic implementation of the temperature control processor, based on Flink's CoProcessFunction class, is presented below:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector
import scala.collection.mutable

class TemperatureControlProcessor extends CoProcessFunction[SensorData, TemperatureControl, HeaterControl]{

 // Keyed, checkpointed state: the current settings for the key being processed
 var currentSettings : ValueState[Option[TemperatureControl]] = _
 // Instance-local cache of the last command per sensor; not restored after a restart
 private val previousCommands = mutable.Map[Int, Int]()

 override def open(parameters: Configuration): Unit = {

   val currentSettingDesc = new ValueStateDescriptor[Option[TemperatureControl]](
     "currentTemperatureSetting",                               // state name
     createTypeInformation[Option[TemperatureControl]])                // type information
   currentSettings = getRuntimeContext.getState(currentSettingDesc)
 }

 // Control input: remember the new settings, emit nothing
 override def processElement2(control: TemperatureControl, ctx: CoProcessFunction[SensorData, TemperatureControl, HeaterControl]#Context, out: Collector[HeaterControl]): Unit = {

   if(currentSettings.value == null) currentSettings.update(None)

   println(s"New temperature settings $control")
   currentSettings.update (Some(control))
 }

 // Sensor input: emit a heater command only when the required action changes
 override def processElement1(record: SensorData, ctx: CoProcessFunction[SensorData, TemperatureControl, HeaterControl]#Context, out: Collector[HeaterControl]): Unit = {

   if(currentSettings.value == null) currentSettings.update(None)

   currentSettings.value match {
     case Some(setting) => // We are controlling
       val action = (if(record.temperature > (setting.desired + setting.upDelta)) 1
         else if(record.temperature < (setting.desired - setting.downDelta)) 0 else -1)
       if((action >= 0) && (previousCommands.getOrElse(record.sensorID, -1) != action)){
         println(s"sending new control $action for sensor ${record.sensorID}")
         previousCommands += (record.sensorID -> action)
         out.collect(HeaterControl(record.sensorID, HeaterCommand.fromValue(action)))
       }
     case _ => // No control
   }
 }
}

At the class level we define a ValueState, Flink's mechanism for working with keyed state; it is included in Flink's snapshots and can be backed by RocksDB. The open method of the class does everything necessary to set up the ValueState. In addition to the ValueState, we also use a Map local to the instance of the class to store the previous value of the control command for each sensor. This instance-level variable (unlike ValueState) will not be restored after a restart, but that is fine here, because the map is used only as an optimization and a duplicated control command will not harm the system.
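The decision logic inside processElement1 can be tried out without a Flink runtime. The following is a minimal sketch in plain Scala, not the author's code: the case classes are hypothetical, reduced to the fields the listing above uses, and the ControlDecision object and its method names are introduced only for illustration.

```scala
import scala.collection.mutable

// Hypothetical shapes: only the fields used by the listing above are modeled.
final case class SensorData(sensorID: Int, temperature: Double)
final case class TemperatureControl(sensorID: Int, desired: Double, upDelta: Double, downDelta: Double)

object ControlDecision {
  // Same thresholds as processElement1: 1 above the band, 0 below it, -1 inside it.
  def action(record: SensorData, setting: TemperatureControl): Int =
    if (record.temperature > setting.desired + setting.upDelta) 1
    else if (record.temperature < setting.desired - setting.downDelta) 0
    else -1

  // Returns Some(action) only when it differs from the last command sent to that sensor.
  def nextCommand(record: SensorData, setting: TemperatureControl,
                  previous: mutable.Map[Int, Int]): Option[Int] = {
    val a = action(record, setting)
    if (a >= 0 && previous.getOrElse(record.sensorID, -1) != a) {
      previous(record.sensorID) = a // remember what was sent to suppress duplicates
      Some(a)
    } else None
  }
}

object ControlDecisionDemo extends App {
  val setting  = TemperatureControl(sensorID = 1, desired = 45.0, upDelta = 1.0, downDelta = 1.0)
  val previous = mutable.Map[Int, Int]()
  // Illustrative readings: inside the band, above it twice, inside again, below it.
  Seq(44.5, 46.5, 47.0, 45.5, 43.0).foreach { t =>
    println(s"$t -> ${ControlDecision.nextCommand(SensorData(1, t), setting, previous)}")
  }
}
```

If the previous map is lost, as happens to the instance-level map after a restart, the only effect in this sketch is that the next out-of-band reading produces a command again, even if the same command was already sent before the restart.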

The rest of the implementation is very similar to the ControllerBehavior in the Akka Streams implementation with three main differences:

  • Unlike the Akka implementation, where all requests are delivered to the onMessage method, Flink provides an individual method for each message type [1].
  • Unlike Akka Streams, where every actor invocation has to produce a reply in order to provide reactive processing, Flink's CoProcessFunction uses an explicit output collector that does not have to be written to on every invocation. As a result, the code here is a little simpler: it avoids sending a MayBeHeaterControl message as an intermediate output and calculates HeaterControl directly, but only when necessary.
  • Unlike Akka, where a separate instance of the actor behaviour is created for every key, Flink creates an instance of CoProcessFunction for a shard of keys, hence the need for ValueState.
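The three differences can be illustrated with a toy model in plain Scala. This is a sketch under stated assumptions, not Flink's API: TwoInputKeyedFunction, Reading, and Setting are hypothetical names, and an ordinary map stands in for the per-key ValueState that Flink manages.

```scala
import scala.collection.mutable

// Hypothetical event shapes, reduced to the fields needed here.
final case class Reading(sensorID: Int, temperature: Double)
final case class Setting(sensorID: Int, desired: Double)

// Toy stand-in for a two-input keyed function; NOT Flink's API.
class TwoInputKeyedFunction {
  // Plays the role of keyed state: one slot per key, although a single
  // instance handles every key that is routed to it.
  private val settingByKey = mutable.Map[Int, Setting]()

  // Counterpart of processElement2: updates state and emits nothing.
  def onControl(s: Setting, out: mutable.Buffer[String]): Unit =
    settingByKey(s.sensorID) = s

  // Counterpart of processElement1: emits only when a setting exists for the key
  // and the reading is above the desired value.
  def onSensor(r: Reading, out: mutable.Buffer[String]): Unit =
    settingByKey.get(r.sensorID).foreach { s =>
      if (r.temperature > s.desired) out += s"sensor ${r.sensorID}: above ${s.desired}"
    }
}

object TwoInputDemo extends App {
  val fn  = new TwoInputKeyedFunction
  val out = mutable.Buffer[String]()
  fn.onSensor(Reading(1, 50.0), out)   // no setting yet for key 1: nothing emitted
  fn.onControl(Setting(1, 45.0), out)  // control input: state update only
  fn.onSensor(Reading(1, 50.0), out)   // key 1 is now controlled: one output
  fn.onSensor(Reading(2, 50.0), out)   // key 2 has no setting: nothing emitted
  out.foreach(println)
}
```

Because one instance serves several keys, a plain field holding "the current setting" would be shared between sensors; scoping the state by key is what keeps the setting for sensor 1 from leaking to sensor 2.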

A Flink job implementation that leverages TemperatureControlProcessor looks as follows:

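import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object TemperatureControllerJob {

 def main(args: Array[String]): Unit = {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // Build the graph before inspecting or executing it
   buildGraph(env)
   System.out.println("[info] Job ID: " + env.getStreamGraph.getJobGraph.getJobID)
   env.execute()
 }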

 // Build execution Graph
 def buildGraph(env : StreamExecutionEnvironment) : Unit = {

   // configure Kafka consumer
   // Data
   val sensorKafkaProps = new Properties
   sensorKafkaProps.setProperty("bootstrap.servers", kafkaConfig.brokers)
   sensorKafkaProps.setProperty("group.id", kafkaConfig.heateroutputgroup)
   // always read the Kafka topic from the current location
   sensorKafkaProps.setProperty("auto.offset.reset", "latest")

   // Control
   val controlKafkaProps = new Properties
   controlKafkaProps.setProperty("bootstrap.servers", kafkaConfig.brokers)
   controlKafkaProps.setProperty("group.id", kafkaConfig.temperaturesetgroup)
   // read the control topic from the beginning when no committed offset exists
   controlKafkaProps.setProperty("auto.offset.reset", "earliest")

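   // create Kafka consumers
   // Sensor
   val sensorConsumer = new FlinkKafkaConsumer011[Array[Byte]](
     kafkaConfig.heateroutputtopic,                  // source topic (field name assumed; elided in the listing)
     new ByteArraySchema,
     sensorKafkaProps)

   // Control
   val controlConsumer = new FlinkKafkaConsumer011[Array[Byte]](
     kafkaConfig.temperaturesettopic,                // source topic (field name assumed; elided in the listing)
     new ByteArraySchema,
     controlKafkaProps)
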
   val heaterProducer = new FlinkKafkaProducer011[Array[Byte]](
     kafkaConfig.brokers,                            // broker list
     kafkaConfig.heaterinputtopic,                   // target topic
     new ByteArraySchema)                            // serialization schema

   // Create input data streams
   val controlStream = env.addSource(controlConsumer)
   val sensorStream = env.addSource(sensorConsumer)

   // Read data from streams
   val controls = controlStream.map(DataTransformer.controlFromByteArray(_))
   val sensors = sensorStream.map(DataTransformer.sensorFromByteArray(_))

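   // Merge streams: sensors first, because processElement1 handles SensorData
   sensors
     .connect(controls)
     .keyBy(_.sensorID, _.sensorID)    // assumes TemperatureControl also carries sensorID
     .process(new TemperatureControlProcessor())
     .map(value => DataTransformer.toByteArray(value))
     .addSink(heaterProducer)
 }
}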

This implementation has two methods: a main method that orchestrates the overall execution, and a buildGraph method that builds the execution graph (compare to the Akka Streams code). After defining the input and output streams, it applies a preliminary transformation to both the sensor and control input streams and then connects them (based on the key) through TemperatureControlProcessor. The result is then written to the output stream (Kafka).

Scalability, Resilience, And Load Balancing

Flink's fault tolerance covers two things: operator state and the current position in the input stream (for example, a Kafka offset). Flink achieves it by checkpointing both state and stream positions. Checkpoints allow Flink to recover state and stream positions so that the application has the same semantics as a failure-free execution. Recovery under this mechanism is straightforward: upon a failure, Flink redeploys the entire distributed dataflow and gives each operator the state that was snapshotted as part of the latest checkpoint. The sources are set to start reading the stream from the position recorded in the same checkpoint.
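Checkpointing is switched on through the execution environment. The following is a minimal configuration sketch, assuming the Flink Scala streaming API is on the classpath; the object name and the interval values are illustrative and are not taken from the job above.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointingSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Snapshot operator state and source positions every 5 seconds (illustrative value).
    env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE)
    // Leave at least 1 second between the end of one checkpoint and the start of the next.
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000L)
    println(s"checkpoint interval (ms): ${env.getCheckpointConfig.getCheckpointInterval}")
  }
}
```

In a real job these calls would sit next to the graph construction, before the job is executed.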

In addition, Flink 1.5 introduced Task-local state recovery. With this new feature, when writing the state of operators to the remote storage, Flink can now also keep a copy on the local disk of each machine. In case of failover, the scheduler tries to reschedule tasks to their previous machine and load the state from the local disk instead of the remote storage, resulting in faster recovery.

When it comes to scalability, Flink can leverage both the scalability of its Kafka consumers, which comes from Kafka partitioning, and the scalability of Flink itself, which is based on parallelism. At the time of writing, Flink does not support autoscaling, but you can use savepoints to stop, rescale, and restart a Flink job.
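Parallelism is set on the execution environment (or per operator). The sketch below assumes the Flink Scala streaming API is on the classpath; the object name and the numbers are illustrative only. Note that a Kafka source with more parallel subtasks than the topic has partitions leaves the extra subtasks idle.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ParallelismSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Default parallelism for all operators of the job (illustrative value).
    env.setParallelism(4)
    // Upper bound on parallelism for later rescaling of keyed state (illustrative value).
    env.setMaxParallelism(128)
    println(s"parallelism: ${env.getParallelism}, max parallelism: ${env.getMaxParallelism}")
  }
}
```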

In the last post of this series, we will show how to implement the dynamically controlled streams pattern with Spark Structured Streaming.

Learn More About Machine Learning

Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!



[1] Currently Flink's CoProcessFunction supports only two message types. If you need more than two, you will have to chain multiple CoProcessFunction instances.

