Introduction

In many real-world applications, it is required to process data as it arrives as a stream of real time events. Examples of such applications include financial transactions, cars in motion emitting GPS signals, measurements from industrial sensors, web traffic, and so on.

One of the main requirements for such systems is low latency processing of the streaming data, but it is not the only requirement. As such systems are moving to the forefront of enterprise application architecture, other requirements for a well designed architecture include:

  • Microservices approach that refers to breaking functions in large systems into simple, generally single-purpose services that can be built and maintained independently and use the best technology for implementing certain functionality. Such approach additionally requires
    • An ability to scale each individual service based on the varied load on the system
    • Ease of update and deployment of complex service topologies.
    • Monitoring of such complex service topologies
  • Reactiveness of the stream processing, including responsiveness, resilience and elasticity.
  • An ability to restart after a failure in a manner that produces accurate results; in other words, there’s an advantage to being fault-tolerant with exactly-once guarantees.

Many of these capabilities are implemented by the open source project Cloudflow, which helps developers and architects quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes.

Cloudflow supports the building of streaming applications by using a set of small, composable components communicating over Kafka and wired together with schema-based contracts. This approach can significantly improve component reuse, saving time, and dramatically accelerates streaming application development. At the time of this writing, such components can be implemented using Akka Streams, Apache Flink and Apache Spark.

Everything in Cloudflow is done in the context of an application, which represents a self-contained distributed system (graph) of data processing services connected together by data streams over Kafka.

In the following figure we show a Cloudflow application that contains four components that communicate through message passing, leveraging Kafka topics.

This application has two Akka Streams components, one Flink component and one Spark component.

Cloudflow supports:

  • Development: By generating a lot of boilerplate code, it allows developers to focus on business logic.
  • Build: It provides all the tooling for going from business logic to a deployable Docker image.
  • Deploy: It provides Kubernetes tooling to deploy your distributed application with a single command.
  • Operate: It provides all the tools you need to get insights, observability, and lifecycle management for your distributed streaming application. Another important operational concern directly supported by cloudflow is an ability to scale individual components of the stream.

With multiple runtimes natively supported by Cloudflow, we often hear questions about which runtime to use for a specific implementation. In this series of blog posts, we will look at some of the capabilities of Apache Flink and Akka Streams Cloudflow runtimes and show how to implement the same functionality for both.

Using Apache Flink with Cloudflow

Apache Flink is an open source streaming platform which supports real-time data processing pipelines in a fault-tolerant way at scale–i.e. millions of events per second. As defined here, the main features of Flink are:

  • High throughput and low latency: Flink is based on the DataFlow model i.e. processing the elements as and when they arrive which allows flink to process millions of records per minute at milliseconds of latencies on a single machine. Here are some benchmarks.
  • Fault tolerance: Flink provides robust fault-tolerance using checkpointing (periodically saving internal state to external sources such as HDFS or S3). Additionally Flink’s checkpointing mechanism can be made incremental (save only the changes and not the whole state) which really reduces the amount of data in HDFS and the I/O duration. The checkpointing overhead is almost negligible which enables users to have large states inside Flink applications.
  • Ease of use/expressiveness: Flink provides an extremely simple high level api in the form of Map/Reduce, Filters, Window, GroupBy, Sort and Joins. This provides a developer lot of flexibility and speeds up the development while writing new jobs.
  • Stateful processing: Flink provides a simple API to interact with state like you would interact with a Java object. States can be backed by Memory, Filesystem or RocksDB, which are check-pointed and are thus fault tolerant.
  • Correct time/window semantics: Flink natively supports infinite stream data processing. In Flink, the Window operation can split infinite streams into finite streams, and is the core component for processing finite streams. Flink supports both time-driven (referred to as the Time Window) or data-driven (Count Window). Flink also supports different time concepts in streaming programs: event time - the time that each event occurs on its production equipment, ingestion time - the time when data enters Flink and processing time - the system time of the machine executing the corresponding operator operation.
  • Variety of sources and sinks: Flink provides seamless connectivity to a variety of data sources and sinks including: Apache Cassandra, Elasticsearch, Kafka, RabbitMQ, Hive, etc.

Flink is an application server and as such is based on organizing computations in blocks and leveraging cluster architectures1. Splitting computations in blocks enables execution parallelism, where different blocks run on different threads on the same machine, or on different machines. It also enables failover by moving execution blocks from failed machines to the healthy ones.

As with any application engine, Flink requires a developer to adhere to its programming model and deployment. They also often require a steeper learning curve for mastering their functionality.

Fortunately Cloudflow hides many of the details of packaging and deployment of Flink applications, which means that the user just writes a basic functionality that he needs to implement as (a set) of classes.

Using Akka Streams with Cloudflow

