New Name - Cloudflow is now Akka Data Pipelines

In Part 1 of this three-part blog series, we discussed how to implement RSocket-based ingress for Cloudflow. In this part, we'll explore some advanced features of the RSocket implementations including load balancing, resumability, and backpressure. We will also sketch out simple Akka Streams Source and Sink implementations around RSocket fire and forget interactions. In Part 3, we'll examine another important feature of RSocket: Pluggable transport, which was part of the original RSocket design.

The GitHub project for this post can be found and cloned here. This part wouldn't have been possible without Boris Lublinsky, who served as the main author on Part 1 and provided a similar level of support on this post.

Load Balancing with RSockets

In a distributed reactive system, being able to divide requests amongst many servers is vital to the success of a project. RSocket1 enables your system to be elastic to meet changing demands and resilient to failure scenarios on the server/consumer. If the requester knows the IP addresses of each server2, it can choose the responder instance to which it sends the request or opens the connection.

On the code level, the implementation of the client load balancing relies on the LoadBalancedRSocketMono object, which works as a bag of available RSocket instances provided by RSocket supplier. Moreover, it calculates statistics for each RSocket, so that it is able to estimate the load of each instance and based on that choose the one with the best performance at the given point of time.

To ensure that messages are properly balanced, the RSocket algorithm takes into account multiple parameters like latency, number of maintained connections, and number of pending requests. The health of each RSocket is reflected by the availability parameter. This parameter takes values from zero to one, where zero indicates that the given instance cannot handle any requests and one is assigned to a fully operational socket.

The code using fire and forget interactions looks like the following (for the complete code check out the GitHub repo):

object Multiserver {

 private val logger = LoggerFactory.getLogger(this.getClass) // Logger
 private val ports = List(7000, 7001, 7002) // Ports
 private var hits = Map(7000 -> 0, 7001 -> 0, 7002 -> 0) // Stats map

 def main(args: Array[String]): Unit = {

   // Create servers
   ports.foreach(port =>
       RSocketServer.create(SocketAcceptor.forFireAndForget((payload: Payload) => {
   	// Peg request to the server
   	addHit(port)
   	payload.release()
   	Mono.empty()
       }))
       .payloadDecoder(PayloadDecoder.ZERO_COPY)
       .bind(TcpServerTransport.create("0.0.0.0", port))
       .block)

   // Create socket suppliers (clients)
   val rsocketSuppliers = ports.map(port =>
     new RSocketSupplier(() => Mono.just(RSocketConnector.create()
       .payloadDecoder(PayloadDecoder.ZERO_COPY)
       .connect(TcpClientTransport.create("0.0.0.0", port)).block()))).asJava

   // Create load balancer
   val balancer = LoadBalancedRSocketMono.create(Flux.create(
     (sink: FluxSink[util.Collection[RSocketSupplier]]) => sink.next(rsocketSuppliers)))

   // Send messages
   1 to 300 foreach { _ =>
     balancer.doOnNext(
       socket => socket.fireAndForget(ByteBufPayload.create("Hello world")).block()).block()
   }

   // Wait to make sure that process completes
   Thread.sleep(3000)

   // Print execution statistics
   println("Execution statistics")
   hits.foreach(entry => println(s"Port : ${entry._1} -> count ${entry._2}"))
 }

 // Collect statistics
 def addHit(port: Int): Unit = this.synchronized { hits += port -> (hits(port) + 1) }
}

Here we start three servers listening on different ports and three rsocket Suppliers each connected to a corresponding server. Finally, we create a LoadBalancedRSocketMono class that manages these suppliers. By sending requests to the balancer we allow it to manage which server the request is sent to.

For example, looking at the execution statistics below:

Execution statistics
Port : 7000 -> count 108
Port : 7001 -> count 94
Port : 7002 -> count 98

We can see that the balancer is doing a decent job distributing requests across the servers. Here we have shown an example of using load balancing for fire and forget, but the same approach works for any interaction style.

When working with an elastic system, there will need to be some form of communication to allow the clients to see all the servers and connections as they may change over time— and are not provided out-of-the-box by RSocket. We are assuming a static setup here with a well known set of addresses.

We have shown a load balancing implementation for fire and forget interactions, but it works the same way for all other interaction patterns outside of request stream. The load balancing in this interaction is done on the level of initial request, not the stream.

Resumability with RSockets

In the case of IoT devices located in areas without access to a stable, reliable internet connection, one of the biggest concerns is connection stability. RSocket’s solution, in this case, is resumability support. The code below (for the complete code check out the GitHub repo) shows resumable request stream implementation:

object ResumableStreamingClient {

