(20 Min read)
When implementing stream processing, an important consideration is whether your system should be stateless or stateful.
In stateless stream processing, the way each event is handled is completely independent from the system’s state. All processing is based on each incoming event. Given an event, the stream processor will process it exactly the same way every time, no matter what the state of the system is. For example, you can think of a bank teller that "processes" a stream of customers, one at a time.
In stateful stream processing, on the other hand, there is a “state” that impacts event processing: either static state or past events can influence the way incoming events are processed. For example, if you want to average the values of the last 10 minutes of events arriving once per second, you need to keep a state containing the last 600 events.
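As an illustration, here is a minimal sketch of such a stateful average in Scala (the Event type, the one-event-per-second assumption, and the SlidingAverage helper are all hypothetical, introduced only for this example):
import scala.collection.immutable.Queue

// Hypothetical event type: one event per second carrying a numeric value
final case class Event(value: Double)

// Keeps the last `maxEvents` values (600 = 10 minutes at one event per second)
final class SlidingAverage(maxEvents: Int = 600) {
  private var window = Queue.empty[Double]

  // Stateful processing: update the window, then return the current average
  def process(event: Event): Double = {
    window = (window :+ event.value).takeRight(maxEvents)
    window.sum / window.size
  }
}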
In order to implement stateful stream processing, you need to define how to keep and manage your state. A simplistic solution is to use local, in-memory storage, but this creates the following problems:
An improved solution would be to use an external store (such as a database). This solves the problems with local storage and adds the ability to view state in real time. Unfortunately, it comes with its own drawbacks:
Another problem that usually arises with state is its visibility. There are many ways of accessing this state, ranging from logging state changes to adding an external database where the state is stored (in addition to keeping it locally). An alternative approach - queryable state - was introduced by Kafka Streams. Queryable state makes a stream’s internal state stores visible to external queries. The benefits of this approach include:
Although this implementation was initially introduced in Kafka Streams, it was subsequently implemented in Flink, and it can be implemented in other streaming frameworks, too.
In this blog post we will take a look at the built-in support for state management and queryable state (when available), or how they can be implemented, in two popular stream processing engines - Apache Spark and Apache Flink - and two popular stream processing libraries - Kafka Streams and Akka Streams.
One of the important characteristics of state support in streaming engines is that the engine manages state for all of the streaming data as a whole; as the engine scales, it also scales its state representation. Additionally, stream processing engines provide failover through checkpointing. Both Spark and Flink provide powerful support for state management, but their implementations are very different and provide different capabilities.
Spark supports batch processing and two flavors of stream processing - Spark Streaming, an extension of the core Spark API, and Spark Structured Streaming. Here we will only cover Spark Structured Streaming, which supports arbitrary stateful operations that can be used to implement stateful streams.
The mapGroupsWithState operation introduced in Spark Structured Streaming supports bringing together both the current data from the stream and evolving state to implement calculations, update the state, and return results. The main features of mapGroupsWithState are:
An example of mapGroupsWithState usage can be found in this Lightbend tutorial on ML/AI model serving. In particular, the SparkStructuredModelServer class in this project defines a function that leverages state as follows:
def modelServing(key: String, values: Iterator[DataWithModel], state: GroupState[ModelState]) :
    Seq[ServingResult[Double]] = {
  values.foreach(value => {
    ………………………
  })
}
The following are the inputs and outputs defined here:
GroupState[T] represents the state kept by Spark for a given group. In this case there is one instance of GroupState[T] per group. It defines the required methods that deal with the state lifecycle:
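As a hedged illustration of those lifecycle methods (this helper is not part of the tutorial; ModelState is the state type used in the function above):
import org.apache.spark.sql.streaming.GroupState

// Hypothetical helper showing the GroupState lifecycle: read, update, (optionally) expire
def updateModelState(state: GroupState[ModelState], fresh: ModelState): ModelState = {
  // Read the current state, falling back to the fresh value if no state exists yet
  val current = state.getOption.getOrElse(fresh)
  // Replace the stored state with the new value
  state.update(fresh)
  // state.remove() would drop the state for this key entirely;
  // state.setTimeoutDuration("10 minutes") expires it (requires a configured GroupStateTimeout)
  current
}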
With modelServing in place, it can be invoked as follows:
val servingresultsstream = datamodelstream
  .filter(_.dataType.length > 0)
  .groupByKey(_.dataType)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout())(modelServing)
  .as[Seq[ServingResult[Double]]]
  .withColumn("value", explode($"value"))
  .select("value.name", "value.dataType", "value.duration", "value.result")
Here mapGroupsWithState takes two argument lists, each of which has a single parameter:
Currently there is no queryable state support available in Spark and there seems to be no work in the Spark community to add this support.
Flink provides much richer support for state than Spark. It supports two types of state - keyed state and operator state.
Keyed state can only be used in functions and operators on a KeyedStream, where each record has an explicit key. It can be thought of as a partitioned or sharded hashmap, with exactly one state-partition per key.
Keyed state is organized into key groups. Key groups are the atomic units by which Flink can redistribute keyed state 1. There are exactly as many key groups as the defined maximum parallelism. Flink guarantees that all keys in a given key group are processed in the same task manager.
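The number of key groups is therefore fixed by the maximum parallelism, which can be set explicitly on the execution environment; a minimal sketch (the value 128 is arbitrary):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The maximum parallelism defines the number of key groups and thus the upper
// bound to which keyed state can later be rescaled
env.setMaxParallelism(128)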
For operator (non-keyed) state, each operator state is bound to one parallel operator instance. A good example of operator state can be found in the Kafka connector implementation - there is one instance of the connector running on every node, and each consumer maintains a map of topic partitions and offsets as its operator state. Operator state also supports redistributing state among parallel operator instances when the parallelism is changed.
Both keyed and operator state can exist in two forms: managed and raw.
Managed state is implemented by data structures controlled by the Flink runtime, such as internal hash tables or RocksDB. Flink’s runtime encodes the state and writes it into the checkpoints as part of the checkpointing implementation. Types of managed state include ValueState, ListState, etc.
Raw State is state that operators keep in their own data structures. Flink knows nothing about the state’s data structures and sees only the raw bytes. When checkpointed, Flink only writes a sequence of bytes into the checkpoint.
All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. The Flink documentation advocates using managed state (rather than raw state), which allows Flink to automatically redistribute state when the parallelism is changed and to manage memory more effectively.
The example below, from this project, shows how to define and use keyed state (class DataProcessorKeyed):
/* Definition */
var modelState: ValueState[ModelToServeStats] = _
var currentModel: ValueState[Option[Model[RECORD, RESULT]]] = _

/* Called when an instance is created */
override def open(parameters: Configuration): Unit = {
  val modelStateDesc = new ValueStateDescriptor[ModelToServeStats](
    "currentModelState",                        // state name
    createTypeInformation[ModelToServeStats])   // type information
  modelState = getRuntimeContext.getState(modelStateDesc)
  val modelDesc = new ValueStateDescriptor[Option[Model[RECORD, RESULT]]](
    "currentModel",                             // state name
    new ModelTypeSerializer[RECORD, RESULT])    // type information
  currentModel = getRuntimeContext.getState(modelDesc)
}

…………………………………………..
currentModel.value.foreach(_.cleanup())
// Update model
currentModel.update(Some(md))
modelState.update(ModelToServeStats(model))
The definition and usage of operator state can be found in the DataProcessorMap class:
private var currentModels = Map[String, (String, Model[RECORD, RESULT])]()
// Checkpointing state
@transient private var checkpointedState: ListState[ModelWithType[RECORD, RESULT]] = _

override def snapshotState(context: FunctionSnapshotContext): Unit = {
  checkpointedState.clear()
  currentModels.foreach(entry =>
    checkpointedState.add(new ModelWithType[RECORD, RESULT](entry._1, entry._2)))
}

override def initializeState(context: FunctionInitializationContext): Unit = {
  val checkPointDescriptor = new ListStateDescriptor[ModelWithType[RECORD, RESULT]](
    "modelState",
    new ModelWithTypeSerializer[RECORD, RESULT])
  checkpointedState = context.getOperatorStateStore.getListState(checkPointDescriptor)
  if (context.isRestored) {
    currentModels = Map(checkpointedState.get().iterator().asScala.toList.map(modelWithType =>
      (modelWithType.dataType -> modelWithType.modelWithName)): _*)
  }
}
In addition to managing state, Flink also supports queryable state, but only for keyed state.
In order to use this functionality, a state needs to be explicitly made queryable by using one of two methods:
The example below is from the class DataProcessorKeyed:
val modelStateDesc = new ValueStateDescriptor[ModelToServeStats](
  "currentModelState",                        // state name
  createTypeInformation[ModelToServeStats])   // type information
modelStateDesc.setQueryable("currentModelState") // Expose it for queryable state
Flink’s implementation of queryable state consists of three main entities:
The client connects to one of the proxies and sends a request for the state associated with a specific key, k. The proxy queries the JobManager to find the TaskManager responsible for the key group holding k. It then queries the QueryableStateServer running on that TaskManager for the state associated with k and forwards the response back to the client.
The implementation of the client leveraging queryable state (from the class ModelStateQuery) looks as follows:
val defaulttimeInterval = 1000 * 20 // 20 sec

def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: Int = 9069,
          timeInterval: Long = defaulttimeInterval): Unit = {
  val jobId = JobID.fromHexString(job)
  val client = new QueryableStateClient(host, port)
  // State descriptor matching the queryable state registered by the job
  val descriptor = new ValueStateDescriptor[ModelToServeStats](
    "currentModel", // state name
    createTypeInformation[ModelToServeStats].createSerializer(new ExecutionConfig)
  )
  val keyType = BasicTypeInfo.STRING_TYPE_INFO
  val format = "| %-50s | %-38s | %-19s | %8.5f | %3d | %3d |\n"
  val headerFormat = "| %-50s | %-38s | %-19s | %-8s | %-3s | %-3s |\n"
  printf(headerFormat, "Name", "Description", "Since", "Average", "Min", "Max")
  printf(headerFormat, "-" * 50, "-" * 38, "-" * 19, "-" * 8, "-" * 3, "-" * 3)
  while (true) {
    for (key <- keys) {
      try {
        // Query the state for this key and print the current model statistics
        val future = client.getKvState(jobId, "currentModelState", key, keyType, descriptor)
        val stats = future.join().value()
        printf(format, stats.name, stats.description,
          new DateTime(stats.since).toString("yyyy/MM/dd HH:mm:ss"),
          stats.duration / stats.usage, stats.min, stats.max)
      }
      catch { case e: Exception => e.printStackTrace() }
    }
    Thread.sleep(timeInterval)
  }
}
When comparing state support in Spark vs. Flink, it is important to recognize the differences in their architectures:
These differences have a profound impact on state management features:
Both Kafka Streams and Akka Streams support the management of, and access to, state. When looking at the state implementation in library-based applications, it is important to understand their scaling considerations and approaches. One of the main characteristics is that they are JVM bound. As a result, they typically serve not a stream as a logical whole (in contrast to the streaming engines above), but rather a part of the stream; each instance might handle a single partition of the stream. Serving the whole data stream typically requires an ensemble of library-based application instances, where horizontal scaling must be handled manually by the user. This additional stream partitioning has an important impact on storing and accessing state.
The introduction of Kafka Streams formalized the stream vs. table duality and the two main abstractions that represent them - KStream, an abstraction of a record stream where consumers see every record, and KTable, an abstraction of a changelog stream, where each data record represents an update (see, for example, the streams DSL).
A KTable is implemented as a key/value state store - the foundation of state management in Kafka Streams. State stores can be used by stream processing applications to store and query data. Every task in Kafka Streams can have one or more state stores that can be accessed via APIs to store and query the state data required for processing. Kafka Streams supports two types of state stores - a persistent key-value store based on RocksDB or an in-memory hashmap. There is also an option of implementing a custom state store; see below.
An example of a state store creation (using the Kafka Streams Processor APIs) can be found in this Lightbend tutorial. See, in particular, the StandardStoreStreamBuilder class:
val logConfig = new HashMap[String, String]
val storeSupplier = Stores.inMemoryKeyValueStore(STORE_NAME)
val storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.Integer, new ModelStateSerde)
  .withLoggingEnabled(logConfig)
val builder = new StreamsBuilderS
…………………………………….
builder.addStateStore(storeBuilder)
Once the store is created, processors can access it, e.g., the DataProcessor class:
override def init(context: ProcessorContext): Unit = {
  modelStore = context.getStateStore(STORE_NAME).asInstanceOf[KeyValueStore[Integer, StoreState]]
  Objects.requireNonNull(modelStore, "State store can't be null")
}
…………...
var state = modelStore.get(STORE_ID)
……………..
state.currentState = state.currentState.map(_.incrementUsage(duration))
……………...
As mentioned above, it is also possible to create a custom state store. An example of such an implementation can be found in the same Lightbend tutorial.
The state stores are backed by a special Kafka topic, thus providing fault-tolerance and automatic recovery. Log compaction is enabled on these topics so that old data can be purged safely to prevent the topics from growing indefinitely. When a Kafka Streams job is restarted, Kafka Streams guarantees that it will restore the associated state stores to the content before the failure, by replaying the corresponding changelog topics prior to resuming the processing in the newly-started tasks.
This means that the cost of task (re)initialization depends on the time for restoring the state by replaying the state stores' associated changelog topics, which can be fairly long. To minimize this restoration time, users can configure their applications to have standby replicas of local states. When a task migration happens, Kafka Streams then attempts to assign a task to an application instance where such a standby replica already exists in order to minimize the task (re-)initialization cost.
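Standby replicas are enabled through regular Kafka Streams configuration; for example (a sketch with arbitrary application id and broker address):
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "model-serving")       // arbitrary id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // arbitrary broker
// Maintain one warm copy of each local state store on another application instance
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1")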
Also keep in mind that the state store's backing topic is partitioned the same way as the input stream, which simplifies rescaling of applications.
If you have more than one input topic, make sure that all of your input topics are partitioned the same way. Keep in mind that a Kafka Streams application only allows you to define one consumer group, which is applied to all consumers for all topics, in order to enforce this same-partitioning requirement.
Kafka Streams also introduced queryable state, which allows you to leverage the state of an application (its state stores) from outside the application. The full state of a Kafka Streams application is typically split across many distributed instances (as discussed above), and across many instances of the state store that are managed locally by the corresponding application instances.
Kafka Streams separates the queryable state implementation into two parts - access to local stores and access to remote stores.
An application instance can query the locally-managed portion of the state by directly querying its own local state stores. These queries are always read-only to guarantee that the underlying state stores will never be mutated out-of-band. An example of such a query can be found in the previously-mentioned tutorial, in the RestServiceStore class. It looks as follows:
val store = streams.store(STORE_NAME, standardStoreType)
if (store == null) throw new NotFoundException
complete(store.get(STORE_ID).currentState)
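Here standardStoreType identifies the kind of store being queried. Its definition is not shown in the excerpt; for a standard key-value store it would presumably look something like this (an assumption, not quoted from the tutorial):
import org.apache.kafka.streams.state.{QueryableStoreType, QueryableStoreTypes, ReadOnlyKeyValueStore}

// Assumed definition: a read-only key-value view over the store created earlier
val standardStoreType: QueryableStoreType[ReadOnlyKeyValueStore[Integer, StoreState]] =
  QueryableStoreTypes.keyValueStore[Integer, StoreState]()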
In addition, Kafka Streams supports queries for state store metadata, for example, where a particular state store instance with a given name is running. An example of such a query can be found in the MetadataService class. It looks as follows:
def streamsMetadataForStore(store: String, port: Int): util.List[HostStoreInfo] = {
  val metadata = streams.allMetadataForStore(store).asScala.toSeq match {
    case list if !list.isEmpty => list
    case _ => Seq(new StreamsMetadata(
      new HostInfo("localhost", port),
      new util.HashSet[String](util.Arrays.asList(ApplicationKafkaParameters.STORE_NAME)),
      util.Collections.emptySet[TopicPartition]))
  }
  mapInstancesToHostStoreInfo(metadata)
}

private def mapInstancesToHostStoreInfo(metadatas: Seq[StreamsMetadata]) =
  metadatas.map(convertMetadata(_)).asJava

private def convertMetadata(metadata: StreamsMetadata): HostStoreInfo = {
  val currentHost = metadata.host match {
    case host if host.equalsIgnoreCase("localhost") =>
      try { InetAddress.getLocalHost.getHostAddress }
      catch { case t: Throwable => "" }
    case host => host
  }
  new HostStoreInfo(currentHost, metadata.port, metadata.stateStoreNames.asScala.toSeq)
}
To obtain the full state of an application, it is necessary to connect to all of the instances containing parts of the state and then combine the results. The implementation contains the following steps:
This Lightbend Scala library provides a support framework and implementation examples for implementing these steps.
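In outline, a hedged sketch of these steps could look like the following, reusing the MetadataService shown above; the /state endpoint (returning each instance's local portion of the state) and the host/port fields of HostStoreInfo are assumptions:
import scala.collection.JavaConverters._
import scala.io.Source

// Query every instance that hosts a part of the store and merge the results
// (error handling and resource cleanup omitted for brevity)
def fullState(metadataService: MetadataService, storeName: String, port: Int): Seq[String] =
  metadataService.streamsMetadataForStore(storeName, port).asScala.toSeq.map { info =>
    Source.fromURL(s"http://${info.host}:${info.port}/state").mkString
  }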
There are many ways to implement stateful streaming in Akka Streams by leveraging existing elements like statefulMapConcat or a custom graph stage. However, the most straightforward and natural way of doing this is integration with Akka actors. In this case, Akka Streams manages the overall execution flow, while individual actors both maintain state and implement stateful operations. Coupled with Akka Persistence, this approach provides a great foundation for stateful, fault-tolerant stream processing using Akka Streams.
Invoking an actor’s method from an Akka Streams flow is as simple as shown in the following example, taken from the model serving tutorial’s AkkaModelServer class.
.via(ActorFlow.ask(1)(modelServerManager)((elem, replyTo: ActorRef[Option[ServingResult[Double]]]) =>
  new ScoreData(replyTo, elem)))
This example uses the new Akka typed API, which requires a few changes compared to the untyped API:
An implementation of the actor itself follows this documentation. An example can be found here.
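For reference, here is a minimal sketch of such a stateful typed actor (a toy protocol, not the tutorial's code - the "model" here is just a multiplier):
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Illustrative protocol only; in the tutorial the messages carry data records and ML models
sealed trait ServerMessage
final case class Score(value: Double, replyTo: ActorRef[Option[Double]]) extends ServerMessage
final case class UpdateModel(multiplier: Double) extends ServerMessage

// State (the current "model") is carried as a parameter of the behavior
def modelServer(model: Option[Double]): Behavior[ServerMessage] =
  Behaviors.receiveMessage {
    case Score(value, replyTo) =>
      replyTo ! model.map(_ * value)   // score only if a model has been received
      Behaviors.same
    case UpdateModel(multiplier) =>
      modelServer(Some(multiplier))    // new state => new behavior
  }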
The above implementation works fine, but only supports in-memory state. To make state persistent, it is necessary to use a persistent actor. A complete walkthrough and example for Akka typed persistence is available in this Akka documentation.
Although Akka Streams does not officially support queryable state, adding observability to individual actors (and consequently to the state as a whole) is quite straightforward. An implementation requires:
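Reusing the toy protocol sketched above, the idea can be illustrated as follows; GetState is assumed to be added alongside Score and UpdateModel (with a matching case in modelServer that replies with the current model), and queryState could then be called, for example, from an Akka HTTP route:
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical state-query message, added to the ServerMessage protocol above
final case class GetState(replyTo: ActorRef[Option[Double]]) extends ServerMessage

// Query the actor's current state from outside the stream
def queryState(server: ActorRef[ServerMessage])
              (implicit system: ActorSystem[_]): Future[Option[Double]] = {
  implicit val timeout: Timeout = 3.seconds
  server.ask(replyTo => GetState(replyTo))
}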
As mentioned above, the usage of streaming libraries leads to an architecture where, instead of a single engine, you deploy an ensemble of executors, each responsible for processing a specific portion (“partition”) of the stream. As a result, the execution state is partitioned across multiple executors, and the important concerns in this case are:
For both Kafka Streams and Akka Streams, scalability when reading from Kafka is based on the scalability of Kafka itself. As long as the source Kafka topic is partitioned, both allow you to scale up to the number of Kafka partitions and scale down to one. If the state is keyed on the Kafka message keys, repartitioning of state will work correctly; Kafka Streams ensures this, while for Akka Streams it is up to the developer to ensure it. Once a new instance of the application starts up, some of the Kafka topic’s partitions are redirected to it, and the state for the keys belonging to those partitions is populated in the state store (in the case of Kafka Streams) or actors with the appropriate keys are created (in the case of Akka Streams).
The difference here is how fast the state can be recreated. Both Kafka Streams (using the backing topic) and Akka Streams (using event sourcing) rebuild state by replaying messages. This means that state restoration can take quite a long time, depending on the number of messages that have to be replayed for complete state restoration. Both Kafka Streams and Akka Streams provide optimization techniques to speed up the state restoration process - log compaction for Kafka Streams and snapshotting for Akka Streams.
The same mechanism works for failover in the case of both Kafka Streams and Akka Streams.
Finally, when it comes to queryable state, Kafka Streams provides APIs for querying the state store and its metadata from outside the application. Akka Streams, although it does not provide a “standard” state query API, makes it very simple to query an actor’s state. However, it does not provide any metadata query capability, because all the actors are instance bound. As a result, in order to combine the state “spread” across multiple instances, it is necessary to query every instance for the required information and then combine the results.
Last year, I wrote an O'Reilly eBook titled Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, which I encourage you to download for additional learning. I hope you enjoy it!
1 This also means that if the keys for a stream are significantly skewed, Flink execution is going to have “hot” executors and state. ↩