Akka-streams is a library that is built atop of the Akka actor toolkit, which follows to the Reactive Streams initiative and protocols. The Akka Streams API allows us to easily compose data transformation flows from independent steps. It provides easy-to-use APIs to create streams that leverage the power of Akka without explicitly defining actor behaviors and messages (you can, of course, explicitly use Actors in Akka Streams, as we will show later in this blog post).

Akka Streams provide a higher-level abstraction over Akka’s existing actor model. This allows you to focus on business logic and forget about all of the boilerplate code required to manage the actor. Akka Streams is based on the following core API concepts:

  • Source: a component that emits elements asynchronously downstream. It’s a starting point of every streaming topology that only has output. A source can be created from any Scala collection, it can be a Kafka topic, it can be created from constantly polling some HTTP endpoint, etc. Additionally it is possible to implement a custom source, or use multiple integrations that are provided by Alpakka library.
  • Sink: a component that receives elements, and as the opposite of the Source, it’s usually the final stage in a stream processing topology. It only has input. A sink can be a Scala collection, Kafka Topic, or even an Akka Actor. Here is a list of all possible sinks. Additionally it is possible to implement a custom sink.
  • Flow: a component that has both input and output. It’s basically a component that receives the data, transforms it, and passes it downstream to the sink or to the next flow in topology. Akka Streams (and Cloudflow) provides a fair amount of pre-built flows, but you can always create a custom one.

The Akka Streams API provides us a builder pattern syntax to chain source, flow, and sink components in order to create a graph (RunnableGraph in Akka Streams terminology). A graph must have at least one Source, one Sink and any amount of flows. Our graph represents just a description of the topology, and it’s completely “lazy”: it starts only when a Source starts emitting data. Running a graph will allocate the needed resources to execute topology, like actors and threads.

The main features of Akka Streams are:

  • Backpressure: a mechanism where downstream components inform upstream components about the number of messages that can be received and buffered. Akka Streams has a back-pressure mechanism implemented and the user of the library doesn’t have to write any explicit back-pressure handling code.
  • Operators: Akka streams provide a lot of operators to write declaratively transformations over streams easily.
  • Reusable and composable: Once we design a Data Flow Graph (DFG), we can reuse it any number of times. We can compose Akka Streams components easily to solve complex data-processing problems.
  • Scalability: Akka Streams is quite economical in some cases when it comes to threads, which provides very high scalability of Akka Streams

Unlike Flink, Akka Streams is not an application server, but rather a stream processing library, with a Domain Specific language (DSL) providing a set of constructs simplifying building streaming applications. As outlined in Jay Kreps' blog, stream processing engines and stream processing libraries are two very different approaches to building streaming applications.

Stream processing libraries are typically easier to use, providing more flexibility, but require specific implementation of deployment, scalability and load balancing. Fortunately Cloudflow implements most of these concerns, so similar to Flink, the user just writes a basic functionality that he needs to implement as (a set) of classes.

Additionally, the Akka Streams runtime in Cloudflow is fully integrated with Akka Cluster, leveraging Kafka aware cluster sharding, which allows for very powerful stateful processing as described later in this blog.

Implementing windowing in Akka Streams applications

One of the common considerations for choosing a runtime is the requirement for windowing. While Flink out of the box provides a very well defined support for time and windowing, similar support in Akka Streams is fairly weak. Currently Akka Streams provides several operators that can be (sort of) used for windowing:

  • groupedWithin: Chunk up the stream into groups of elements received within a time window, or limited by the number of the elements. This can be considered as a tumbling window.
  • sliding: Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
  • sessionWindow implements time-based session windows - helps to identify periods of activity.

None of these implementations really provide the functionality necessary for true time/window support. Does this mean that every time a service has a time/window requirement, one has to use Flink? Not necessarily.

Over the time there were quite a few attempts to enhance Akka Streams with windowing. This blog post provides an excellent overview of windowing along with an implementation, other implementations are provided here and here. Unfortunately none of these implementations provide time functionality - they are all based on processing time, not event time. They also do not ensure absence of data lost in the case of execution restarts, which in Cloudflow is implemented leveraging Kafka commit after the message was processed. As a result we will here provide a new implementation based on Akka Stream’s custom graph stage. We implemented the following timing windows:

  • Tumbling window - windows that have a fixed (time) size and do not overlap. In this case time is divided into non-overlapping parts and each data element belongs to a single window.
  • Sliding windows - windows parameterized by length and step. These windows overlap, and each data element can belong to multiple windows.

Both types of timing windows support both events (stored inside the message) and processing time and watermarking (for the cases of event time). We also provide two types of session windows:

  • Inactivity based session windows - windows defining session on the inactivity period (session completes if no data arrives within inactivity data interval).
  • Value based session windows - windows defining session based on the message parameter (in addition session in this case will complete in the case of inactivity)

