In the introductory post of this short series, How To Serve Machine Learning Models With Dynamically Controlled Streams, I described how dynamically controlled streams are a very powerful pattern for implementing streaming applications. While this pattern can be implemented using different streaming engines and frameworks, the right technology choice depends on many factors, including internal knowledge of frameworks/engines, enterprise architectural standards, and other considerations.
In this post, I demonstrate with code how dynamically controlled streams can be implemented with Apache Flink, and discuss the main properties of such an implementation. My other examples in this series use Akka Streams, Kafka Streams, and Apache Spark.
Flink provides native support for stateful stream processing, including state support and dynamically controlled streams. A basic implementation of the temperature control processor, based on Flink’s CoProcessFunction class, is presented below:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable

class TemperatureControlProcessor extends CoProcessFunction[SensorData, TemperatureControl, HeaterControl] {
  // Keyed state holding the current temperature settings
  var currentSettings: ValueState[Option[TemperatureControl]] = _
  // Last command sent per sensor; a mutable map, because it is updated with +=
  private val previousCommands = mutable.Map[Int, Int]()
  override def open(parameters: Configuration): Unit = {
    val currentSettingDesc = new ValueStateDescriptor[Option[TemperatureControl]](
      "currentTemperatureSetting", // state name
      createTypeInformation[Option[TemperatureControl]]) // type information
    currentSettings = getRuntimeContext.getState(currentSettingDesc)
  }
  // Control stream: remember the new settings
  override def processElement2(control: TemperatureControl, ctx: CoProcessFunction[SensorData, TemperatureControl, HeaterControl]#Context, out: Collector[HeaterControl]): Unit = {
    if (currentSettings.value == null) currentSettings.update(None)
    println(s"New temperature settings $control")
    currentSettings.update(Some(control))
  }
  // Sensor stream: compare the reading with the current settings
  override def processElement1(record: SensorData, ctx: CoProcessFunction[SensorData, TemperatureControl, HeaterControl]#Context, out: Collector[HeaterControl]): Unit = {
    if (currentSettings.value == null) currentSettings.update(None)
    currentSettings.value match {
      case Some(setting) => // We are controlling
        val action = if (record.temperature > (setting.desired + setting.upDelta)) 1
          else if (record.temperature < (setting.desired - setting.downDelta)) 0 else -1
        if ((action >= 0) && (previousCommands.getOrElse(record.sensorID, -1) != action)) {
          println(s"sending new control $action for sensor ${record.sensorID}")
          previousCommands += (record.sensorID -> action)
          out.collect(HeaterControl(record.sensorID, HeaterCommand.fromValue(action)))
        }
      case _ => // No control
    }
  }
}
At the class level we define a ValueState, Flink’s mechanism for working with state, which is snapshotted as part of checkpoints and can be backed by RocksDB. The open method of the class does everything necessary to properly set up the ValueState. In addition to the ValueState, we also use a Map local to the class instance to store the previous values of the control variable. This instance-level variable (unlike the ValueState) will not be restored in the case of restarts, but this is fine here, because the map is used only as an optimization and a duplicated control command will not harm the system.
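To make the control rule and the duplicate suppression easier to reason about in isolation, they can be sketched without any Flink dependency. This is a minimal sketch, not the code from this post: `Setting`, `Reading`, `ControlLogic`, and `Deduplicator` are hypothetical names, and the action codes simply mirror the 1/0/-1 values used above.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for TemperatureControl and SensorData
final case class Setting(desired: Double, upDelta: Double, downDelta: Double)
final case class Reading(sensorID: Int, temperature: Double)

object ControlLogic {
  // Same rule as in processElement1: 1 above the band, 0 below it, -1 inside it
  def decide(reading: Reading, setting: Setting): Int =
    if (reading.temperature > setting.desired + setting.upDelta) 1
    else if (reading.temperature < setting.desired - setting.downDelta) 0
    else -1
}

// Plays the role of the instance-level map: remembers the last command per sensor
final class Deduplicator {
  private val previous = mutable.Map[Int, Int]()

  // Returns Some(action) only when a new command has to be sent
  def filter(sensorID: Int, action: Int): Option[Int] =
    if (action >= 0 && previous.getOrElse(sensorID, -1) != action) {
      previous += (sensorID -> action)
      Some(action)
    } else None
}
```

With a desired temperature of 20 and both deltas set to 1, readings of 22, 22.5, and 18 for the same sensor yield a command, no command, and then the opposite command. If the `Deduplicator` is lost on restart, the only consequence is that one command may be repeated.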
The rest of the implementation is very similar to the ControllerBehavior in the Akka Streams implementation with three main differences:
A Flink job implementation leverages TemperatureControlProcessor and looks as follows:
import java.util.Properties
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

// kafkaConfig, ByteArraySchema, DataTransformer and BadDataHandler are
// project-specific helpers, assumed to be defined elsewhere in the project
object TemperatureControllerJob {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    buildGraph(env)
    System.out.println("[info] Job ID: " + env.getStreamGraph.getJobGraph.getJobID)
    env.execute()
  }

  // Build execution Graph
  def buildGraph(env: StreamExecutionEnvironment): Unit = {
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(5000)

    // configure Kafka consumers
    // Sensor data
    val sensorKafkaProps = new Properties
    sensorKafkaProps.setProperty("bootstrap.servers", kafkaConfig.brokers)
    sensorKafkaProps.setProperty("group.id", kafkaConfig.heateroutputgroup)
    // always read the Kafka topic from the current location
    sensorKafkaProps.setProperty("auto.offset.reset", "latest")
    // Control
    val controlKafkaProps = new Properties
    controlKafkaProps.setProperty("bootstrap.servers", kafkaConfig.brokers)
    controlKafkaProps.setProperty("group.id", kafkaConfig.temperaturesetgroup)
    // read the Kafka topic from the earliest available offset
    controlKafkaProps.setProperty("auto.offset.reset", "earliest")

    // create Kafka consumers
    // Sensor
    val sensorConsumer = new FlinkKafkaConsumer011[Array[Byte]](
      kafkaConfig.heateroutputtopic,
      new ByteArraySchema,
      sensorKafkaProps
    )
    // Control
    val controlConsumer = new FlinkKafkaConsumer011[Array[Byte]](
      kafkaConfig.temperaturesettopic,
      new ByteArraySchema,
      controlKafkaProps
    )
    // create a Kafka producer for heater commands
    val heaterProducer = new FlinkKafkaProducer011[Array[Byte]](
      kafkaConfig.brokers, // broker list
      kafkaConfig.heaterinputtopic, // target topic
      new ByteArraySchema) // serialization schema

    // Create input data streams
    val controlStream = env.addSource(controlConsumer)
    val sensorStream = env.addSource(sensorConsumer)

    // Read data from streams
    val controls = controlStream.map(DataTransformer.controlFromByteArray(_))
      .flatMap(BadDataHandler[TemperatureControl])
      .keyBy(_.sensorID)
    val sensors = sensorStream.map(DataTransformer.sensorFromByteArray(_))
      .flatMap(BadDataHandler[SensorData])
      .keyBy(_.sensorID)

    // Merge streams
    sensors
      .connect(controls)
      .process(new TemperatureControlProcessor())
      .map(value => DataTransformer.toByteArray(value))
      .addSink(heaterProducer)
  }
}
This implementation has two methods: the main method, which orchestrates overall execution, and the buildGraph method, which builds the execution graph (compare to the Akka Streams code). After defining the input and output streams, it defines preliminary transformations on both the sensor and control input streams and then connects them (based on the key) using TemperatureControlProcessor. The result is then submitted to the output stream (Kafka).
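Why both inputs are keyed by `sensorID` before they are connected can be illustrated with a small simulation in plain Scala. This is only a sketch under simplifying assumptions: `Control`, `Sensor`, and `KeyedSimulator` are hypothetical names, a mutable map stands in for Flink's keyed state, and the real engine distributes this work across parallel tasks.

```scala
import scala.collection.mutable

// Hypothetical, simplified message types
sealed trait Event { def sensorID: Int }
final case class Control(sensorID: Int, desired: Double) extends Event
final case class Sensor(sensorID: Int, temperature: Double) extends Event

final class KeyedSimulator {
  // One state slot per key, like a ValueState scoped to the current sensorID
  private val settings = mutable.Map[Int, Control]()

  // Returns the IDs of sensors whose reading exceeded their own setting
  def run(events: Seq[Event]): Seq[Int] = {
    val out = mutable.ArrayBuffer[Int]()
    events.foreach {
      case c: Control =>
        settings(c.sensorID) = c // control input: only updates state
      case s: Sensor =>
        // data input: reads the state stored under the same key, if any
        settings.get(s.sensorID).foreach { c =>
          if (s.temperature > c.desired) out += s.sensorID
        }
    }
    out.toSeq
  }
}
```

In this sketch a reading for sensor 1 is ignored until a setting for sensor 1 arrives, which corresponds to the "No control" branch of the processor, and a setting for sensor 2 never affects readings from sensor 1.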
The main components of Flink’s fault tolerance are the fault tolerance of state and the current position in the input stream (for example, a Kafka offset). Flink achieves fault tolerance by checkpointing state and stream positions. Checkpoints allow Flink to recover state and positions in the streams to give the application the same semantics as a failure-free execution. Recovery under this mechanism is straightforward: upon a failure, Flink re-deploys the entire distributed dataflow and gives each operator the state that was snapshotted as part of the latest checkpoint. The sources are set to start reading the stream from the position reflected in the same checkpoint.
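The recovery guarantee described above can be illustrated with a toy model in plain Scala. This is a deliberately simplified sketch, not how Flink implements checkpointing: `Checkpoint`, `CountingJob`, and the in-memory `log` are hypothetical, with the log standing in for a replayable source such as a Kafka partition.

```scala
// A snapshot pairs operator state with the source position it corresponds to
final case class Checkpoint(offset: Int, sum: Long)

final class CountingJob(log: Vector[Int]) {
  private var offset = 0 // position in the replayable input
  private var sum = 0L   // operator state

  // Process up to n further records from the current position
  def process(n: Int): Unit = {
    val end = math.min(offset + n, log.length)
    while (offset < end) {
      sum += log(offset)
      offset += 1
    }
  }

  def snapshot(): Checkpoint = Checkpoint(offset, sum)

  // Restore both state and position from the same checkpoint
  def restore(cp: Checkpoint): Unit = {
    offset = cp.offset
    sum = cp.sum
  }

  def result: Long = sum
}
```

If a job processes three records, takes a snapshot, processes two more, and then "fails", restoring the snapshot and replaying from the saved offset gives the same final sum as a run without the failure. Restoring the state without the matching offset (or the reverse) would count some records twice or skip them.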
In addition, Flink 1.5 introduced Task-local state recovery. With this new feature, when writing the state of operators to the remote storage, Flink can now also keep a copy on the local disk of each machine. In case of failover, the scheduler tries to reschedule tasks to their previous machine and load the state from the local disk instead of the remote storage, resulting in faster recovery.
When it comes to scalability, Flink can leverage both the scalability of Kafka consumers, based on the scalability (through partitioning) of Kafka, and the scalability of Flink itself, based on parallelism. At the time of writing, Flink does not support auto scaling, but you can use savepoints to stop, rescale, and restart a Flink job.
In the last post of this series, we will show how to implement the dynamically controlled streams pattern using Spark Structured Streaming.
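Rescaling keyed state from a savepoint is possible because keys are assigned to a fixed number of key groups, and whole key groups (not individual keys) are redistributed across parallel instances. The following is a rough sketch of that idea in plain Scala. It assumes a plain non-negative modulo of the key's hash code in place of the additional hashing that Flink applies, and `KeyGroups`, `keyGroup`, `operatorIndex`, and `instanceFor` are hypothetical names.

```scala
object KeyGroups {
  // Key -> key group; the number of groups (the maximum parallelism) stays fixed across rescaling
  def keyGroup(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Key group -> index of the parallel instance that owns it
  def operatorIndex(group: Int, maxParallelism: Int, parallelism: Int): Int =
    group * parallelism / maxParallelism

  // Convenience: key -> owning parallel instance for a given parallelism
  def instanceFor(key: Any, maxParallelism: Int, parallelism: Int): Int =
    operatorIndex(keyGroup(key, maxParallelism), maxParallelism, parallelism)
}
```

In this sketch, changing the parallelism changes only which instance owns a key group; the key-to-group mapping is untouched, so the state for each sensor can be handed to its new owner as part of a group.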
Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!
1 Currently, Flink’s CoProcessFunction supports only two message types. If you need more than two, you will have to chain multiple CoProcessFunctions.