In this and related blog posts, I will explain how dynamically controlled streams can be used to implement streaming applications. While this pattern can be implemented using different streaming engines and frameworks, making the right technology choice depends on many factors, including in-house knowledge of the frameworks/engines, enterprise architectural standards, and other considerations.
Lightbend Platform supports integration with various streaming engines, and its Lightbend Pipelines module orchestrates and simplifies operations of complex streaming data applications. In the following posts, I will demonstrate how to implement dynamically controlled streams with the following four technologies:
- Akka Streams
- Kafka Streams
- Apache Flink
- Spark Streaming
I suggest you read this post before continuing to the others, as it lays the groundwork for understanding the more specific examples with the technologies listed above.
A common problem in building streaming applications is integrating stream processing with other data that exists in an enterprise in the form of enterprise services. Examples of real-life systems with such requirements include, but are not limited to:
This additional data can be either static (or semi-static) or dynamic. In the case of static data, it can simply be “cached”1 inside the stream processing. Here we will consider only the case of dynamic, read-only data.
Traditional approaches to this problem use RPC-based service invocations from within stream processing. The advantages of such an approach are:
On the other hand, such an approach can suffer from the following drawbacks:
An alternative that we discuss here is to use stateful stream processing, with the ability to update the state dynamically as the state of the services changes – i.e., dynamically controlled streams.
The overall architecture is based on the Log-centric execution architecture introduced by Jay Kreps.
Although this architecture looks very similar to the enterprise service bus (ESB) pattern, it is really very different (due to the difference between a queue and a log). For example, “an enterprise service bus represents an environment designed to foster sophisticated interconnectivity between services. It establishes an intermediate layer of processing that can help overcome common problems associated with reliability, scalability, and communications disparity”. The log, in contrast, represents a centralized subsystem with a well-defined API for adding and retrieving data.
This pattern is similar to the Whiteboard pattern and provides similar main advantages.
In the case of a log-centric architecture, there is no interconnectivity between services. Any data changes are added to the log and are available to any consumers that have access to the log.
The log simply stores data sequentially (in time order) without any attempt to manage producers and consumers of data. Additionally, logs allow replay of messages in cases where some of the consumers have to be replaced.
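For example, with Kafka serving as the log, a replacement consumer can rewind to the beginning of a topic and reprocess everything that was ever written to it. The sketch below is illustrative only: the topic, group id, and broker address are assumptions, and records are left as raw bytes.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hedged sketch: a replacement consumer replaying a Kafka-backed log from the beginning.
object ReplayConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "replacement_consumer")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    // Rewind to the start of the log as soon as partitions are assigned.
    consumer.subscribe(Collections.singletonList("sensor_data"), new ConsumerRebalanceListener {
      override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit = ()
      override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit =
        consumer.seekToBeginning(partitions)
    })
    while (true) {
      val records = consumer.poll(java.time.Duration.ofMillis(500))
      records.forEach(record => println(s"Replayed record of ${record.value().length} bytes"))
    }
  }
}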
With the log-centric execution architecture in place, a dynamically controlled streams architecture can be a better alternative to RPC-based service invocation from within a stream.
This architecture requires stateful stream processing for the main data stream, with the state being updatable by a second stream - the state update stream. Both streams are read from the centralized data log containing all of the incoming data and updates from all of the services.
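To make this concrete, here is a minimal, framework-agnostic sketch of such a processor; all names here are illustrative assumptions rather than part of the accompanying code. The processor keeps the latest message from the state update stream as its state and applies it to every record arriving on the main data stream.

// Minimal, framework-agnostic sketch of a dynamically controlled stream processor.
// The names and the callback style are illustrative assumptions.
class ControlledProcessor[Data, Control, Out](compute: (Data, Control) => Out) {
  private var state: Option[Control] = None
  // Called for every message read from the state update (control) stream.
  def onControl(update: Control): Unit = state = Some(update)
  // Called for every message read from the main data stream; uses the latest state, if any.
  def onData(record: Data): Option[Out] = state.map(control => compute(record, control))
}

Each of the frameworks covered in the follow-up posts provides its own mechanism for maintaining this state and merging the two streams; the rest of this post sets up the example they all share.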
An enterprise has multiple services, built independently with different languages and platforms, that need to be used in stream processing.
How can I use these services inside stream processing?
Have each service produce its data (as it changes) to a “control” stream, which the stream processing consumes and uses as “state” for processing the data stream and producing results.
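As an illustration of the producing side, a service could publish its state changes to a dedicated control topic with a plain Kafka producer. The sketch below is an assumption: the topic name, broker address, and byte-array payloads are placeholders, while the accompanying example encodes messages with protobuf and uses its own helper classes.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hedged sketch: a service publishing its state changes (already serialized to bytes)
// to a "control" topic. Topic name and broker address are illustrative.
object ServiceStatePublisher {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  private val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

  // Invoked by the service whenever its state changes.
  def publishStateChange(payload: Array[Byte]): Unit =
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("service_state_control", payload))
}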
There are several important decisions that you need to make when applying the dynamically controlled streams pattern:
All of the modern stream processing frameworks and engines, including Akka Streams, Kafka Streams, Flink, and Spark, can be used to implement dynamically controlled streams. Here we will show very simplistic implementations (the complete code is available here), dropping many required details, to show the basic approaches and some tradeoffs. But before describing the actual implementations, let's first describe a simple example that we are going to use throughout.
Throughout this post we will use a very simple example - temperature control. In the most simplistic case the implementation looks as follows:
In the sections below we will show how to implement this example, based on the dynamically controlled streams pattern, leveraging different streaming frameworks and engines.
We assume here a very simple “heater model”: the temperature increases by 1 degree every N (configurable) minutes when the heater is on and decreases by 1 degree every M (configurable) minutes when it is off. The temperature sensor provides a reading every minute, and thermostat settings, including both the desired temperature Td and the allowed deviation ∆t, can arrive at any point. For example, with N = 5, each successive one-minute reading while the heater is on is 1/5 = 0.2 degrees higher than the previous one.
For all of the examples below we first need to implement a heater model, based on our example definition. The implementation depends on several messages, which can be defined (using protocol buffers) as follows:
// Heater control
enum HeaterCommand {
    On = 0;
    Off = 1;
}

// Heater control message
message HeaterControl {
    int32 sensorID = 1;
    HeaterCommand command = 2;
}

// Temperature control message
message TemperatureControl {
    int32 sensorID = 1;
    double desired = 2;
    double upDelta = 3;
    double downDelta = 4;
}

// Sensor data
message SensorData {
    int32 sensorID = 1;
    double temperature = 2;
}
Because our implementation can support multiple sensors, a sensor ID is included with each message.
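Before turning to the heater itself, it may help to see the control decision that the framework-specific implementations in the follow-up posts make as part of their stateful stream processing. The sketch below is only an illustration: the TemperatureController name, the ScalaPB-style generated case classes, and the exact hysteresis rule (switch the heater on below desired - downDelta, off above desired + upDelta) are assumptions, not the accompanying code.

// Hypothetical sketch of the thermostat decision (not the accompanying code).
// Assumed rule: turn the heater on below (desired - downDelta), off above (desired + upDelta).
object TemperatureController {
  def decide(data: SensorData, setting: TemperatureControl): Option[HeaterControl] =
    if (data.temperature > setting.desired + setting.upDelta)
      Some(HeaterControl(data.sensorID, HeaterCommand.Off))
    else if (data.temperature < setting.desired - setting.downDelta)
      Some(HeaterControl(data.sensorID, HeaterCommand.On))
    else
      None // within the tolerance band - leave the heater as is
}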
The code below provides a fairly simple implementation of the heater itself. It assumes that the temperature grows (or decreases) linearly, based on the value of the control.
import java.io.ByteArrayOutputStream
import scala.concurrent.duration._

// kafkaConfig, temperatureUpRate, temperatureDownRate and sensorPublishInterval,
// along with KafkaLocalServer, MessageSender and MessageListener, come from the
// supporting code of the complete example.
object Heater {

  var heaterOperation = 1     // 0 - heater is on, 1 - heater is off (matches HeaterCommand)
  val sensorID = 12345

  def main(args: Array[String]): Unit = {

    val brokers = kafkaConfig.brokers
    val temperatureUp = Duration(temperatureUpRate.rate)
    val temperatureDown = Duration(temperatureDownRate.rate)
    val timeInterval = Duration(sensorPublishInterval.interval)

    // Create Kafka broker
    val kafka = KafkaLocalServer(true)
    kafka.start()
    println(s"Kafka Cluster created")

    // Create sender
    val sender = MessageSender[Array[Byte], Array[Byte]](kafkaConfig.brokers)

    // Start listener for the control signal
    val listener = MessageListener(brokers, kafkaConfig.heaterinputtopic,
      kafkaConfig.heatersourcegroup, new ControlProcessor())
    listener.start()

    // Loop producing temperature readings
    val bos = new ByteArrayOutputStream
    var temperature = 42.0
    while (true) {
      // Calculate output temperature
      heaterOperation match {
        case 0 => // Heater is on - increase temperature
          temperature = temperature + timeInterval.toMillis.toDouble / temperatureUp.toMillis.toDouble
        case _ => // Heater is off - decrease temperature
          temperature = temperature - timeInterval.toMillis.toDouble / temperatureDown.toMillis.toDouble
      }
      // Send it to Kafka
      val sd = new SensorData(sensorID, temperature)
      bos.reset()
      sd.writeTo(bos)
      sender.writeValue(kafkaConfig.heateroutputtopic, bos.toByteArray)
      println(s"Send sensor data $sd")
      // Pause
      pause(timeInterval)
    }
  }

  def pause(timeInterval: Duration): Unit = Thread.sleep(timeInterval.toMillis)
}
This simple implementation also listens on the control input, leveraging the following class:
import org.apache.kafka.clients.consumer.ConsumerRecord

class ControlProcessor extends RecordProcessorTrait[Array[Byte], Array[Byte]] {

  import Heater._

  override def processRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
    try {
      // Update the shared heater operation with the received control value
      heaterOperation = HeaterControl.parseFrom(record.value()).command.index
      println(s"Updated heater control to $heaterOperation")
    } catch {
      case t: Throwable => println(s"Error reading heater control $t")
    }
  }
}
This processor listens on a predefined topic and updates the heater operation with the value of the received control message.
Finally, a small method, shown below, periodically generates new temperature control settings:
def controltemperature(sender: MessageSender[Array[Byte], Array[Byte]]): Future[Unit] = Future {
  var desired = 45.0
  val controlInterval = 10.minute
  val generator = new Random
  val bos = new ByteArrayOutputStream
  while (true) {
    val temperatureControl = TemperatureControl(sensorID, desired, 1.0, 1.0)
    bos.reset()
    temperatureControl.writeTo(bos)
    sender.writeValue(kafkaConfig.temperaturesettopic, bos.toByteArray)
    println(s"Send new temperature control $temperatureControl")
    desired = desired + (generator.nextInt(10) - 5)
    pause(controlInterval)
  }
}
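To exercise the complete loop locally, this generator can be started from main before entering the sensor loop. A minimal sketch, assuming the sender created in main and the global execution context for the Future:

// Hypothetical wiring inside Heater.main, before the sensor loop starts.
import scala.concurrent.ExecutionContext.Implicits.global
controltemperature(sender)   // runs asynchronously, producing new control settings periodically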
With this in place, we can proceed to the controlled streams implementations. Just select the technology you’re interested in using below:
- Akka Streams
- Kafka Streams
- Apache Flink
- Spark Streaming
1 Although it sounds simple, depending on the size, data caching can be quite complex.