Both types of session windows support both events (stored inside the message) and processing time. All of the windows implementations support maximum window data size. If this size is met, then we would leave only the most recent data.

As we mentioned above, all windows implementations are based on custom graph stage, which internally captures the content of the window and a list of Kafka offsets for each message inside the window.

As a result of such implementation, none of the messages are committed until the window is processed as a whole. This means that although a window's data is collected in memory, if the execution fails and the instance restarts, all of the messages of the window(s) that have not been processed will be reread.

Support for watermarks for time based windows and inactivity for the session windows is based on timer (we are using TimerGraphStageLogic as the base class for the GraphStages logic). We are here using timers with fixed intervals (which can be controlled by windows parameters) and checking required conditions at each timer firing.

Below is the code for the tumbling window implementation:

case class TumblingWindow[T](duration: FiniteDuration, 
time_extractor: T ⇒ Long = (_: T) ⇒ System.currentTimeMillis(),
            watermark: FiniteDuration = 0.millisecond, maxsize: Int = 1000, 
watermakInterval: FiniteDuration = 100.millisecond)
      extends GraphStage[FlowShape[(T, Committable), (Seq[T], Committable)]] {

 // Define inlets and outlets and shape 
 private val inlet = Inlet[(T, Committable)]("TumblingWindow.inlet")
 private val outlet = Outlet[(Seq[T], Committable)]("TumblingWindow.outlet")
 override val shape: FlowShape[(T, Committable), (Seq[T], Committable)] = 
FlowShape(inlet, outlet)

 // Create stage logic with timer
 override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
   new TimerGraphStageLogic(shape) with InHandler with OutHandler {
    // Internal parameters
     private val buffer = new ListBuffer[T] // Data collector
     private val offsets = new ListBuffer[Committable] // Context collector
     private var windowStart: Long = -1 // Collection start time
     private var windowEnd: Long = -1 // Collection end time
     private val completed = new ListBuffer[ReadyWindow[T]] // Windows waiting for watermark

     // Convert timings to milliseconds
     private val durationMS = duration.toMillis
     private val watermarkMS = watermark.toMillis
     // Setup handlers
     this.setHandlers(inlet, outlet, this)
     // pre start - start timer
     override def preStart(): Unit =
       scheduleWithFixedDelay(shape, 0.millisecond, watermakInterval)
     // post stop - cleanup
     override def postStop(): Unit = {
       buffer.clear()
       offsets.clear()
       completed.clear()
     }
     // Get new event
     override def onPush(): Unit = {
       // Get element and its time
       val event = grab(inlet)
       val time = time_extractor(event._1)
       // First time through - initial setup
       if (windowStart < 0) {
         setup(time)
       }
       if ((windowStart until windowEnd) contains time) {
         // We are in the current window the window
         updateData[T](event._1, buffer, maxsize)
       } else if (time >= windowEnd) {
         val currentTime = System.currentTimeMillis()
         // Event pass the window
         completed +=
           ReadyWindow[T](windowStart until windowEnd, buffer.clone(), maxsize, 
offsets.clone(), currentTime + watermarkMS)
         // Reset for next window
         setup(windowEnd)
         // Store this event
         buffer += event._1
         // Push as quickly as possible
         submitReadyWindows(currentTime)
       } else {
         // Late arriving event - store it if window is still around
         for (window ← completed) {
           if (window.range.contains(time))
             window.addLateEvent(event._1)
         }
       }
       // Regardless of where the actual data goes, the offset should go here 
       // to avoid premature commits
       offsets += event._2
       // Get next request
       if (!hasBeenPulled(inlet)) pull(inlet)
     }

     // new window request
     override def onPull(): Unit = {
       if (!hasBeenPulled(inlet)) pull(inlet)
     }

     // New timer event
     override protected def onTimer(timerKey: Any): Unit = {
       // Timer event, see if we can submit some waitinfg events
       submitReadyWindows(System.currentTimeMillis())
     }

     // Support methods
     private def setup(time: Long): Unit = {
       buffer.clear()
       offsets.clear()
       windowStart = time
       windowEnd = windowStart + durationMS
     }

     private def submitReadyWindows(currentTime: Long): Unit = for (window ← completed) {
       if (window.isReady(currentTime)) {
         val commitable = CommittableOffsetBatch(window.committable)
         println(s"Send committable $commitable")
         // Make sure to remove feom completed
         completed -= window
         emit(outlet, (window.buffer, commitable))
       }
     }
   }
}

The rest of the windows implementations can be found in Github for Sliding window, SessionInactivityWindow and SessionValueWindow.

With these implementations in place we can now use them. Here is an example of usage of tumbling windows:

def runnableGraph() = sourceWithCommittableContext(in)
 .via(TumblingWindow[SimpleMessage](duration = 3.second, time_extractor = (msg) ⇒ msg.ts, 
watermark = 1.2.second))
 .map(records ⇒ {
   println("Got new Tumbling window")
   records.foreach(record ⇒ println(s"      time ${record.ts} - value ${record.value}"))
 })
 .to(committableSink(committerSettings))

Here we are running the source (messages from Kafka) through a tumbling window with a duration of 3 seconds, using event time (located in the ts element of the message) with the watermark of 1.2 seconds.

Usage of the sliding window is very similar:

def runnableGraph() = sourceWithCommittableContext(in)
 .via(SlidingWindow[SimpleMessage](duration = 3.second, slide = 1.5.second, 
time_extractor = (msg) ⇒ msg.ts, watermark = 1.2.second))
 .map(records ⇒ {
   println("Got new Sliding window")
   records.foreach(record ⇒ println(s"      time ${record.ts} - value ${record.value}"))
 })
 .to(committableSink(committerSettings))

The structure of the code, in this case is identical, and the window parameters are very similar as well, with addition of the slide parameter, defining when in the current window to start the next window. Usage of the session windows can be found in GitHub for SessionInactivityWindow and SessionValueWindow.

Using windowing in applications

Windowing is a very important technique in stream processing, but while using it it is important to understand, that it is not free. Windowing requires memory, typically proportional to the window size. The implementation allows you to limit the amount of memory used for windowing data (not offsets), but even with this, when designing windows, it is necessary to consider memory requirements for windows, to make sure that you have enough memory.

Windowing implementation described here allows users to leverage Akka Streams for cases when time/windowing functionality is required.

Check out my free eBook: Serving Machine Learning Models

Implementing stateful stream processing

Any application that processes a stream of events and does not just perform trivial record-at-a-time transformations needs to be stateful, with the ability to store and access intermediate data.

State in Flink

As we have mentioned above, stateful processing is one of the main capabilities of Flink. It supports two types of state: keyed state and operator state.

Keyed state can only be used in functions and operators on a KeyedStream, where each record has an explicit key. It can be thought of as a partitioned or sharded hashmap, with exactly one state-partition per key. Keyed state is organized into key groups. Key groups are atomic units by which Flink can redistribute keyed states. The amount of key groups is defined by application’s parallelism. Flink guarantees processing of all keys from a given key group in the same task manager.

For operator (non-keyed) state, each operator state is bound to one parallel operator instance. A good example of operator state can be found in Kafka Connector implementation - there is one instance of the connector running on every node. Each consumer maintains a map of topic partitions and offsets as its operator state. Operator state also supports redistributing state among parallel operator instances when the parallelism is changed.

Both keyed and operator state can exist in two forms: managed and raw.

Managed state is implemented by data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Flink’s runtime encodes the states and writes them into the checkpoints as part of checkpointing implementation. The types of managed state include ValueState, ListState, etc.

Raw state is a state that implementations keep in their own data structures. Flink knows nothing about the state’s data structures and sees only the raw bytes. When checkpointed, Flink only writes a sequence of bytes into the checkpoint.

State in Akka Streams

Although Akka Streams by itself is stateless, there are many ways to implement stateful streaming in Akka Streams by leveraging existing elements like statefulMapConcat or a custom graph stage. However, the most straightforward and natural way of doing this is integration with Akka actors.

In this case, Akka Streams manage the overall execution flow, while individual actors both maintain state and implement stateful operations. Coupled with Akka Persistence, this approach provides a great foundation of stateful, fault-tolerant stream processing using Akka streams.

Here we will use a simplified version of TaxiRide application to show how stateful stream processing can be implemented in Cloudflow using both Flink and Akka Streams runtimes.

Implementing stateful stream processing using Flink

The main part of the Flink implementation is a processor that reads two streams–taxi ride and taxi fare–and for each rideID calculates the fare for a given ride.

The actual merge is implemented by a rich function - RichCoFlatMapFunction, that leverages shared keyed state:

class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

 @transient var rideState: ValueState[TaxiRide] = null
 @transient var fareState: ValueState[TaxiFare] = null

 override def open(params: Configuration): Unit = {
   super.open(params)
  rideState = 
getRuntimeContext.getState(new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
   fareState = getRuntimeContext.getState(new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
 }

 override def flatMap1(ride: TaxiRide, out: Collector[TaxiRideFare]): Unit = {
   val fare = fareState.value
   if (fare != null) {
     fareState.clear()
     out.collect(new TaxiRideFare(ride.rideId, fare.totalFare))
   } else {
     rideState.update(ride)
   }
 }

 override def flatMap2(fare: TaxiFare, out: Collector[TaxiRideFare]): Unit = {
   val ride = rideState.value
   if (ride != null) {
     rideState.clear()
     out.collect(new TaxiRideFare(ride.rideId, fare.totalFare))
   } else {
     fareState.update(fare)
   }
 }
}

This function is used by a main logic of the Flink implementation:

override def createLogic() = new FlinkStreamletLogic {
 override def buildExecutionGraph = {
   val rides: DataStream[TaxiRide] =
     readStream(inTaxiRide)
       .filter { ride ⇒
         ride.isStart.booleanValue
       }
       .keyBy("rideId")

   val fares: DataStream[TaxiFare] =
     readStream(inTaxiFare)
       .keyBy("rideId")

   val processed: DataStream[TaxiRideFare] =
     rides
       .connect(fares)
       .flatMap(new EnrichmentFunction)

   writeStream(out, processed)
 }
}

Here we read both streams, key them by the same key (ride ID) and connect these streams using EnrichmentFunction (shown above).

The overall implementation is quite simple, very expressive and easy to read. Because Cloudflow is running a Flink server with checkpointing, we are guaranteed that the state here is persistent, which means that in the case of restart of any of the task manager the state will be restored. In the current version (Cloudflow is currently on Flink v1.10), this does not ensure the full high availability–in the case of job manager failures, the snapshots are not getting restored.

High availability for Flink kubernetes deployments was introduced in Flink 1.12. Once cloudflow supports this latest Flink version, then full high availability for Flink stateful stream processing will be in place.

Implementing stateful stream processing using Akka Streams

As described above, Akka Streams stateful streaming is implemented using Akka Actors with cluster sharding. To use Kafka aware sharding efficiently, both streams should come from the same topic, to ensure that their sharding is the same. As a result our Akka Streams processor implementation has to contain two streamlets: streams merger and the actual processor.

For merger implementation we introduce an additional data type allowing to send either taxi ride and taxi fare (we are using protobufs here):

message TaxiRideOrFare {
   int64 rideId = 1;
   oneof message_type {
       TaxiFare fare = 2;
       TaxiRide ride = 3;
   }
}

With this new combined data type the implementation of the merger streamlet is straightforward:

class MessageMerger extends AkkaStreamlet {

 val inTaxiRide = ProtoInlet[TaxiRide]("in-taxiride")
 val inTaxiFare = ProtoInlet[TaxiFare]("in-taxifare")
 val out        = ProtoOutlet[TaxiRideOrFare]("taxirideorfare", _.rideId.toString)

 val shape = StreamletShape.withInlets(inTaxiRide, inTaxiFare).withOutlets(out)

 override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {

   def runnableGraph = {

     Merger.source(Seq(
       sourceWithCommittableContext(inTaxiFare).map(f => TaxiRideOrFare(rideId = f.rideId, messageType = MessageType.Fare(f))),
       sourceWithCommittableContext(inTaxiRide).map(r => TaxiRideOrFare(rideId = r.rideId, messageType = MessageType.Ride(r)))
     )).to(committableSink(out))
   }
 }
}

Once we have a combined input, similar to Flink, the implementation, an Akka Streams implementation is comprised of two parts - the actor, implementing data merger (compare with EnrichmentFunction implementation for Flink) and the main logic leveraging this actor (compare with implementation of main logic for Flink). The actor can be implemented as follows:

object RideShare {
def apply(rideid: String): Behavior[ProcessMessage] = {

   // Execute behavior with the current state
   def executionstate(rideState: Option[TaxiRide], fareState: Option[TaxiFare]): Behavior[ProcessMessage] = {
     // Behaviour describes processing of the actor
     Behaviors.receive { (context, msg) => {
       msg.record.messageType match {
         // Ride message
         case MessageType.Ride(ride) =>
           fareState match {
             case Some(fare) =>
               // We have a fare with the same ride ID - produce result
               msg.reply ! Some(TaxiRideFare(ride.rideId, fare.totalFare))
               executionstate(rideState, None)
             case _ =>
               // Remember the ride to be used when the fare with the same ride ID arrives
               msg.reply ! None
               executionstate(Some(ride), fareState)
           }

         // Fare message
         case MessageType.Fare(fare) =>
           rideState match {
             case Some(ride) =>
               // We have a ride with the same ride ID - produce result
               msg.reply ! Some(TaxiRideFare(ride.rideId, fare.totalFare))
               executionstate(None, fareState)
             case None =>
               // Remember the fare to be used when the ride with the same ride ID arrives
               msg.reply ! None
               executionstate(rideState, Some(fare))
           }

         // Unknown message type - ignore
         case MessageType.Empty =>
           executionstate(rideState, fareState)
       }
     }}
   }
   // Initialize state
   executionstate(None, None)
 }
}
}