 private val logger = LoggerFactory.getLogger(this.getClass)
 val RESUME_SESSION_DURATION: Duration = Duration.ofSeconds(60)

 def main(args: Array[String]): Unit = {

   // Resume configuration
   val resume =
     new Resume()
       .sessionDuration(RESUME_SESSION_DURATION)
       .retry(
         Retry.fixedDelay(Long.MaxValue, Duration.ofSeconds(1))
           .doBeforeRetry(s => logger.info("Disconnected. Trying to resume...")));

   // Create a server
   RSocketServer.create(SocketAcceptor.forRequestStream((payload: Payload) => {
       // Log request
       logger.info(s"Received 'request stream' request with payload: [${payload.getDataUtf8}] ")
       payload.release()
       // return stream
       Flux.interval(Duration.ofMillis(500))
         .map(t => ByteBufPayload.create(t.toString()))
     }))
     .resume(resume)
     .payloadDecoder(PayloadDecoder.ZERO_COPY)
     .bind(TcpServerTransport.create("0.0.0.0", 7000)).subscribe

   // Create client
   val socket = RSocketConnector.create()
     .resume(resume)
     .connect(TcpClientTransport.create("0.0.0.0", 7001))
     .block

   // Send messages
   socket
     .requestStream(ByteBufPayload.create("Hello"))
     .subscribe((value: Payload) =>  logger.info(s"New stream element ${value.getDataUtf8}"))

   // Wait for completion
   Thread.sleep(100000)
   socket.dispose();
 }
}

The implementation starts with the creation of a Resume object—the holder of configuration settings for the RSocket Resume capability. Here we are setting only RESUME_SESSION_DURATION—maximum time for a client to keep trying to reconnect— and a reconnection logic—how often to retry. In addition, this class also allows overwriting a default storage for buffering frames that are required to be sent again to resume— InMemoryResumableFramesStore.

Once this object is created, it is used to add resumability to both client and server to provide resumability. The rest of the code is a “normal” request-stream implementation, except, if you look carefully, you will notice that a server is connected to port 7000, while a client is connected to port 7001. This is done here to enable usage of socat3,a utility for data transfer between two addresses to emulate loss of connectivity. To ensure that a client can connect to a server run the following command before starting an example:

socat -d TCP-LISTEN:7001,fork,reuseaddr TCP:localhost:7000

Now we can run the example. If you stop socat and then restart it, you should see something similar to the below:

2020-05-17 09:51:34,220 INFO  [ResumableStreamingClient$] - New stream element 12
2020-05-17 09:51:34,720 INFO  [ResumableStreamingClient$] - New stream element 13
2020-05-17 09:51:35,221 INFO  [ResumableStreamingClient$] - New stream element 14
2020-05-17 09:51:35,721 INFO  [ResumableStreamingClient$] - New stream element 15
2020-05-17 09:51:36,232 INFO  [ResumableStreamingClient$] - Disconnected. Trying to resume...
2020-05-17 09:51:37,243 INFO  [ResumableStreamingClient$] - Disconnected. Trying to resume...
2020-05-17 09:51:38,245 INFO  [ResumableStreamingClient$] - Disconnected. Trying to resume...
2020-05-17 09:51:39,252 INFO  [ResumableStreamingClient$] - Disconnected. Trying to resume...
2020-05-17 09:51:40,257 INFO  [ResumableStreamingClient$] - Disconnected. Trying to resume...
2020-05-17 09:51:41,278 INFO  [ResumableStreamingClient$] - New stream element 16
2020-05-17 09:51:41,279 INFO  [ResumableStreamingClient$] - New stream element 17
2020-05-17 09:51:41,279 INFO  [ResumableStreamingClient$] - New stream element 18
2020-05-17 09:51:41,279 INFO  [ResumableStreamingClient$] - New stream element 19

Here we can see that after connection comes back, the elements will be delivered.

Backpressure with RSockets

When working with streaming data, backpressure is vital as it allows the receiver to control the flow of data and prevent itself from being overloaded. Commonly referred to as the slow consumer problem, it has caused the downfall of many systems. But backpressure is the simple answer, and both RSocket and Cloudflow support this.

The Cloudflow implementation shown below is internally completely Reactive, including backpressure support. But RSocket support for backpressure makes this implementation Reactive end-to-end. When talking about backpressure in RSocket we need to distinguish between the two RSocket interaction patterns described here—request-stream and fire and forget.

Backpressure in the Request-Stream Interactions

For the request-stream interaction, backpressure can be implemented by overriding the BaseSubscriber class that provides access to all the major steps of stream processing, allowing control over the speed of streaming. The code demonstrating usage of backpressure looks like the following (for the complete code check out the GitHub repo):

