
Managing Streaming And Queryable State In Spark, Akka Streams, Kafka Streams, And Flink

Boris Lublinsky Principal Architect, Lightbend, Inc.

Design Techniques for Building Streaming Data, Cloud-Native Applications

(20 Min read)

When implementing stream processing, there are some considerations when deciding whether your system should be stateless or stateful.

In stateless stream processing, the way each event is handled is completely independent from the system’s state. All processing is based on each incoming event. Given an event, the stream processor will process it exactly the same way every time, no matter what the state of the system is. For example, you can think of a bank teller that "processes" a stream of customers, one at a time.

In stateful stream processing, on the other hand, there is a “state” that impacts event processing. As a result, either static state or past events can influence the way incoming events are processed. For example, if you want to average the value for the last 10 minutes of events that occur every second, this means that you need to keep the state containing the last 600 events.
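The rolling-average example can be sketched in a few lines of plain Scala. This is only an illustration of the state involved; the RollingAverage class and its members are hypothetical names, not part of any of the frameworks discussed here.

```scala
// Rolling-average state over the most recent `capacity` events.
final case class RollingAverage(capacity: Int, window: Vector[Double] = Vector.empty, sum: Double = 0.0) {
  require(capacity > 0, "capacity must be positive")

  // Add one event, evicting the oldest one once the window is full.
  def add(value: Double): RollingAverage = {
    if (window.size == capacity) {
      copy(window = window.tail :+ value, sum = sum - window.head + value)
    } else {
      copy(window = window :+ value, sum = sum + value)
    }
  }

  // Average of the events currently held; None before the first event.
  def average: Option[Double] = {
    if (window.isEmpty) None else Some(sum / window.size)
  }
}

object RollingAverageExample {
  // One event per second for 10 minutes means 600 values of state.
  val tenMinuteWindow: RollingAverage = RollingAverage(capacity = 600)

  // A window of 3 makes eviction easy to see: after 1, 2, 3, 4 it holds 2, 3, 4.
  val small: RollingAverage =
    List(1.0, 2.0, 3.0, 4.0).foldLeft(RollingAverage(capacity = 3))((acc, v) => acc.add(v))
}
```

The result for each new event depends on the window contents, which is exactly the state that has to survive failover and rescaling.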

In order to implement stream processing, you need to define the way to keep and manage your state. A simplistic solution here is to use local, in-memory storage, but this creates the following problems:

  • Failover - restarting the stream processing system will not restore the state
  • Scaling - scaling up and down will typically require state to be moved from one processor to another.

An improved solution would be to use an external store (such as a database). This solves the problems with local storage and adds the ability to view state in real time. Unfortunately, it comes with its own drawbacks:

  • Adding additional components to the system, which will make the overall implementation more complex and consequently harder to manage.
  • Reliability concerns; if the external database goes down, the whole system is down
  • Performance implications - performance of the external store limits the performance of the overall stream processing. For example, it can become a bottleneck.

Another problem that usually arises with state is its visibility. There are many ways to access this state, ranging from logging state changes to adding an external database where the state is stored (in addition to keeping it locally). An alternative approach - queryable state - was introduced by Kafka Streams. Queryable state allows you to make a stream's internal state stores visible to external queries. The benefits of this approach include:

  • Avoids duplicating data
  • Minimizes I/O overhead to communicate state externally, such as writes to a database
  • Leads to fewer moving pieces in the end-to-end architecture by avoiding unneeded state stores

Although this implementation was initially introduced in Kafka Streams, it was subsequently implemented in Flink, and it can be implemented in other streaming frameworks, too.

In this blog post we will take a look at the built-in support for state management and queryable state, when available, or how they can be implemented in two popular stream processing engines—Apache Spark and Apache Flink—and two popular stream processing libraries—Kafka Streams and Akka Streams.

Supporting state in stream processing engines

One of the important characteristics of state support in streaming engines is that the engine manages state for all streaming data as a whole. As the engine scales, it also scales its state representation. Additionally, stream processing engines provide failover through checkpointing. Both Spark and Flink provide powerful support for state management, but the implementations are very different and provide different capabilities.

Supporting state in Apache Spark

Spark supports both batch processing and two flavors of stream processing: Spark Streaming, an extension of the core Spark API, and Spark Structured Streaming. Here we will only cover Spark Structured Streaming, which supports arbitrary stateful operations that can be used to implement stateful streams.

From: https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html

MapGroupsWithState introduced in Spark Structured Streaming supports bringing together both current data from the stream and evolving state to implement calculations, update the state, and return results. The main features of mapGroupsWithState are:

  • Keeping state between application updates - One of the biggest caveats of mapGroupsWithState is the fact that you are forced to drop the in-memory data between upgrades.
  • Internal data storage for state - mapGroupsWithState uses java.util.ConcurrentHashMap[K, V], which uses an underlying structure called UnsafeRow for both key and value instead of a plain JVM object. These unsafe rows are wrappers around the bytes of data generated by the encoder for the keys and values, and they apply on-demand conversions between the unsafe representation and our JVM object structure (marshalling/unmarshalling) when the key-value pair needs to be passed to our state function.
  • State versioning - the implementation uses an HDFSBackedStateStore per version of the state, meaning it only needs to keep the latest version of the state in memory while letting older versions reside in the backing state store, loading them on demand.
  • Checkpointing - mapGroupsWithState checkpointing is done incrementally, for updated keys only. This means that there should be a significant reduction in checkpointing overhead.

An example of mapGroupsWithState usage can be found in this Lightbend tutorial on ML/AI model serving. In particular, the SparkStructuredModelServer class in this project defines a function that leverages state as follows:

def modelServing(key: String, values: Iterator[DataWithModel], state: GroupState[ModelState]) :
       Seq[ServingResult[Double]] = {
	    values.foreach(value => {
		………………………
       }
}

The following are the inputs and outputs defined here:

  • key - a key for which a function is invoked
  • values - an iterator of values for this key
  • state - the evolving state, in this case a machine learning model
  • result - the result returned from the function, in this case a record scored with the model. Note that a function can return a sequence of results, too.

GroupState[T] represents the state kept by Spark for a given group. In this case there is one instance of GroupState[T] per group. It defines the required methods that deal with the state lifecycle:

  • exists - tells if the state for the given group was set. It returns false if a group is processed for the first time or when the state has just expired.
  • get - gets the state associated with the given group. It throws NoSuchElementException when the state doesn't exist. getOption is a safer method; it returns the state wrapped in Some, or None if it doesn't exist.
  • update(newState: S) - overwrites the existing state with the newState defined in the parameter.
  • remove() - removes the state.
  • hasTimedOut - returns true if the state has just expired
  • setTimeoutDuration(...) - defines the timeout value for processing-time configuration. It accepts either a Long (ms) or a stringified representation of time
  • setTimeoutTimestamp(...) - defines the timeout as milliseconds in epoch time

With modelServing in place, it can be invoked as follows:

val servingresultsstream = datamodelstream
    .filter(_.dataType.length > 0)
    .groupByKey(_.dataType)
    .mapGroupsWithState(GroupStateTimeout.NoTimeout())(modelServing)
    .as[Seq[ServingResult[Double]]]
    .withColumn("value", explode($"value"))
    .select("value.name", "value.dataType", "value.duration", "value.result")

Here mapGroupsWithState takes two argument lists, each of which has a single parameter:

  • GroupStateTimeout - The kind of timeout. mapGroupsWithState accepts one of three different timeout values: no timeout, processing-time based, or event-time based (which works only if a watermark is defined). Every time at least one record is sent to the group, the timeout is updated. The state is considered expired only when the given group hasn't received any data during the specified threshold. The timeout is defined inside the specific GroupState object, so it is completely possible to specify different timeout configurations depending on the processed group.
  • Processing function (see above), which defines how the incoming data will be processed in order to generate the state. This function is called when one of two conditions is met: either the group has new values to process or the state has expired. In the second case, the function is called with an empty list of values.
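To make the calling convention concrete, here is a minimal plain-Scala model of the idea: for every key that has data in a batch, the function receives the key, that key's values, and the state stored for the key. SimpleGroupState and MiniStatefulGrouping are hypothetical stand-ins written for this illustration; they are not Spark's API, and timeouts are left out entirely.

```scala
// Simplified stand-in for per-group state; NOT Spark's GroupState API.
final class SimpleGroupState[S](private var current: Option[S]) {
  def exists: Boolean = current.isDefined
  def getOption: Option[S] = current
  def update(newState: S): Unit = { current = Some(newState) }
  def remove(): Unit = { current = None }
}

object MiniStatefulGrouping {
  // Calls `func` once per key that has data in this batch, handing it the
  // key's values and its stored state, then persists any state change.
  def mapGroupsWithState[K, V, S, R](batch: Seq[(K, V)], store: Map[K, S])(
      func: (K, Iterator[V], SimpleGroupState[S]) => R): (Seq[R], Map[K, S]) = {
    batch.groupBy(_._1).foldLeft((Vector.empty[R], store)) {
      case ((results, states), (key, records)) =>
        val state = new SimpleGroupState[S](states.get(key))
        val result = func(key, records.iterator.map(_._2), state)
        val updated = state.getOption match {
          case Some(s) => states.updated(key, s) // state was set or kept
          case None    => states - key           // state was removed
        }
        (results :+ result, updated)
    }
  }

  // Example state function: a running count of records per key.
  def runningCount(key: String, values: Iterator[Int], state: SimpleGroupState[Long]): (String, Long) = {
    val total = state.getOption.getOrElse(0L) + values.size
    state.update(total)
    (key, total)
  }
}
```

Feeding the returned map into the next call plays the role of the state that the engine carries from one batch to the next.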

Currently there is no queryable state support available in Spark and there seems to be no work in the Spark community to add this support.

Supporting state in Apache Flink

Flink provides much richer support for state compared to Spark. It supports two types of state - keyed or operator.

Keyed state can only be used in functions and operators on a KeyedStream, where each record has an explicit key. It can be thought of as a partitioned or sharded hashmap, with exactly one state-partition per key.

Keyed state is organized into key groups. Key groups are the atomic units by which Flink can redistribute keyed state (see note 1). There are exactly as many key groups as the defined maximum parallelism. Flink guarantees that all keys in a given key group are processed in the same task manager.
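The relationship between keys, key groups, and parallel instances can be sketched as follows. This is a simplified model under stated assumptions: the KeyGroups object and its function names are hypothetical, and plain hashCode is used where Flink applies an additional hash function.

```scala
object KeyGroups {
  // Key group for a key: its hash modulo the maximum parallelism.
  // Assumption: plain hashCode stands in for Flink's own hashing.
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Parallel operator instance owning a key group: each instance gets a
  // contiguous range of key groups.
  def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  // Key groups owned by each operator instance at a given parallelism.
  def assignment(maxParallelism: Int, parallelism: Int): Map[Int, Seq[Int]] =
    (0 until maxParallelism).groupBy(kg => operatorIndexFor(kg, maxParallelism, parallelism))
}
```

With a maximum parallelism of 8, going from 2 to 4 instances changes the assignment from {0-3, 4-7} to {0-1, 2-3, 4-5, 6-7}: whole key groups move, while the key-to-key-group mapping stays fixed.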

For operator (non-keyed) state, each operator state is bound to one parallel operator instance. A good example of operator state can be found in the Kafka connector implementation: there is one instance of the connector running on every node, and each consumer maintains a map of topic partitions and offsets as its operator state. Operator state also supports redistributing state among parallel operator instances when the parallelism is changed.
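One way to picture redistribution of list-style operator state is to treat the per-instance lists as one logical list and split it across the new number of instances. The sketch below does this round-robin; the ListStateRedistribution object is hypothetical, and the exact order in which Flink hands out elements is not something this example claims to reproduce.

```scala
object ListStateRedistribution {
  // Concatenate the per-instance lists, then deal the elements out
  // round-robin to the new number of instances.
  def redistribute[A](perInstance: Seq[Seq[A]], newParallelism: Int): Vector[Vector[A]] = {
    val all = perInstance.flatten
    val buckets = Vector.fill(newParallelism)(Vector.newBuilder[A])
    all.zipWithIndex.foreach { case (item, i) => buckets(i % newParallelism) += item }
    buckets.map(_.result())
  }
}
```

For the Kafka connector example, the elements would be the (partition, offset) entries, so that after rescaling every partition is still owned by exactly one consumer instance.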

Both keyed and operator state can exist in two forms: managed and raw.

Managed state is implemented by data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Flink’s runtime encodes the states and writes them into the checkpoints as part of checkpointing implementation. The types of managed state include ValueState, ListState, etc.

From: https://www.slideshare.net/dataArtisans/apache-flink-training-working-with-state

Raw State is state that operators keep in their own data structures. Flink knows nothing about the state’s data structures and sees only the raw bytes. When checkpointed, Flink only writes a sequence of bytes into the checkpoint.

All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. Flink documentation advocates usage of managed state (rather than raw state), which will allow Flink to automatically redistribute state when the parallelism is changed, and also do better memory management.

The example below, from this project, shows how to define and use a keyed state (class DataProcessorKeyed)

/* Definition */
var modelState: ValueState[ModelToServeStats] = _
var currentModel : ValueState[Option[Model[RECORD, RESULT]]] = _

/* Called when an instance is created */
override def open(parameters: Configuration): Unit = {

  val modelStateDesc = new ValueStateDescriptor[ModelToServeStats](
      "currentModelState",                         		// state name
      createTypeInformation[ModelToServeStats])       // type information
  modelState = getRuntimeContext.getState(modelStateDesc)

  val modelDesc = new ValueStateDescriptor[Option[Model[RECORD, RESULT]]](
      "currentModel",                               		// state name
      new ModelTypeSerializer[RECORD, RESULT])  // type information
  currentModel = getRuntimeContext.getState(modelDesc)
}
…………………………………………..
currentModel.value.foreach(_.cleanup())
// Update model
currentModel.update(Some(md))
modelState.update(ModelToServeStats(model))

The definition and usage of an operator state can be found in DataProcessorMap class:

private var currentModels = Map[String, (String,Model[RECORD, RESULT])]()
  // Checkpointing state
  @transient private var checkpointedState: ListState[ModelWithType[RECORD, RESULT]] = _

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    currentModels.foreach(entry => checkpointedState.add(new ModelWithType[RECORD, RESULT](entry._1, entry._2)))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val checkPointDescriptor = new ListStateDescriptor[ModelWithType[RECORD, RESULT]] (
        "modelState",
        new ModelWithTypeSerializer[RECORD, RESULT])
    checkpointedState = context.getOperatorStateStore.getListState (checkPointDescriptor)

    if (context.isRestored) {
      currentModels = Map(checkpointedState.get().iterator().asScala.toList.map(modelWithType =>
        (modelWithType.dataType -> modelWithType.modelWithName)): _*)
    }
  }

In addition to managing state, Flink also supports queryable state, but only for keyed state.

In order to use this functionality, a state needs to be explicitly made queryable by using one of two methods:

  • Using a QueryableStateStream, which is a convenience object which acts as a sink and offers its incoming values as queryable state.
  • Using the stateDescriptor.setQueryable(String queryableStateName) method, which makes the keyed state represented by the state descriptor, queryable.

The example below is from the DataProcessorKeyed class:

val modelStateDesc = new ValueStateDescriptor[ModelToServeStats](
      "currentModelState",                         			// state name
      createTypeInformation[ModelToServeStats])           	// type information
modelStateDesc.setQueryable("currentModelState")   	// Expose it for queryable state

Flink’s implementation of queryable state consists of three main entities:

  1. QueryableStateClient, which runs outside the Flink cluster and allows users to submit queries and get the results back.
  2. QueryableStateClientProxy, which runs inside the Flink cluster (on each TaskManager) and is responsible for receiving the client’s queries, fetching the requested state from the responsible TaskManager on the client's behalf, and returning it to the client.
  3. QueryableStateServer, a local server responsible for accessing locally stored state running on each TaskManager.

The client connects to one of the proxies and sends a request for the state associated with a specific key, k. The proxy queries the JobManager to find the TaskManager responsible for the key group holding k. It then queries the QueryableStateServer running on that TaskManager for the state associated with k, and forwards the response back to the client.
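The request path described above can be modelled in plain Scala. Everything in this sketch (QueryableStateFlow, StateServer, ClientProxy, cluster) is a hypothetical stand-in for illustration, with an in-memory lookup in place of the JobManager and network calls; it is not Flink's API.

```scala
object QueryableStateFlow {
  // Owner of a key: key group first, then the instance holding that key group.
  def ownerIndex(key: String, parallelism: Int, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism) * parallelism / maxParallelism

  // Stand-in for the server on one TaskManager; it only sees local state.
  final class StateServer(local: Map[String, Long]) {
    def lookup(key: String): Option[Long] = local.get(key)
  }

  // Stand-in for the proxy: locate the responsible server, then forward the query.
  final class ClientProxy(servers: Vector[StateServer], maxParallelism: Int) {
    def query(key: String): Option[Long] =
      servers(ownerIndex(key, servers.size, maxParallelism)).lookup(key)
  }

  // Build a toy cluster by partitioning `state` across `parallelism` servers.
  def cluster(state: Map[String, Long], parallelism: Int, maxParallelism: Int): ClientProxy = {
    val servers = Vector.tabulate(parallelism) { i =>
      new StateServer(state.filter { case (key, _) => ownerIndex(key, parallelism, maxParallelism) == i })
    }
    new ClientProxy(servers, maxParallelism)
  }
}
```

The caller only talks to the proxy; which server actually holds the key is resolved behind it.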

The implementation of the client leveraging queryable state (from the class ModelStateQuery) looks as follows:

val defaulttimeInterval = 1000 * 20        // 20 sec

 def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: Int = 9069,
            timeInterval: Long=defaulttimeInterval): Unit = {
    val jobId = JobID.fromHexString(job)
    val client = new QueryableStateClient(host, port)
    val descriptor = new ValueStateDescriptor[ModelToServeStats](
      "currentModel",   							// state name
      createTypeInformation[ModelToServeStats].createSerializer(new ExecutionConfig)
    )
    val keyType = BasicTypeInfo.STRING_TYPE_INFO
    val format       = "| %-50s | %-38s | %-19s | %8.5f | %3d | %3d |\n"
    val headerFormat = "| %-50s | %-38s | %-19s | %-8s | %-3s | %-3s |\n"
    printf(headerFormat, "Name", "Description", "Since", "Average", "Min", "Max")
    printf(headerFormat, "-" * 50, "-" * 38, "-" * 19, "-" * 8, "-" * 3, "-" * 3)
    while(true) {
      for (key <- keys) {
        try {
          val future = client.getKvState(jobId, "currentModelState", key, keyType, descriptor)
          val stats = future.join().value()
          printf(format, stats.name, stats.description,
            new DateTime(stats.since).toString("yyyy/MM/dd HH:mm:ss"),
            stats.duration/stats.usage, stats.min, stats.max)
        }
        catch {case e: Exception => e.printStackTrace()}
      }
      Thread.sleep(timeInterval)
    }
  }

Comparing state support in stream processing engines

When comparing state support in Spark vs Flink it is important to realize the differences in their architectures:

  • Flink: iterations are executed as cyclic data flows; a program (with all its operators) is scheduled just once and the data is fed back from the tail of an iteration to its head. This allows Flink to keep all additional data locally.
  • Spark: each iteration is a new set of tasks scheduled and executed. Each iteration operates on the result of the previous iteration which is held in memory. For each new execution cycle, the results have to be moved to the new execution processes.

These differences have a profound impact on state management features:

  • In Flink, all state data is kept locally, so arbitrarily complex structures can be used for its storage (although serializers are required for checkpointing). The serializers are only invoked out of band.
  • In Spark, all state data is stored external to each mini-batch in binary format, so it has to be marshalled/unmarshalled for every mini-batch (for every message in continuous execution) to make this data available to the mapGroupsWithState function.
  • Additionally Spark Structured Streaming is based on SQL data types, which makes data storage even more complex, due to the complexities of converting between JVM and SQL data types.

Supporting state in stream processing libraries

Both Kafka Streams and Akka Streams support the management of, and access to, state. When looking at the state implementation in library-based applications, it is important to understand their scaling considerations and approaches. One of the main characteristics is that they are JVM bound. As a result, they typically serve not a stream as a logical whole (compared to the streaming engines above), but rather a part of the stream; each instance might handle a single stream’s partition. Serving the whole data stream typically requires an ensemble of library-based application instances, where horizontal scaling must be handled manually by the user. This additional stream partitioning has an important impact on storing and accessing the state.

Supporting state in Kafka Streams

The introduction of Kafka Streams formalized a stream vs. table duality and two main abstractions that represent them - KStream, which is an abstraction of a record stream where the consumers see every record, and KTable, which is an abstraction of a changelog stream, where each data record represents an update (see for example the streams DSL).

From: https://www.slideshare.net/sbaltagi/kafka-streams-for-java-enthusiasts

KTable is implemented as a key/value state store - the foundation of state management in Kafka Streams. It can be used by stream processing applications to store and query data. Every task in Kafka Streams can have one or more state stores that can be accessed via APIs to store and query state data required for processing. Kafka Streams supports two types of state stores - a persistent key-value store based on RocksDB or an in-memory hashmap. There is also an option of implementing a custom state store (see below).

An example of a state store creation (using the Kafka Streams Processor APIs) can be found in this Lightbend tutorial. See, in particular, the StandardStoreStreamBuilder class:

val logConfig = new HashMap[String, String]
    val storeSupplier = Stores.inMemoryKeyValueStore(STORE_NAME)
    val storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.Integer, new ModelStateSerde).withLoggingEnabled(logConfig)
    val builder = new StreamsBuilderS
    …………………………………….
    builder.addStateStore(storeBuilder)

Once the store is created, processors can access it, e.g., the DataProcessor class:

override def init(context: ProcessorContext): Unit = {
    modelStore = context.getStateStore(STORE_NAME).asInstanceOf[KeyValueStore[Integer, StoreState]]
    Objects.requireNonNull(modelStore, "State store can't be null")
}
…………...
var state = modelStore.get(STORE_ID)
……………..
state.currentState = state.currentState.map(_.incrementUsage(duration))
……………...

As mentioned above, it is also possible to create a custom state store. An example of such an implementation can be found in the same Lightbend tutorial.

The state stores are backed by a special Kafka topic, thus providing fault-tolerance and automatic recovery. Log compaction is enabled on these topics so that old data can be purged safely to prevent the topics from growing indefinitely. When a Kafka Streams job is restarted, Kafka Streams guarantees that it will restore the associated state stores to the content before the failure, by replaying the corresponding changelog topics prior to resuming the processing in the newly-started tasks.
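The idea of restoring a store by replaying its changelog, and of why compaction is safe, can be illustrated with a small plain-Scala model. The ChangelogRestore object and the Record type are hypothetical; records with no value act as deletes, and the real changelog topics are of course read through Kafka rather than from an in-memory sequence.

```scala
object ChangelogRestore {
  // A changelog record: Some(value) is an upsert, None marks a delete.
  type Record = (String, Option[Long])

  // Rebuild the store by applying the records in order.
  def restore(changelog: Seq[Record]): Map[String, Long] =
    changelog.foldLeft(Map.empty[String, Long]) {
      case (store, (key, Some(value))) => store.updated(key, value)
      case (store, (key, None))        => store - key
    }

  // Compaction keeps only the latest record per key, preserving order.
  def compact(changelog: Seq[Record]): Seq[Record] = {
    val lastIndex = changelog.zipWithIndex.map { case ((key, _), i) => key -> i }.toMap
    changelog.zipWithIndex.collect { case (rec @ (key, _), i) if lastIndex(key) == i => rec }
  }
}
```

Because only the latest record per key matters for the final content, replaying the compacted log yields the same store as replaying the full log, with fewer records to read.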

This means that the cost of task (re)initialization depends on the time for restoring the state by replaying the state stores' associated changelog topics, which can be fairly long. To minimize this restoration time, users can configure their applications to have standby replicas of local states. When a task migration happens, Kafka Streams then attempts to assign a task to an application instance where such a standby replica already exists in order to minimize the task (re-)initialization cost.

Also keep in mind that the state store topic is partitioned the same way as the input stream, thus simplifying rescaling of the applications.

In the case when you have more than one input topic, make sure that all of your input topics are partitioned the same way. Keep in mind that a Kafka Streams application only allows you to define one consumer group, which is applied to all consumers for all topics, in order to enforce the same partitioning requirement.

Kafka Streams also introduced queryable state, which allows you to leverage the state of an application (its state stores) from outside of the application. The full state of a Kafka Streams application is typically split across many distributed instances (as discussed above), and across many instances of the state store that are managed locally by the corresponding application instances.

Kafka Streams separates the queryable state implementation into two parts - access to local stores and access to remote stores.

An application instance can query the locally-managed portion of the state by directly querying its own local state stores. These queries are always read-only to guarantee that the underlying state stores will never be mutated out-of-band. An example of such a query can be found in the previously mentioned tutorial, in the RestServiceStore class. It looks as follows:

val store = streams.store(STORE_NAME, standardStoreType)
if (store == null) throw new NotFoundException
complete(store.get(STORE_ID).currentState)

In addition, Kafka Streams supports queries for state store metadata, for example where a particular state store instance for a given name is running. An example of such a query can be found in the MetadataService class. It looks as follows:

def streamsMetadataForStore(store: String, port: Int): util.List[HostStoreInfo] = {     
   val metadata = streams.allMetadataForStore(store).asScala.toSeq match{
      case list if !list.isEmpty => list
      case _ => Seq(new StreamsMetadata(
        new HostInfo("localhost", port),
        new util.HashSet[String](util.Arrays.asList(ApplicationKafkaParameters.STORE_NAME)), util.Collections.emptySet[TopicPartition]))
    }
    mapInstancesToHostStoreInfo(metadata)
  }
private def mapInstancesToHostStoreInfo(metadatas: Seq[StreamsMetadata]) = metadatas.map(convertMetadata(_)).asJava

  private def convertMetadata(metadata: StreamsMetadata) : HostStoreInfo = {
    val currentHost = metadata.host match{
      case host if host equalsIgnoreCase("localhost") =>
        try{InetAddress.getLocalHost.getHostAddress}
        catch {case t: Throwable => ""}
      case host => host
    }
     new HostStoreInfo(currentHost, metadata.port, metadata.stateStoreNames.asScala.toSeq)
  }

To obtain the full state of an application, it is necessary to connect to all of the instances containing parts of the state and then combine them together. The implementation contains the following steps:

  • Obtain the store’s metadata to discover all running instances of your application containing state stores of interest
  • Request the state store information from every instance
  • Combine all the responses together
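The three steps can be sketched in plain Scala as follows. This is an illustration only: the HostStoreInfo case class here is a simplified stand-in defined for this example, and fetch is a placeholder for whatever remote call (for example a REST request) returns the portion of the store held by one instance.

```scala
object CombinedStateQuery {
  // Simplified description of one application instance and the stores it hosts.
  final case class HostStoreInfo(host: String, port: Int, storeNames: Set[String])

  // `fetch` stands in for a remote call returning one instance's part of the store.
  def fullState[K, V](storeName: String, metadata: Seq[HostStoreInfo])(
      fetch: HostStoreInfo => Map[K, V]): Map[K, V] =
    metadata
      .filter(_.storeNames.contains(storeName)) // step 1: instances hosting the store
      .map(fetch)                               // step 2: ask every such instance
      .foldLeft(Map.empty[K, V])(_ ++ _)        // step 3: merge the partial results
}
```

Since each key lives in exactly one partition, the partial maps do not overlap and a simple merge is enough.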

This Lightbend Scala library provides a support framework and implementation examples for implementing these steps.

Supporting state in Akka Streams

There are many ways to implement stateful streaming in Akka Streams by leveraging existing elements like statefulMapConcat or a custom graph stage. However, the most straightforward and natural way of doing this is integration with Akka actors. In this case, Akka Streams manages the overall execution flow, while individual actors both maintain state and implement stateful operations. Coupled with Akka Persistence, this approach provides a great foundation for stateful, fault-tolerant stream processing using Akka Streams.

Invoking an actor’s method from an Akka Streams flow is as simple as shown in the following example, taken from the model serving tutorial’s AkkaModelServer class.

.via(ActorFlow.ask(1)(modelServerManager)((elem, replyTo : ActorRef[Option[ServingResult[Double]]]) => new ScoreData(replyTo, elem)))

This example uses the new Akka typed API, which requires a few changes compared to the untyped API:

  • Explicit definition of the replyTo reference
  • The replyTo reference is typed. Note that the replyTo type is defined by the type of message that Akka Streams is going to receive. It will change if the reply is different, even when the same actor is invoked.

An implementation of the actor itself follows this documentation. An example can be found here.

The above implementation works fine, but only supports in-memory state. To make state persistent, it is necessary to use a persistent actor. A complete walkthrough and example for Akka typed persistence is available in this Akka documentation.

Although Akka Streams does not officially support queryable state, adding observability to individual actors (and consequently to the state as a whole) is quite straightforward. An implementation requires:

  • Adding an additional method supporting the actor’s state access. See the model serving tutorial’s ModelServerBehavior class for an example.
  • Creating an Akka HTTP resource to access this method. See the tutorial’s QueriesAkkaHttpResource class for an example.
  • Starting a REST server supporting this resource. See the tutorial’s AkkaModelServer class for an example.

Comparing state support in stream processing libraries

As mentioned above, the usage of streaming libraries leads to an architecture where, instead of a single engine, you deploy an ensemble of executors, each responsible for processing a specific portion (“partition”) of the stream. As a result, the execution state is partitioned across multiple executors, and the important concerns in this case are:

  • Scalability, including support for rescaling and recreating the state
  • Failover and recreation of the state
  • Combining observable state from individual machines

For both Kafka Streams and Akka Streams, scalability in the case of reading from Kafka is based on the scalability of Kafka itself. As long as the source Kafka topic is partitioned, both allow you to scale up to the number of Kafka partitions and scale down to one. If the state is keyed based on the Kafka message keys, repartitioning of state will work correctly. While Kafka Streams ensures this, for Akka Streams it is up to the developer to ensure this. Once the new instance of the application starts up, some of the Kafka topic’s partitions are going to be redirected to it, and state for the keys belonging to these partitions will be populated in the state store (in the case of Kafka Streams) or actors with the appropriate key will be created (in the case of Akka Streams).

The difference here is how fast the state is going to be recreated. Both Kafka Streams (using a backing topic) and Akka Streams (using event sourcing) recreate state by replaying messages. This means that state restoration can take a long time, depending on the number of messages that have to be replayed for complete state restoration. Both Kafka Streams and Akka Streams provide optimization techniques to speed up the state restoration process: compaction for Kafka Streams and snapshotting for Akka Streams.
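The effect of snapshotting on recovery time can be shown with a small plain-Scala model of event-sourced state. The SnapshotRecovery object and its Event and Snapshot classes are hypothetical and only illustrate the principle; they are not the Akka Persistence API.

```scala
object SnapshotRecovery {
  // Events are numbered; a snapshot records the state as of one sequence number.
  final case class Event(seqNr: Long, delta: Long)
  final case class Snapshot(seqNr: Long, state: Long)

  // Start from the snapshot (or from zero) and replay only the later events.
  // Returns the recovered state and the number of events that were replayed.
  def recover(journal: Seq[Event], snapshot: Option[Snapshot]): (Long, Int) = {
    val start = snapshot.getOrElse(Snapshot(0L, 0L))
    val toReplay = journal.filter(_.seqNr > start.seqNr)
    (toReplay.foldLeft(start.state)(_ + _.delta), toReplay.size)
  }
}
```

The recovered state is the same with or without a snapshot; what changes is how many events have to be replayed to get there.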

The same mechanism works for failover in the case of both Kafka and Akka Streams.

Finally, when it comes to queryable state, Kafka Streams provides APIs for querying the state store and metadata from outside the application. Akka Streams, although it does not provide a “standard” state query API, makes it very simple to query an actor’s state. However, it does not provide any metadata query capability, because all the actors are instance bound. As a result, in order to combine the state “spread” between multiple instances, it is necessary to query every instance for the required information and then combine the results.

Additional Resources

Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!

Notes

1 This also means that if the keys for a stream are significantly skewed, Flink execution is going to have “hot” executors and state.