We can see that this implementation is virtually identical to the rich function in Flink. The two main differences are:

  • Flink implementation provides a separate method for every stream, currently limited to two, while Akka implementation is using match thus supporting as many message types as required.
  • While Flink implementation is using mutable state, backed up by the Flink's snapshotting, in Akka, state itself is immutable: behavior execution returns a new behavior containing a new state after a message is being handled.

With the actor in place, it can be used to implement a main logic:

override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {

 val typeKey = EntityTypeKey[ProcessMessage]("RideShare")

 val entity = Entity(typeKey)(createBehavior = entityContext => RideShare(entityContext.entityId))

 val sharding = clusterSharding()

 def runnableGraph = {
   shardedSourceWithCommittableContext(inTaxiMessage, entity).via(messageFlow).to(committableSink(out))
 }

 private def messageFlow =
   FlowWithCommittableContext[TaxiRideOrFare]
     .mapAsync(1)(msg ⇒ {
       val actor = sharding.entityRefFor(typeKey, msg.rideId.toString)
       actor.ask[Option[TaxiRideFare]](ref => ProcessMessage(ref, msg))
     }).collect{ case Some(v) => v }
}

Unlike Flink implementation, here there is no explicit keyBy clause, instead it relies explicitly on the key from the incoming message and chooses the actor’s instance based on this key. This is done to ensure the locality of the actors for a running instance Akka Streams (see scaling applications below)

KeyBy in Flink and Akka Streams

Flink data model is not based on key-value pairs. Therefore, it does not require physically packing the data set types into keys and values. Keys are “virtual”: they are defined as functions over the actual data (not the message key) to guide the grouping operator. Although this is also a case in Akka Stream implementation, in order to ensure “locality” of the actor, key has to be defined at the message level and effectively is a key in the Kafka message. Of course sharding can be defined differently, but this will lead to cross instance communications.

Additionally, unlike Flink implementation, where streams are connected before invoking RichCoFlatMapFunction, in the case of Akka, we do merge as a separate streamlet and then process messages, which can contain either ride or fare information.

This implementation works, but it lacks persistence. All of the object state is in memory and will be destroyed on an instance restart. Let's take a look at how to add persistence to this implementation.

Adding persistence to Akka Streams

One of the important features of the Akka toolkit is its richness. It includes a wealth of features, including persistence. Persistence here allows us to persist the state of actors2, which enables both restore actors in the case of implementation failures and move actors from one instance to another (we will talk about it later when we will discuss scaling).

Akka Persistence supports multiple options for the database backend, including Cassandra and several relational databases: Postgres, MySQL, H2, Oracle, SQL Server. For our implementation we decided to use Postgres database. The first step for persistence implementation is to add the following additional jars to our build:

val akkaVersion     = "2.6.10"
val jdbcVersion     = "4.0.0"
val postgreVersion  = "42.2.16"

     "com.typesafe.akka"         %% "akka-persistence-typed"       % akkaVersion,
     "com.typesafe.akka"         %% "akka-serialization-jackson"   % akkaVersion,
     "com.typesafe.akka"         %% "akka-persistence-query"       % akkaVersion,
     "com.lightbend.akka"        %% "akka-persistence-jdbc"        % jdbcVersion,
     "org.postgresql"            % "postgresql"                    % postgreVersion,

Setting up Postgres for Akka Persistence

  • Deploy Postgres to store persistence. Note that Cloudflow does not provide any built in support for that, so you need to do it yourself. For local testing on Mac, you can install it with Homebrew following this post. Alternatively for local cases you can use H2
  • For cluster, you can use Postgres Helm chart, following this post
  • Create tables for Akka persistence, following this documentation
  • Create application config for Postgres instance following this make sure that you merge it with default Cloudflow Akka configuration locally or cluster.

Additionally we need to create and initialize our Postgres database. With this in place, we need to first modify the actor's implementation following Cluster sharding persistence documentation. We first need to create case classes used for implementation:

// Command
final case class TaxiRideMessage(reply: ActorRef[Option[TaxiRideFare]], record : TaxiRideOrFare) extends CborSerializable
// Event
final case class TaxiRideEvent(state : TaxiState) extends CborSerializable
// State
final case class TaxiState(rideState: Option[TaxiRide], fareState: Option[TaxiFare]) extends CborSerializable{
 def reset(updated: TaxiState): TaxiState = copy(updated.rideState, updated.fareState)
}

With this in place actor’s implementation looks as follows:

object RideShare{

 private val commandHandler: (TaxiState, TaxiRideMessage) => Effect[TaxiRideEvent, TaxiState] = { (state, cmd) =>
   cmd match {
     case cmd: TaxiRideMessage => processTaxiMessage(state, cmd)
   }
 }

 private val eventHandler: (TaxiState, TaxiRideEvent) => TaxiState = {
   (state, evt) => state.reset(evt.state)
 }