object StreamingClient {

 private val logger = LoggerFactory.getLogger(this.getClass)

 def main(args: Array[String]): Unit = {
   // Ensure clean disposal
   Hooks.onErrorDropped((t: Throwable) => {})

   // Create a server
   RSocketServer.create(SocketAcceptor.forRequestStream((payload: Payload) => {
       Logger.info(s"Received 'request stream' request with payload: [${payload.getDataUtf8}] ")
       payload.release()
       Flux.generate[Payload, Int](() => 0, (state: Int, sink: SynchronousSink[Payload]) => {
          Thread.sleep(100)
           sink.next(ByteBufPayload.create("Interval: " + state))
           state + 1
       })
     }))
     .payloadDecoder(PayloadDecoder.ZERO_COPY)
     .bind(TcpServerTransport.create("0.0.0.0", 7000)).subscribe

   // create a client
   val socket = RSocketConnector
     .connectWith(TcpClientTransport.create("0.0.0.0", 7000))
     .block

   // Send messages
   socket
     .requestStream(ByteBufPayload.create("Hello"))
     .subscribe(new BaseSubscriber[Payload] {
       // Backpressure subscriber
       private val log = LoggerFactory.getLogger(this.getClass)
       val PROCESS_BATCH = 5l
       var receivedItems = 0
       // Start subscription
       override def hookOnSubscribe(subscription: Subscription): Unit = {
         subscription.request(PROCESS_BATCH)
       }
       // Processing request
       override def hookOnNext(value: Payload): Unit = {
         log.info(s"New stream element ${value.getDataUtf8}")
         value.release()
         receivedItems += 1
         if (receivedItems % PROCESS_BATCH == 0) 
           request(PROCESS_BATCH)
       }
       // Invoked on stream completion
       override def hookOnComplete(): Unit = log.info("Completing subscription")
       // Invoked on stream error
       override def hookOnError(throwable: Throwable): Unit = log.error(s"Stream subscription error [$throwable]")
       // Invoked on stream cancelation
       override def hookOnCancel(): Unit = log.info("Subscription canceled")
     })

   // Wait for completion
   Thread.sleep(3000)

   socket.dispose();
 }

Here we first create a server and a consumer. We then send a request from a client to a server and a server starts streaming data back to a client. A client is using a custom implementation BaseSubscriber to process streaming data. This implementation starts with a constructor, which is limiting the amount of data elements submitted as a single batch to PROCESS_BATCH elements. Processing of a single request is done by hookOnNext method. Here we are processing each request up to the batch size. Once the batch is processed completely, a new batch is requested.

Picking the Right Value for PROCESS_BATCH

The value of PROCESS_BATCH can significantly impact an overall implementation’s throughput. Making this number too small will result in too many messages sent back to a producer. On the other hand, making it too large can overwhelm a consumer. Ideally, in a production environment, you probably need an in-memory buffer capable of storing the complete batch of the incoming messages and then consuming them asynchronously from this buffer.

This implementation ensures that the receiver—the client in this case—will never be overwhelmed by too many consumer requests.

This solves the problem of a consumer never being overwhelmed by a producer, but also requires some synchronization mechanism for a producer to avoid overwhelming a consumer with messages. A Generate method on a Flux object provides such capabilities. Writing to the sink respects backpressure, thus solving coordination with backpressure.

Backpressure in the Fire and Forget Interactions

Technically, backpressure—as described in the previous example—applies only to the streams. To incorporate some of the backpressure functionality for other interaction styles, such as request/response or fire and forget, RSocket introduces leases. As defined in the specifications, a lease allows control over the number of individual requests of any type that a Requester may send in a given time period.

A simple example of using lease for simulating backpressure is presented below (for the complete code check out the GitHub repo):

object FireAndForgetWithLeaseClient {

 private val blockingQueue = new LinkedBlockingDeque[String]()
 private val SERVER_TAG = "server"
 private val CLIENT_TAG = "client"

