New name: Cloudflow is now Akka Data Pipelines
In a world of network uncertainty, dropped connections, and really bad WiFi reception, RSocket—a message-driven, binary protocol that standardizes communication in the cloud—makes a perfect ingress for Cloudflow, an open-source project that enables you to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes.
In this three-part blog series, we'll show you how to get started using RSocket inlets with Cloudflow. In this part, we'll discuss how to implement RSocket-based ingress for Cloudflow. In Part 2, we'll work through how to implement some of RSocket's more advanced features. And in Part 3, we'll examine another important feature of RSocket: pluggable transports, which were part of the original RSocket design.
Cloudflow allows you to build streaming applications as a set of small, composable components communicating over Kafka and wired together with schema-based contracts. This approach can significantly improve reuse and allows you to dramatically accelerate streaming application development.
A common challenge when building streaming applications is wiring all of the components together and testing them end-to-end before going into production. Cloudflow addresses this by allowing you to validate the connections between components and to run your application locally during development to avoid surprises during deployment.
Everything in Cloudflow is done in the context of an application, which represents a self-contained distributed system of data processing services connected together by data streams.
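To give a feel for what this wiring looks like, here is a sketch of a blueprint connecting streamlets together (this is our illustration with made-up streamlet names, not code from the original post; the exact blueprint syntax depends on the Cloudflow version):

blueprint {
  streamlets {
    // Each entry names a streamlet class participating in the application.
    ingress   = sensors.RSocketIngress
    processor = sensors.MetricsProcessor
    logger    = sensors.MetricsLogger
  }
  topics {
    // Topics connect streamlet outlets to inlets over Kafka.
    sensor-data {
      producers = [ingress.out]
      consumers = [processor.in]
    }
    metrics {
      producers = [processor.out]
      consumers = [logger.in]
    }
  }
}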
Cloudflow supports multiple stream-processing runtimes, including Akka Streams, Spark Structured Streaming, and Flink.
Connectivity of Cloudflow applications to external services is provided by inlets and outlets. Currently, Cloudflow supports HTTP and file inlets.
Although HTTP is widely used for these purposes, it implements only a request/response interaction model. While this model covers a lot of use cases, it was not designed for machine-to-machine communication. It is not uncommon for applications to send data to each other without caring about the result of the operation (fire-and-forget) or to stream data automatically as it becomes available (data streaming).
The second problem with HTTP is performance: a typical HTTP connection delivers a throughput of a few hundred requests per second.
Both of these issues led to the creation of RSocket, which can be applied in a wide range of business scenarios.
As described in this article, RSocket is a:
“...new, message-driven, binary protocol that standardizes the approach to communication in the cloud. It helps to resolve common application concerns in a consistent manner and has support for multiple languages (e.g., Java, JavaScript, and Python) and transport layers (TCP, WebSocket, Aeron).”
RSocket’s main characteristics are that it is¹:
- Message-driven: all interaction is expressed as streams of messages exchanged over a single connection
- Binary: both data and metadata are binary-encoded, keeping payloads compact
- Multiplexed: many logical streams are carried over one physical connection
- Bidirectional: once a connection is established, either side can initiate requests
- Flow-controlled: it supports reactive-streams backpressure, as well as connection resumption
And RSocket supports the following communication styles:
- Fire-and-forget: the requester sends a single message and expects no response
- Request/response: the requester sends a single message and receives a single response
- Request/stream: the requester sends a single message and receives a stream of messages back
- Request/channel: both sides send streams of messages to each other
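To make these styles concrete, here is a short sketch of a server-side handler covering all four interaction models, using rsocket-java's AbstractRSocket (this example is ours, not from the original implementation; the echo behavior is purely illustrative):

import io.rsocket.{AbstractRSocket, Payload}
import io.rsocket.util.DefaultPayload
import org.reactivestreams.Publisher
import reactor.core.publisher.{Flux, Mono}

class EchoRSocket extends AbstractRSocket {
  // Fire-and-forget: consume the message, return nothing.
  override def fireAndForget(payload: Payload): Mono[Void] = {
    println(s"received: ${payload.getDataUtf8}")
    Mono.empty()
  }
  // Request/response: one message in, one message out.
  override def requestResponse(payload: Payload): Mono[Payload] =
    Mono.just(DefaultPayload.create(s"echo: ${payload.getDataUtf8}"))
  // Request/stream: one message in, a stream of messages out.
  override def requestStream(payload: Payload): Flux[Payload] =
    Flux.range(1, 3).map(i => DefaultPayload.create(s"part $i"))
  // Request/channel: streams of messages in both directions.
  override def requestChannel(payloads: Publisher[Payload]): Flux[Payload] =
    Flux.from(payloads).map(p => DefaultPayload.create(s"echo: ${p.getDataUtf8}"))
}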
What’s more, RSocket delivers excellent performance. This article compares RSocket with gRPC over HTTP/2 and shows that RSocket excels in all categories, including queries per second, latency, CPU consumption, and scalability.
Now we will present the implementation and usage of RSocket inlets for Cloudflow. We have implemented several versions of RSocket inlets, including:
- Binary fire-and-forget, using Avro-encoded² messages
- JSON fire-and-forget
- Binary streaming
All of the above implement the abstract ServerStreamletLogic class provided by Cloudflow, specifically overriding its run method to start an RSocket server. The example below, from the binary fire-and-forget implementation (for the complete code, see the GitHub repo), shows the actual code:
class RSocketBinaryStreamletLogic[out <: SpecificRecordBase](server: Server, schema: Schema, outlet: CodecOutlet[out])(implicit context: AkkaStreamletContext)
  extends ServerStreamletLogic(server) {

  override def run(): Unit = {
    // Start an RSocket server with a fire-and-forget acceptor that writes
    // incoming messages to the streamlet outlet.
    RSocketServer
      .create(new RSocketBinaryAcceptorImpl(sinkRef(outlet), schema))
      .bind(TcpServerTransport.create("0.0.0.0", containerPort))
      .subscribe
    println(s"Bound RSocket server to port $containerPort")
  }
}
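For context, a logic like this is instantiated from an ingress streamlet. The sketch below shows what such wiring might look like (this is our illustration, not code from the repo; SensorData is the Avro-generated class from the introductory example, and the streamlet and outlet names are assumptions):

import cloudflow.akkastream.AkkaServerStreamlet
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.AvroOutlet

// Hypothetical ingress streamlet wiring the RSocket logic to an Avro outlet.
class RSocketBinaryIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[SensorData]("out")
  def shape = StreamletShape.withOutlets(out)
  override def createLogic = new RSocketBinaryStreamletLogic(this, SensorData.SCHEMA$, out)
}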
This implementation uses an acceptor class, which in this case looks like the following:
class RSocketBinaryAcceptorImpl[out <: SpecificRecordBase](writer: WritableSinkRef[out], schema: Schema)
  extends SocketAcceptor {

  val dataConverter = new DataConverter[out](schema)

  override def accept(setupPayload: ConnectionSetupPayload, reactiveSocket: RSocket): Mono[RSocket] =
    Mono.just(new AbstractRSocket() {
      // Fire-and-forget: unmarshall the Avro payload and, if successful,
      // write the record to the outlet; no response is sent back.
      override def fireAndForget(payload: Payload): Mono[Void] = {
        dataConverter.fromByteBuffer(payload.getData).map(writer.write)
        Mono.empty()
      }
    })
}
Here, once a message is received, it is unmarshalled and then written to the writer, which in our case writes to the streamlet outlet.
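The DataConverter used above is a thin wrapper around Avro deserialization. A minimal sketch of what such a converter could look like (our assumption; the actual class lives in the repo):

import java.nio.ByteBuffer
import scala.util.Try
import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}

// Sketch of an Avro byte-buffer-to-record converter (illustrative, not the repo code).
class DataConverter[T <: SpecificRecordBase](schema: Schema) {
  private val reader = new SpecificDatumReader[T](schema)

  // Returns None (silently dropping the message) if deserialization fails.
  def fromByteBuffer(buffer: ByteBuffer): Option[T] = Try {
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    reader.read(null.asInstanceOf[T], decoder)
  }.toOption
}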
Implementations for the other two ingresses can be found here (for JSON fire-and-forget) and here (for the binary streaming implementation). Please note that for streams in RSocket, a request message needs to be sent first to start the streaming.
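As an illustration of that initial request (our sketch, not code from the repo; the host, port, and payload content are assumptions), a client consuming a request/stream interaction first sends a single request payload and then receives the stream:

import io.rsocket.core.RSocketConnector
import io.rsocket.transport.netty.client.TcpClientTransport
import io.rsocket.util.DefaultPayload

// Hypothetical client: the single requestStream payload kicks off the stream.
val socket = RSocketConnector
  .connectWith(TcpClientTransport.create("localhost", 3000))
  .block

socket
  .requestStream(DefaultPayload.create("start")) // initial request message
  .doOnNext(payload => println(payload.getDataUtf8))
  .blockLast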
The rest of the Cloudflow implementation is based on the Cloudflow introductory example. To send data to the Cloudflow application, we provide a data provider implementation (one per inlet implementation). For the binary fire-and-forget provider, the code looks as follows (for the complete code, check out the GitHub repo):
class BinaryFireAndForget(host: String, port: Int) {

  def run(): Unit = {
    // Create a client socket connected to the RSocket server.
    val socket = RSocketConnector
      .connectWith(TcpClientTransport.create(host, port))
      .block

    // Send a fire-and-forget message every second.
    while (true) {
      Thread.sleep(1000)
      val payload = DefaultPayload.create(generateData())
      socket.fireAndForget(payload).block
    }
  }

  // Generate a random, Avro-encoded sensor reading.
  def generateData(): Array[Byte] =
    SensorDataConverter.toBytes(SensorDataGenerator.random())
}
Here we first create a socket connected to our server, and then send a request to the server every second.
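For a quick local test, the provider can be pointed at a locally running ingress (the host and port here are assumptions; they must match where the RSocket server is bound):

// Hypothetical local run against an ingress listening on port 3000.
new BinaryFireAndForget("localhost", 3000).run()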
Implementations for the other two data providers can be found here (for JSON fire-and-forget) and here (for the binary streaming implementation).
The data provider to run can be selected using this application configuration and is controlled by the ProducerRunner class.
For reference, our simple Cloudflow pipeline implementation is running on a three-node Kubernetes cluster on AWS EKS using t3.large instances. The image below shows the throughput for the binary fire-and-forget interaction:
Here we see not raw RSocket throughput, but rather the number of messages produced by the ingress (which, in this case, reads messages from RSocket and immediately writes them to Kafka). This nevertheless provides a good indication of RSocket throughput.
Based on this image, we see a fairly steady throughput of about 4,000 messages per second. The request/stream implementation provides about the same throughput.
Even in this brief example, RSocket has shown its worth in terms of speed, reliability, and as a potential ingress for Cloudflow. Its binary encoding and streaming protocols can lead to smaller payloads, greater speed, and lower latency. It also significantly simplifies using the schema-first approach applied throughout Cloudflow implementations.
In Part 2 of this series, we'll showcase in greater detail more RSocket advanced features including backpressure and load balancing.
RSocket is supported by the Reactive Foundation, an open source software foundation that enables developers to build reactive applications on cloud native infrastructure. Learn more about the foundation’s charter, and feel free to contribute to RSocket on GitHub.
1 See also this presentation for the main RSocket characteristics.
2 This can be easily extended to other marshallers, for example, Protobuf. Avro was chosen for ease of integration with the current Cloudflow implementation, which at the moment of this writing is all based on Avro.