 def apply(entityId: String, persistenceId: PersistenceId): Behavior[TaxiRideMessage] = {
   Behaviors.setup { context =>
     EventSourcedBehavior(persistenceId, emptyState = TaxiState(None, None), commandHandler, eventHandler)
       .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 50, keepNSnapshots = 3))
   }
 }
}

The implementation consists of three main parts:

  • Command handler which defines how to handle command by producing Effects e.g. persisting events, stopping the persistent actor, etc. In our implementation, command handler is using processTaxiMessage, which we will discuss below
  • EventHandler returns the new state given the current state when an event has been persisted.
  • Apply method creates an instance of an actor. Note that during creation we are defining a snapshot strategy (creating a snapshot after every 50 events). Usage of snapshotting here is necessary3 to ensure timely restoration of the actor’s state.

The implementation above is using process taxi message to implement taxi ride specific logic:

private def processTaxiMessage(state: TaxiState, cmd: TaxiRideMessage): Effect[TaxiRideEvent, TaxiState] = {
   def processMessage(): (TaxiState, Option[TaxiRideFare]) =
     cmd.record.messageType match {
       // Ride message
       case MessageType.Ride(ride) =>
         state.fareState match {
           case Some(fare) =>
             // We have a fare with the same ride ID - produce result
             (TaxiState(state.rideState, None), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
           case _ =>
             // Remeber the ride to be used when the fare with the same ride ID arrives
             (TaxiState(Some(ride), state.fareState), None)
         }
       // Fare message
       case MessageType.Fare(fare) =>
         state.rideState match {
           case Some(ride) =>
             // We have a ride with the same ride ID - produce result
             (TaxiState(None, state.fareState), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
           case None =>
             // Remeber the fare to be used when the ride with the same ride ID arrives
             (TaxiState(state.rideState, Some(fare)), None)
         }
       // Unknown message type - ignore
       case MessageType.Empty =>
         (state, None)
     }

   // Calculate new State and reply
   val stateWithReply = processMessage()
   // Persist state and send reply
   Effect.persist(TaxiRideEvent(stateWithReply._1)).thenRun(state => cmd.reply ! stateWithReply._2)
 }

This implementation is very similar to the original one, with the difference that we are using the persist effect to store data. Finally we need to modify streamlet implementation slightly to add persistence ID

override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
 val typeKey = EntityTypeKey[TaxiRideMessage]("RideShare")

 val entity = Entity(typeKey)(createBehavior = entityContext =>
   RideShare(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)))

 val sharding = clusterSharding()

 def runnableGraph = {
   shardedSourceWithCommittableContext(inTaxiMessage, entity).via(messageFlow).to(committableSink(out))
 }

 private def messageFlow =
   FlowWithCommittableContext[TaxiRideOrFare]
     .mapAsync(1)(msg ⇒ {
       val actor = sharding.entityRefFor(typeKey, msg.rideId.toString)
       actor.ask[Option[TaxiRideFare]](ref => TaxiRideMessage(ref, msg))
     }).collect{ case Some(v) => v }
}

Adding persistence to our Akka streams implementation makes it fully reliable. In the case of any application restarts, the state will be fully restored. This implementation is slightly less readable compared to the Flink one, it provides the same functionality, and even better reliability.

Optimizing the Akka Streams implementation

The application that we are using is a little bit unusual. In our application, the new keys are constantly created and are fairly short lived. As a result, actors are being created and garbage collection is happening all the time.

To avoid this constant churn of actors, we can remember that Akka cluster sharding is synchronised with Kafka partitioning. This means that we can create an actor per Kafka partition4, holding all the state for this partition.

To do this let's remember that default Kafka partitioner uses a 32-bit murmur2 hash to compute the partition id based on the key (bytes) and the amount of partition). If we assume that the default partitioner is used, we can implement key calculation as following:

def convertRideToPartition(rideID : Long) : String = {
 // hash the keyBytes to choose a partition
 val bytes = BigInt(rideID).toByteArray
 val converted = Utils.toPositive(Utils.murmur2(bytes)) % numberOfPartitions
 converted.toString
}

With this function for calculating actor key we can modify message flow to pick the actor id based on the shards:

private def messageFlow =
   FlowWithCommittableContext[TaxiRideOrFare]
     .mapAsync(1)(msg ⇒ {
       val rideId = if(msg.messageType.isFare) msg.messageType.fare.get.rideId else msg.messageType.ride.get.rideId
       val actor = sharding.entityRefFor(typeKey, KafkaSupport.convertRideToPartition(rideId))
       actor.ask[Option[TaxiRideFare]](ref => TaxiRideMessage(ref, msg))
     }).collect{ case Some(v) => v }
}