 def main(args: Array[String]): Unit = {

   // Create server
   val server = RSocketServer.create(SocketAcceptor.forFireAndForget((payload: Payload) => {
       // Log message
       blockingQueue.add(payload.getDataUtf8)
       payload.release()
       Mono.empty()
    }))
     .lease(new Supplier[Leases[_]] {override def get(): Leases[_] = Leases.create().sender(new LeaseCalculator(SERVER_TAG, blockingQueue))})
     .payloadDecoder(PayloadDecoder.ZERO_COPY)
     .bindNow(TcpServerTransport.create("0.0.0.0", 7000))

   // Lease notification receiver
   val receiver = new LeaseReceiver(CLIENT_TAG)

   // Create client
   val clientRSocket = RSocketConnector.create
     .lease(new Supplier[Leases[_]] {override def get(): Leases[_] = Leases.create.receiver(receiver)})
     // Enable Zero Copy
     .payloadDecoder(PayloadDecoder.ZERO_COPY)
     .connect(TcpClientTransport.create(server.address)).block

   // Create queue drainer
   new Thread(() => while(true)
       blockingQueue.take()
   ).start()

   // Send messages
   Flux.generate(() => 0L, (state: Long, sink: SynchronousSink[Long]) => {
       sink.next(state)
       state + 1
   })
     // Wait for the  Lease arrival
     .delaySubscription(receiver.notifyWhenNewLease().`then`())
     .concatMap((tick : Long) => {
         println(s"Sending $tick")
         Mono.defer(() => clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
           .retryWhen(
             Retry.indefinitely()
               .filter((t : Throwable) => t.isInstanceOf[MissingLeaseException])
               .doBeforeRetryAsync(rs => 
                 receiver.notifyWhenNewLease().`then`()
               ))
       }
     )
     .blockLast()

   clientRSocket.onClose.block
   server.dispose()
 }
}

In this example, once the request arrives we put it in the internal queue and process it asynchronously in a separate thread (i.e., queue drainer). The state of the queue is also used for lease control. During server creation we also create a lease sender for the server-class LeaseCalculator (described below). We then create a LeaseReceiver (described below), which serves as a listener for server lease messages at the client. Finally, we create a client, specifying a lease receiver in leases.

Once all the required objects have been created we start sending fire and forget messages to the server. However, before we send any messages, we wait for the first lease message to arrive. Once we send all of the messages allowed by the lease we will wait in the retry loop until the new lease arrives.

For the lease calculation we are using a fairly simple implementation:

class LeaseCalculator(tag : String, queue : LinkedBlockingDeque[String]) extends Function[Optional[LeaseStats], Flux[Lease]] {

 val leaseDuration = Duration.ofSeconds(1)
 val maxQueueDepth = 50

 override def apply(leaseStats: Optional[LeaseStats]): Flux[Lease] = {
   val stats = leaseStats.isPresent() match {
     case true => "present"
     case _ => "absent"
   }

   Flux.interval(Duration.ZERO, leaseDuration)
     .handle((_, sink : SynchronousSink[Lease]) => {
       (maxQueueDepth - queue.size()) match {
         case requests if (requests > 0) => sink.next(Lease.create(leaseDuration.toMillis.toInt, requests))
         case _ =>
       }
     })
 }
}

Here we update lease every second4 based on the current depth of the queue. A more complex algorithm can be used to better manage leases.

A final part of this implementation is a lease listener:

class LeaseReceiver(tag : String) extends Consumer[Flux[Lease]] {

 val lastLeaseReplay: ReplayProcessor[Lease] = ReplayProcessor.cacheLast[Lease]

 override def accept(receivedLeases: Flux[Lease]): Unit = this.synchronized{
   receivedLeases
     .subscribe((l:Lease) => lastLeaseReplay.onNext(l))
 }

 def notifyWhenNewLease(): Mono[Lease] = lastLeaseReplay.filter(l => l.isValid()).next()

}

This class listens for new lease requests and is used to notify the writer when a new lease arrives.

Akka Streams Support

In the first half of this post we showed an Akka Streams-based implementation of RSocket ingress. And while this implementation works, it would be nice to make it follow the canonical Akka Streams pattern:

Source => Transform => Sink

In order to be able to do this, it is necessary to implement custom Akka Streams Source and Sink for RSocket. Here we will present a sketch of such implementations5 for fire and forget interactions.

Akka Streams Source for RSocket

A custom Akka Streams source implementation is done using custom Graph Stage. The main complication in the case of RSocket is that we need some kind of coordination between the arrival of a RSocket message and data requests from the source. We decided to use an internal queue for this purpose. Overall implementation of the Akka Streams Source can be done as follows (for the complete code check out the GitHub repo):

class RSocketSource(port: Int, host: String = "0.0.0.0") extends GraphStage[SourceShape[Array[Byte]]] {

 val out: akka.stream.Outlet[Array[Byte]] = Outlet("RsocketSourceOut")
 private val blockingQueue = new LinkedBlockingDeque[Array[Byte]]()

 override val shape: SourceShape[Array[Byte]] = SourceShape(out)

