Implementing RSocket Ingress in Cloudflow - Part 1: Getting Started

Boris Lublinsky Principal Architect, Lightbend, Inc.
James Townley Sr. Product Manager for Akka Platform, Lightbend, Inc.

New Name - Cloudflow is now Akka Data Pipelines

In a world of network uncertainty, dropped connections, and really bad WiFi reception, RSocket, a message-driven binary protocol that standardizes communication in the cloud, makes a perfect ingress for Cloudflow, an open-source project that enables you to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes.

In this three-part blog series, we'll show you how to get started using RSocket inlets with Cloudflow. In this part, we'll discuss how to implement RSocket-based ingress for Cloudflow. In Part 2, we’ll work through how to implement some of RSocket’s more advanced features. And in Part 3, we'll examine another important feature of RSocket: pluggable transport, which was part of the original RSocket design.

Accelerate Streaming Data Application Development with Cloudflow

Cloudflow allows you to build streaming applications as a set of small, composable components communicating over Kafka and wired together with schema-based contracts. This approach can significantly improve reuse and allows you to dramatically accelerate streaming application development.

A common challenge when building streaming applications is wiring all of the components together and testing them end-to-end before going into production. Cloudflow addresses this by allowing you to validate the connections between components and to run your application locally during development to avoid surprises during deployment.

Everything in Cloudflow is done in the context of an application, which represents a self-contained distributed system of data processing services connected together by data streams.

Cloudflow supports:

  • Development: By generating a lot of boilerplate code, it allows developers to focus on business logic.
  • Build: It offers all the tooling for going from business logic to a deployable Docker image.
  • Deploy: It features Kubernetes tooling to deploy your distributed application with a single command.
  • Operate: It also provides all the tools you need to get insights, observability, and lifecycle management for your distributed streaming application.

Connectivity of Cloudflow applications to external services is provided by inlets and outlets. Currently, Cloudflow supports HTTP and file inlets.

Although HTTP is widely used for these purposes, it implements only a request/response interaction model. That model covers many use cases, but it was not designed for machine-to-machine communication. Applications often need to send data to each other without caring about the result of the operation (fire and forget), or to stream data automatically as it becomes available (data streaming).

The second problem with HTTP is performance: the typical throughput of an HTTP connection is a few hundred requests per second.

Both issues led to the creation of RSocket, which can be applied in a variety of business scenarios.

Improve Streaming Data Communication with RSocket

As described in this article, RSocket is a:

“...new, message-driven, binary protocol that standardizes the approach to communication in the cloud. It helps to resolve common application concerns in a consistent manner and has support for multiple languages (e.g., Java, JavaScript, and Python) and transport layers (TCP, WebSocket, Aeron).”

RSocket’s main characteristics are that it is¹:

  • Message-driven: Interaction in RSocket is broken down into frames.
  • Performant: The frames are sent as a stream of bytes, which makes RSocket far more efficient than typical text-based protocols.
  • Reactive with Flow Control capabilities: The RSocket protocol fully embraces the principles stated in the Reactive Manifesto.

And RSocket supports the following communication styles:

  • Fire and forget pushes data from the sender to the receiver without waiting for a response.
  • Request-response mimics HTTP behavior.
  • Request-stream: the requester sends a single frame to the responder and gets back a (potentially infinite) stream of data. This interaction style lets services switch from pulling data to having it pushed to them.
  • Request channel streams data from the requester to the responder and back over a single physical connection.
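
To make the differences between these four styles concrete, here is a minimal, library-free sketch that expresses them as plain Scala signatures. It is an illustration only: `Payload`, `Responder`, and `EchoResponder` are hypothetical names invented for this example and are not the rsocket-java API, which is reactive and returns publishers rather than plain values and iterators.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, library-free model of the four interaction styles.
object InteractionStyles {
  type Payload = String

  trait Responder {
    // Fire and forget: one message in, nothing comes back.
    def fireAndForget(payload: Payload): Unit
    // Request-response: one message in, exactly one message out.
    def requestResponse(payload: Payload): Payload
    // Request-stream: one message in, a (possibly infinite) stream out.
    def requestStream(payload: Payload): Iterator[Payload]
    // Request channel: a stream in and a stream out over one connection.
    def requestChannel(payloads: Iterator[Payload]): Iterator[Payload]
  }

  // Toy responder: records fire-and-forget messages and echoes everything else.
  class EchoResponder extends Responder {
    val received: ArrayBuffer[Payload] = ArrayBuffer.empty[Payload]

    def fireAndForget(payload: Payload): Unit = { received += payload; () }

    def requestResponse(payload: Payload): Payload = s"echo:$payload"

    // Lazily produces an unbounded stream; the caller decides how much to take.
    def requestStream(payload: Payload): Iterator[Payload] =
      Iterator.from(0).map(i => s"$payload-$i")

    def requestChannel(payloads: Iterator[Payload]): Iterator[Payload] =
      payloads.map(p => s"echo:$p")
  }
}
```

Because the stream in this sketch is lazy, a caller such as `responder.requestStream("s").take(3)` pulls only three elements, which loosely mirrors how a requester controls demand.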

What’s more, RSocket provides excellent performance. This article provides a comparison of RSocket with gRPC over HTTP and shows that RSocket excels in all categories including queries per second, latency, CPU consumption, and scalability.

Better Together:

Now we will present the implementation and usage of RSocket inlets for Cloudflow. We have implemented several versions of RSocket inlets, including:

  • Fire and forget using text (JSON-based) messages. This is the simplest RSocket implementation. It suits a direct transition from HTTP JSON-based communication and cases where losing some messages is acceptable.
  • Fire and forget using binary data (Avro in this implementation²). Binary encoding allows for significantly smaller messages and consequently higher throughput. Additionally, Avro provides stronger typing of the messages.
  • Streaming using binary (Avro) data. This is slightly more complex but more reactive. The client opens a connection to the server, which replies that it is ready to accept a stream of data. After receiving this reply, the client can start streaming data to the server.

All of the above implement the abstract ServerStreamletLogic class provided by Cloudflow, specifically its run method, to start an RSocket server. The example below, taken from the binary fire and forget implementation (for the complete code see the GitHub repo), shows the actual code:

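import cloudflow.akkastream._
import cloudflow.streamlets.CodecOutlet
import io.rsocket.core.RSocketServer
import io.rsocket.transport.netty.server.TcpServerTransport
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase

// Streamlet logic that starts an RSocket server on the streamlet's container port
class RSocketBinaryStreamletLogic[out <: SpecificRecordBase](server: Server, schema: Schema, outlet: CodecOutlet[out])(implicit context: AkkaStreamletContext) extends ServerStreamletLogic(server) {

  override def run(): Unit = {
    RSocketServer.create(new RSocketBinaryAcceptorImpl(sinkRef(outlet), schema))
      .bind(TcpServerTransport.create("0.0.0.0", containerPort))
      .subscribe() // subscribing triggers the (asynchronous) bind
    println(s"Bound RSocket server to port $containerPort")
  }
}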

This implementation uses an acceptor class, which in this case looks like the following:

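import cloudflow.akkastream.WritableSinkRef
import io.rsocket.{AbstractRSocket, ConnectionSetupPayload, Payload, RSocket, SocketAcceptor}
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import reactor.core.publisher.Mono

class RSocketBinaryAcceptorImpl[out <: SpecificRecordBase](writer: WritableSinkRef[out], schema: Schema) extends SocketAcceptor {

  // DataConverter is a helper class from the project's GitHub repo
  val dataConverter = new DataConverter[out](schema)

  override def accept(setupPayload: ConnectionSetupPayload, reactiveSocket: RSocket): Mono[RSocket] =
    Mono.just(new AbstractRSocket() {
      override def fireAndForget(payload: Payload): Mono[Void] = {
        // Unmarshal the binary payload and write the record to the streamlet outlet
        dataConverter.fromByteBuffer(payload.getData).map(writer.write)
        Mono.empty()
      }
    })
}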

Here, once a message is received, it is unmarshalled and passed to the writer, which in our case writes to the streamlet outlet.
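
The `map(writer.write)` call suggests that the converter returns an optional value, so that a payload that fails to decode is simply skipped, which fits fire and forget semantics. The library-free sketch below illustrates that pattern under this assumption. `Sink`, `decode`, and `handle` are hypothetical stand-ins invented for the example; they are not the repository's `DataConverter` or Cloudflow's `WritableSinkRef`, and a plain integer parser stands in for Avro decoding.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

object DecodeAndWrite {
  // Hypothetical stand-in for the outlet writer: it just collects records.
  final class Sink[T] {
    val written: ArrayBuffer[T] = ArrayBuffer.empty[T]
    def write(value: T): Unit = { written += value; () }
  }

  // Hypothetical decoder: parses a UTF-8 encoded integer and returns None on
  // bad input (assumed to mirror how a converter could signal a failure).
  def decode(bytes: Array[Byte]): Option[Int] =
    Try(new String(bytes, "UTF-8").trim.toInt).toOption

  // Fire and forget: decode, write on success, silently drop on failure.
  def handle(bytes: Array[Byte], sink: Sink[Int]): Unit =
    decode(bytes).foreach(sink.write)
}
```

With this shape, the sender never learns whether a message was accepted, so any monitoring of dropped messages has to happen on the receiving side.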

Implementations of the other two ingresses can be found here (for JSON fire and forget) and here (for binary streaming). Please note that for streams in RSocket, a request message must be sent first to start the streaming.
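
The ordering constraint for streams (request first, data afterwards) can be pictured as a two-state machine. The following is a toy, library-free sketch of that idea only; `Server`, `request`, and `onData` are hypothetical names, and the real exchange is handled by RSocket frames rather than by method calls like these.

```scala
import scala.collection.mutable.ArrayBuffer

object StreamingHandshake {
  sealed trait State
  case object AwaitingRequest extends State
  case object Streaming extends State

  // Hypothetical server that accepts data only after the initial request.
  final class Server {
    private var state: State = AwaitingRequest
    val accepted: ArrayBuffer[String] = ArrayBuffer.empty[String]

    // The client's initial request; the server replies that it is ready.
    def request(): String = {
      state = Streaming
      "ready"
    }

    // Returns true if the message was accepted, false if it arrived too early.
    def onData(msg: String): Boolean = state match {
      case Streaming =>
        accepted += msg
        true
      case AwaitingRequest =>
        false
    }
  }
}
```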

The rest of the Cloudflow implementation is based on the Cloudflow introductory example. To send data to the Cloudflow application, we provide a data provider for each inlet implementation. For the binary fire and forget provider, the code looks as follows (for the complete code check out the GitHub repo):

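import io.rsocket.core.RSocketConnector
import io.rsocket.transport.netty.client.TcpClientTransport
import io.rsocket.util.DefaultPayload

class BinaryFireAndForget(host: String, port: Int) {

  def run(): Unit = {

    // Create client (blocks until the connection is established)
    val socket = RSocketConnector
      .connectWith(TcpClientTransport.create(host, port))
      .block()

    // Send one message per second, forever
    while (true) {
      Thread.sleep(1000)
      val payload = DefaultPayload.create(generateData())
      socket.fireAndForget(payload).block()
    }
  }

  // Generate data (SensorDataConverter and SensorDataGenerator come from the GitHub repo)
  def generateData(): Array[Byte] = {
    SensorDataConverter.toBytes(SensorDataGenerator.random())
  }
}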

Here we first create a socket connected to our server and then send it a message every second.

Implementations of the other two data providers can be found here (for JSON fire and forget) and here (for binary streaming).

A particular data provider can be configured using this application configuration and is controlled by the ProducerRunner class.

Measuring Throughput

For reference, our simple Cloudflow pipeline implementation is running on a three-node Kubernetes cluster on AWS EKS using t3.large instances. The image below shows throughput for the binary fire and forget interaction:

What we see here is not RSocket throughput directly, but rather the number of messages produced by the ingress (which in this case reads messages from RSocket and immediately writes them to Kafka). This provides a good indication of RSocket throughput.

Based on this image, we see a pretty steady throughput of about 4,000 messages per second. The request stream implementation provides about the same throughput.

Conclusion

Even in this brief example, RSocket has shown its worth as a potential ingress for Cloudflow, in terms of both speed and accuracy. Its binary, stream-oriented protocol can lead to smaller packets, greater speed, and lower latency. It also significantly simplifies the usage of the schema-first approach used throughout Cloudflow implementations.

In Part 2 of this series, we'll look in greater detail at more advanced RSocket features, including backpressure and load balancing.

RSocket is supported by the Reactive Foundation, an open source software foundation that enables developers to build reactive applications on cloud native infrastructure. Learn more about the foundation’s charter, and feel free to contribute to RSocket on GitHub.


¹ See also this presentation for main RSocket characteristics.

² This can be easily extended to other marshallers, for example, Protobuf. Avro was chosen for ease of integration with the current Cloudflow implementation, which at the time of writing is entirely based on Avro.