The implementation of the actor itself has to be changed as well. First we need to change supporting state classes to reflect that the state now is not Options, but rather Maps, that can contain information for multiple ride ids belonging to a given partition.

// Command
final case class TaxiRideMessage(reply: ActorRef[Option[TaxiRideFare]], record : TaxiRideOrFare) extends CborSerializable
// Event
final case class TaxiRideEvent(state : TaxiState) extends CborSerializable
// State
final case class TaxiState(rideState: Map[Long, TaxiRide], fareState: Map[Long, TaxiFare]) extends CborSerializable{
 def reset(updated: TaxiState): TaxiState = copy(updated.rideState, updated.fareState)
}

With this in place, both command and event handlers state unchanged, while an apply method has to change to reflect a new state definition:

def apply(entityId: String, persistenceId: PersistenceId): Behavior[TaxiRideMessage] = {
 Behaviors.setup { context =>
   EventSourcedBehavior(persistenceId, emptyState = TaxiState(Map(), Map()), commandHandler, eventHandler)
     .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 50, keepNSnapshots = 3))
 }
}

The biggest change in this case is in the process taxi message:

private def processTaxiMessage(state: TaxiState, cmd: TaxiRideMessage): Effect[TaxiRideEvent, TaxiState] = {
 def processMessage(): (TaxiState, Option[TaxiRideFare]) =
   cmd.record.messageType match {
     // Ride message
     case MessageType.Ride(ride) =>
       state.fareState.get(ride.rideId) match {
         case Some(fare) =>
           // We have a fare with the same ride ID - produce result
           (TaxiState(state.rideState, state.fareState - ride.rideId), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
         case _ =>
           // Remeber the ride to be used when the fare with the same ride ID arrives
           (TaxiState(state.rideState + (ride.rideId -> ride), state.fareState), None)
       }
     // Fare message
     case MessageType.Fare(fare) =>
       state.rideState.get(fare.rideId) match {
         case Some(ride) =>
           // We have a ride with the same ride ID - produce result
           (TaxiState(state.rideState - ride.rideId, state.fareState), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
         case None =>
           // Remeber the fare to be used when the ride with the same ride ID arrives
           (TaxiState(state.rideState, state.fareState + (fare.rideId -> fare)), None)
       }
     // Unknown message type - ignore
     case MessageType.Empty =>
       (state, None)
   }

 // Calculate new State and reply
 val stateWithReply = processMessage()
 // Persist state and send reply
 Effect.persist(TaxiRideEvent(stateWithReply._1)).thenRun(state => cmd.reply ! stateWithReply._2)
}

As we have shown here, we can successfully use Akka Streams runtime (along with Akka cluster) for implementing reliable stateful streaming applications.

Learn more in this O'Reilly eBook: Serving Machine Learning Models

Scaling Flink and Akka Streams

When it comes to scaling of the streaming applications, Flink and Akka Streams runtimes implement it completely differently.

In the case of Flink, an implementation is running inside the Flink cluster. As defined here, a Flink execution is split into several parallel instances for execution and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism. So scaling of Flink based applications requires changing of parallelism–and potentially the amount and resources of the task manager, where applications are running.

Currently there is no way to rescale a Flink job dynamically. Any scaling of Flink’s job requires stopping a cluster and restarting it. In order to preserve execution state for such situations, Flink provides a savepoint mechanism, which is a way to create a consistent image of the execution state of a streaming job. So a scaling operation in Flink involves taking a savepoint, stopping the complete job and restarting it from this savepoint with new parallelism and potentially new resources. Currently Cloudflow does not support savepointing, so scaling Flink in Cloudflow leads to the loss of state.

In the case of Akka Streams, scaling is based on Kafka consumer groups, a set of consumers that cooperate to consume data from some topics. By default all instances of a given service belong to the same consumer group.

The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as “rebalancing the group”.

Because our implementation is leveraging Kafka-aware sharding, all of the actors for a given partition will be relocated to the instance receiving messages from this partition without losing state. As a result, scaling of the Akka streams based streaming stateful applications entails adding/removing a new instance of an application.

Scaling with Akka Streams happens with no interruption to the execution.

Conclusion

In this blog post we have looked at some of the features of Flink and Akka Streams runtimes for Cloudflow and showed how they can be implemented. Both runtimes have their strengths and weaknesses, and we hope that examples in this blog post will allow you to make a more informed choice, while building your own implementations.


1 Compare to Map Reduce architecture

2 The only state in our implementation is contained in actors, as a result persisting the state of actors is sufficient to make overall implementation persistent.

3 Without snapshotting, an actor’s restore will replay back all of the messages (commands) sent to the actor, which can take a considerable amount of time.

4 Kafka partitions are the basic unit of scalability (see below), that is why using partition-based actors is an optimal actor’s granularity.

Share



Filter by Tag