 override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
   override def preStart(): Unit = {
     RSocketServer.create(SocketAcceptor.forFireAndForget((payload: Payload) ⇒ {
       val buffer = payload.getData
       val data = new Array[Byte](buffer.remaining())
       buffer.get(data)
       blockingQueue.add(data)
       payload.release()
       Mono.empty()
     }))
       .payloadDecoder(PayloadDecoder.ZERO_COPY)
       .bind(TcpServerTransport.create(host, port))
       .subscribe
   }

   setHandler(out, new OutHandler {
     override def onPull(): Unit = {
       push(out, blockingQueue.take())
     }
   })
 }
}

In the preStart method we create a RSocket client. The acceptor for the client is fairly straightforward—once the message is received, it is immediately placed on the queue for processing. When Akka Streams is polling for the new message, we are just getting it from the queue. Note that this is a blocking call (the take method will block if the queue is empty) providing synchronization with RSocket message arrivals.

With this in place, we can reimplement the RSocket Ingress streamlet logic described in the first half of the post (for the complete code check out the GitHub repo):

class RSocketSourceStreamletLogic[out <: SpecificRecordBase]
 (server: cloudflow.akkastream.Server, schema: Schema, outlet: CodecOutlet[out])
 (implicit context: AkkaStreamletContext) extends ServerStreamletLogic(server) {

 val dataConverter = new DataConverter[out](schema) // Marshaller

 override def run(): Unit = {

   // Process flow
   akka.stream.scaladsl.Source.fromGraph(new RSocketSource(containerPort))
     .map(dataConverter.fromBytes)
     .collect { case (Some(data)) ⇒ data }
     .runWith(plainSink(outlet))
   ()
 }
}

Akka Streams Sink for RSocket

Similar to the Akka Streams Source, custom Akka Streams Sink implementation is based on a custom Graph Stage. The simple implementation looks like the following (for the complete code check out the GitHub repo):

class RSocketFFSink(port: Int, host: String = "0.0.0.0") extends GraphStage[SinkShape[Array[Byte]]] {
 val in: Inlet[Array[Byte]] = Inlet("RsocketSinkInput")
 var socket: RSocket = null

 override def shape: SinkShape[Array[Byte]] = SinkShape(in)

 override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
   new GraphStageLogic(shape) {
     // ask for an element as soon as you start
     override def preStart(): Unit = {
       socket = RSocketConnector
         .connectWith(TcpClientTransport.create(host, port))
         .block
     }

     setHandler(in, new InHandler {
       override def onPush(): Unit = {
         val payload = grab(in)
         socket.fireAndForget(DefaultPayload.create(payload)).block
         pull(in)
       }
     })
   }
}

Both the Source and Sink implementations described in this section do not provide backpressure support. However, this can be easily added using a lease-based approach to backpressure implementation described earlier in this post.

Conclusion

In this part, we showed that RSocket and Cloudflow work well together for fire and forget, request-response and request-stream interactions. We also demonstrated high throughput, load balancing, resumability, backpressure, and Akka Streams integration.

Elasticity, resiliency and message-driven are key to achieving responsiveness and maintaining a Reactive System (see the Reactive Manifesto for additional details). RSocket, Cloudflow, and Akka Streams are great tools to help us achieve this goal. Using RSocket load balancing and Cloudflow’s scalability support allows us to maintain elastic and resilient capabilities. The RSocket message protocol and its support for backpressure allows us to control the message-driven nature of the system. In addition, resumability support adds resiliency to communications.

In Part 3 of this three-part series, we'll examine another important feature of RSocket: Pluggable transport, which was part of the original RSocket design.

Next Steps

RSocket has proven to be a viable ingress between services and Cloudflow. Going forward, it will be interesting to see if its cross-functional/resumable nature makes it a candidate for telemetry data collection from IoT or mobile devices.

The Alpakka project is an open source initiative to implement stream-aware and Reactive integration pipelines for Java and Scala. Adding an RSocket connector would be a great way to help make this tool easily accessible and widely available within the Akka ecosystem.

Now’s a perfect time to get started with RSocket as it just achieved its milestone 1.0.0 release. It’s also supported by the Reactive Foundation, an open source software foundation that enables developers to build Reactive applications on Cloud Native infrastructure. Learn more about the foundation’s charter, and feel free to contribute to RSocket on GitHub.


1 This is not part of the spec but rather a RSocket Java-specific implementation.

2 On Kubernetes you can use headless service to obtain such information.

3 To install socat on Mac run brew install socat

4 This parameter is similar to the PROCESS_BATCH parameter used in backpressure implementation and is subject to the same considerations.

5 This is just a demonstration of approach, not a real implementation

Share



Comments


Filter by Tag