New Name - Cloudflow is now Akka Data Pipelines

In the first two parts of this blog series, we discussed how to implement RSocket-based ingress for Cloudflow and explored some advanced RSocket features, including load balancing, resumability, and backpressure. In this final part, we examine another important RSocket feature: pluggable transports, which were part of the original RSocket design.[1]

The current Java implementation of RSocket supports three transports: TCP, WebSocket, and local (in-JVM). We will show how to use the currently supported transports and how to implement custom transports to further tailor RSocket to your needs.

The GitHub project for this post can be found and cloned here.

Exploring Currently Supported RSocket Transports

The RSocket Java APIs abstract the networking away: you program against the protocol specification and do not have to worry about the underlying transport. As a result, an RSocket-based application can switch to a different networking toolkit. When you need to change where your application is deployed, you can swap out the transport without any other application changes.

To achieve this goal, RSocket defines several interfaces:

public interface ServerTransport<T extends Closeable> extends Transport

This interface defines the server contract for RSocket transports and is used to bind an RSocket server to a specific transport. The second interface is:

public interface ClientTransport extends Transport 

This interface defines the client contract for RSocket transports and is used to connect an RSocket client to a specific transport. The final interface is DuplexConnection, which every transport implements. The local transport, for example, declares its connection as:

final class LocalDuplexConnection implements DuplexConnection

A DuplexConnection represents a single connection: the input and output streams of frames that the protocol runs over.
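To make the division of responsibilities concrete, here is a deliberately simplified, dependency-free Java sketch of the same three roles. The interfaces Connection, ServerSide, and ClientSide and the helpers localServer and localClient are hypothetical names, not the RSocket API. The real interfaces are reactive and exchange ByteBuf frames. The toy in-JVM transport pairs a client with a server by name, similar in spirit to RSocket's local transport, and the echo logic depends only on the interfaces:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, dependency-free stand-ins for the roles of RSocket's transport interfaces.
final class PluggableTransportSketch {

    /** A full-duplex connection (the role DuplexConnection plays in RSocket). */
    interface Connection {
        void send(String frame) throws InterruptedException;
        String receive() throws InterruptedException;
    }

    /** Server role (the role of ServerTransport): accept incoming connections. */
    interface ServerSide {
        void bind(Consumer<Connection> acceptor);
    }

    /** Client role (the role of ClientTransport): open a connection. */
    interface ClientSide {
        Connection connect();
    }

    /** A connection made of two queues, one per direction. */
    static final class QueueConnection implements Connection {
        private final BlockingQueue<String> in;
        private final BlockingQueue<String> out;

        QueueConnection(BlockingQueue<String> in, BlockingQueue<String> out) {
            this.in = in;
            this.out = out;
        }

        @Override public void send(String frame) throws InterruptedException { out.put(frame); }
        @Override public String receive() throws InterruptedException { return in.take(); }
    }

    // Toy in-JVM transport: servers register under a name, and clients look them up by name.
    private static final Map<String, Consumer<Connection>> SERVERS = new ConcurrentHashMap<>();

    static ServerSide localServer(String name) {
        return acceptor -> SERVERS.put(name, acceptor);
    }

    static ClientSide localClient(String name) {
        return () -> {
            Consumer<Connection> acceptor = SERVERS.get(name);
            if (acceptor == null) throw new IllegalStateException("No server bound to " + name);
            BlockingQueue<String> toServer = new LinkedBlockingQueue<>();
            BlockingQueue<String> toClient = new LinkedBlockingQueue<>();
            // Serve this connection on its own daemon thread.
            Thread worker = new Thread(() -> acceptor.accept(new QueueConnection(toServer, toClient)));
            worker.setDaemon(true);
            worker.start();
            return new QueueConnection(toClient, toServer);
        };
    }

    // Application code: depends only on the interfaces, never on a concrete transport.
    static void startEchoServer(ServerSide server) {
        server.bind(conn -> {
            try {
                while (true) conn.send(conn.receive()); // echo every frame back
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    static String requestResponse(ClientSide client, String request) throws InterruptedException {
        Connection conn = client.connect();
        conn.send(request);
        return conn.receive();
    }

    public static void main(String[] args) throws InterruptedException {
        startEchoServer(localServer("boris"));
        System.out.println(requestResponse(localClient("boris"), "Hello")); // prints Hello
    }
}
```

If you replaced localServer and localClient with a socket-based pair, startEchoServer and requestResponse would stay untouched. The RSocket transport interfaces provide this same separation.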

Based on these interfaces, the current RSocket Java implementation provides the following transport implementations:

  • TCP transport based on Netty
  • WebSocket transport based on Netty
  • Local (in JVM) transport

You have seen the TCP and WebSocket transports in the previous parts of this series. Now we will walk through a simple request/response implementation using the local transport (see GitHub for the complete code):

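import io.rsocket.{Payload, SocketAcceptor}
import io.rsocket.core.{RSocketConnector, RSocketServer}
import io.rsocket.frame.decoder.PayloadDecoder
import io.rsocket.transport.local.{LocalClientTransport, LocalServerTransport}
import io.rsocket.util.DefaultPayload
import reactor.core.publisher.Mono

object RequestResponseLocal {

 def main(args: Array[String]): Unit = {

   // Server: echo every request payload back to the caller
   val server = RSocketServer.create(SocketAcceptor.forRequestResponse((payload: Payload) => {
     Mono.just(payload)
   }))
     .payloadDecoder(PayloadDecoder.ZERO_COPY)
     .bind(LocalServerTransport.create("boris"))
     .block // make sure the server is bound before the client connects

   // Client: connect to the in-JVM server registered under the same name
   val socket = RSocketConnector.create()
     .payloadDecoder(PayloadDecoder.ZERO_COPY)
     .connect(LocalClientTransport.create("boris"))
     .block

   val n = 5
   val start = System.currentTimeMillis()
   1 to n foreach { _ =>
     val reply = socket.requestResponse(DefaultPayload.create("Hello"))
       .map((payload: Payload) => {
         val data = payload.getDataUtf8
         payload.release() // ZERO_COPY payloads must be released by the application
         data
       })
       .block
     println(s"Got reply $reply")
   }
   println(s"$n requests took ${System.currentTimeMillis() - start} ms")

   socket.dispose()
   server.dispose()
 }
}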

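We first create an RSocket server bound to a local server transport and an RSocket client connected to a local client transport. Both use the same name ("boris"), which is how the client finds the server inside the JVM. Once both are created, the client sends several request/response messages, and the server echoes each payload back.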

To run the same code over the WebSocket transport, the only change needed is to the binding and connection information (see GitHub for the complete code):

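// ... (rest of the setup unchanged)
import io.rsocket.transport.netty.client.WebsocketClientTransport
import io.rsocket.transport.netty.server.WebsocketServerTransport

// Server: listen for WebSocket connections on port 7000
val server = RSocketServer.create(SocketAcceptor.forRequestResponse((payload: Payload) => {
 Mono.just(payload)
}))
 .payloadDecoder(PayloadDecoder.ZERO_COPY)
 .bind(WebsocketServerTransport.create("0.0.0.0", 7000))
 .block // wait until the server is listening before the client connects

// Client
val socket = RSocketConnector.create()
 .payloadDecoder(PayloadDecoder.ZERO_COPY)
 .connect(WebsocketClientTransport.create("localhost", 7000))
 .block
// ...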

This fragment is nearly identical to the local example. The main difference is that it binds and connects to the WebSocket transport. The rest of the example, not shown here, is unchanged.

The table below shows a JMH-based performance comparison[2] of the three transports, executing request/response locally with a 14 KB message:

Transport    Execution time
TCP          41 μs
WebSocket    99 μs
Local        1.3 μs

As expected, the local transport performs best, and the network-based transports are slower. Also as expected, TCP outperforms WebSocket. When choosing between TCP and WebSocket, also consider these characteristics:

  • Memory footprint, which is significantly lower for TCP.
  • The number of connections that can be supported, which is lower for WebSocket.
  • CPU utilization, which is slightly higher for WebSocket.

You should also consider whether you are working within an intranet boundary or over the Internet. Within an intranet, TCP is probably the better solution. Over the Internet, you are often limited to the standard ports of web servers that speak HTTP, and that is where WebSocket really shines.

Custom RSocket Transport Over IPC

In the world of pods, containers, and virtualization, service instances run in isolation, yet it is common for them to run on the same hardware (for example, when using the sidecar pattern). In many cases, we follow the location transparency pattern and simply send messages over sockets, ignoring the fact that the instances are co-located. By doing this, we sacrifice communication performance. We may then need more instances to handle the same load, or we must accept higher latency, which is critical for domains such as trading systems.

When a client and server run on the same hardware, native interprocess communication (IPC) can deliver better performance. The main IPC mechanisms are:

  • Shared files - This is the most basic IPC mechanism. In the simplest case, one process (the producer) creates and writes to a file, and another process (the consumer) reads from the same file.
  • Shared memory (with semaphores) - This is very similar to the previous mechanism, but it uses a shared region of memory instead of a file, with semaphores to synchronize access.
  • Pipes (named and unnamed) - An unnamed pipe has no backing file: the system maintains an in-memory buffer to transfer bytes from the writer to the reader. Once the writer and reader terminate, the buffer is reclaimed, so the unnamed pipe goes away. By contrast, a named pipe has a backing file. Both kinds of pipe deliver bytes in strict FIFO order.
  • Message queues - These behave like pipes, but are flexible enough that byte chunks can be retrieved out of FIFO order.
  • Sockets - IPC sockets rely on the local system kernel to support communication; in particular, IPC sockets use a local file as the socket address.
  • Signals - A signal interrupts an executing program and, in this sense, communicates with it. Processes can therefore communicate by sending signals to each other.

Performance benchmarks for different IPC mechanisms can be found here.

The latest version of RSocket provides experimental support for transports such as Aeron and shared memory. In this section, we look at implementing an IPC transport over shared memory.

To implement IPC over shared memory for RSocket in Java, we use one of the most straightforward mechanisms: MappedByteBuffer, which provides access to a memory-mapped region of a file. Many operating systems can mount RAM as a file system (for example, /dev/shm on Linux). As a result, the same MappedByteBuffer code can communicate over either a regular file or a shared memory region.
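As a minimal illustration of the mechanism (not the transport's actual code), the sketch below maps the same file twice with FileChannel.map, writes a length-prefixed message through one mapping, and reads it through the other. Both mappings share the same physical pages, and that shared view is what lets two processes exchange data this way. The sketch prefers /dev/shm when it exists and is writable; otherwise it falls back to the default temporary directory. The file name prefix and the 4 KB region size are arbitrary assumptions:

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

final class MappedBufferSketch {

    static final int SIZE = 4096; // arbitrary size of the shared region

    /** Creates the backing file, preferring the RAM-backed /dev/shm when it is available. */
    static Path createSharedFile() throws IOException {
        Path shm = Paths.get("/dev/shm");
        if (Files.isDirectory(shm) && Files.isWritable(shm)) {
            return Files.createTempFile(shm, "rsocket-sketch", ".buf");
        }
        return Files.createTempFile("rsocket-sketch", ".buf");
    }

    /** Maps the whole region read/write; the mapping outlives the channel that created it. */
    static MappedByteBuffer map(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, SIZE); // grows the file to SIZE
        }
    }

    /** Writes a length-prefixed message through one mapping and reads it through another. */
    static String roundTrip(String message) throws IOException {
        Path file = createSharedFile();
        try {
            MappedByteBuffer writerView = map(file);
            MappedByteBuffer readerView = map(file); // another process would map the same path

            byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
            writerView.putInt(bytes.length).put(bytes);

            byte[] received = new byte[readerView.getInt()];
            readerView.get(received);
            return new String(received, StandardCharsets.UTF_8);
        } finally {
            try {
                Files.deleteIfExists(file);
            } catch (IOException ignored) {
                // Some platforms (e.g. Windows) refuse to delete a file that is still mapped.
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("Hello over shared memory"));
    }
}
```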

To implement messaging over MappedByteBuffer, the client and server first need to negotiate. An interesting approach, proposed in the open source library Jocket, is to perform the initial handshake over ordinary TCP. The two sides then switch to a MappedByteBuffer once both know which communication files to use (see GitHub for the complete code):

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;

public class Server implements Closeable {

  // ...

  public Server(int port) throws IOException {
    this(port, DEFAULT_MAX_PACKETS, DEFAULT_CAPACITY);
  }

  public Server(int port, int maxPackets, int capacity) throws IOException {
    // ...
    srv = new java.net.ServerSocket();
    // Bind to loopback only: the handshake is meant for processes on the same host
    srv.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
  }

  // ...

  public Socket accept() throws IOException {
    while (true) {

      if (closed) throw new IllegalStateException("Closed");

      java.net.Socket s = srv.accept();
      // Time out reads so that accept() can move on if the client never does the handshake
      s.setSoTimeout(1000);

      DataInputStream in = new DataInputStream(s.getInputStream());
      DataOutputStream out = new DataOutputStream(s.getOutputStream());
      out.writeInt(HANDSHAKE_PACKET);
      out.flush();

      int magic = 0;
      try {
        magic = in.readInt();
      } catch (SocketTimeoutException timeout) {
        // treated as a failed handshake below
      }

      if (magic != HANDSHAKE_PACKET) {
        s.close();
        continue;
      }

      // One file per direction: the server writes to fw and reads from fr
      MappedFileSupport fw = new MappedFileSupport(maxPackets, capacity);
      MappedFileSupport fr = new MappedFileSupport(maxPackets, capacity);

      out.writeUTF(fw.getPath());
      out.writeUTF(fr.getPath());
      out.flush();

      // Wait for the client's acknowledgement, then remove the files
      in.readInt();
      fw.deleteFile();
      fr.deleteFile();
      s.close();

      return new Socket(fr.reader(), fw.writer());
    }
  }

  // ...
}

The server accepts each incoming connection, sends the client a handshake number, and expects to receive the same number back. Once the handshake succeeds, the server creates two temporary files, which can be placed in /dev/shm or any other location backed by RAM, and sends their paths back to the client. The files are deleted once both sides have mapped them. On Unix-like systems, an existing memory mapping remains valid after its file has been deleted. The client implements the mirror-image logic, which lets it map the same files and access the same memory regions (see GitHub for the complete code):
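The handshake needs nothing beyond java.net sockets, so it can be exercised on its own. The following sketch runs both sides in one JVM over the loopback interface. The server sends a magic number, expects it echoed back, replies with two file paths, and waits for the client's acknowledgement. The magic value and the /dev/shm paths are hypothetical placeholders. No files are created, and a comment marks the place where the MappedFileSupport step would go:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class HandshakeSketch {

    static final int HANDSHAKE_PACKET = 0x52534D31; // hypothetical magic number

    /** Server side: handshake, send {server-write path, server-read path}, await the ack. */
    static boolean serve(ServerSocket srv, String writePath, String readPath) throws IOException {
        try (Socket s = srv.accept()) {
            s.setSoTimeout(1000); // give up on clients that never answer
            DataInputStream in = new DataInputStream(s.getInputStream());
            DataOutputStream out = new DataOutputStream(s.getOutputStream());
            out.writeInt(HANDSHAKE_PACKET);
            out.flush();
            if (in.readInt() != HANDSHAKE_PACKET) return false;
            out.writeUTF(writePath); // the client will read from this file
            out.writeUTF(readPath);  // the client will write to this file
            out.flush();
            in.readInt();            // acknowledgement: the client has mapped both files
            return true;
        }
    }

    /** Client side: returns {readPath, writePath} as seen by the client. */
    static String[] connect(int port) throws IOException {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
            s.setSoTimeout(5000);
            DataOutputStream out = new DataOutputStream(s.getOutputStream());
            DataInputStream in = new DataInputStream(s.getInputStream());
            out.writeInt(HANDSHAKE_PACKET);
            out.flush();
            if (in.readInt() != HANDSHAKE_PACKET) throw new IOException("Unexpected handshake");
            String[] paths = {in.readUTF(), in.readUTF()};
            // A real client would map both files here before acknowledging.
            out.writeInt(0);
            out.flush();
            return paths;
        }
    }

    /** Runs both sides on the loopback interface with placeholder paths. */
    static String[] demo() throws Exception {
        try (ServerSocket srv = new ServerSocket()) {
            srv.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // any free port
            CompletableFuture<Boolean> server = CompletableFuture.supplyAsync(() -> {
                try {
                    return serve(srv, "/dev/shm/server-to-client", "/dev/shm/client-to-server");
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            String[] paths = connect(srv.getLocalPort());
            if (!server.get(5, TimeUnit.SECONDS)) throw new IllegalStateException("Handshake rejected");
            return paths;
        }
    }
}
```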

public class Socket {

  // ...

  public Socket(int port) throws IOException {
    java.net.Socket s = new java.net.Socket(InetAddress.getLoopbackAddress(), port);
    // Time out reads in case the server never completes the handshake
    s.setSoTimeout(5000);

    DataOutputStream out = new DataOutputStream(s.getOutputStream());
    out.writeInt(Server.HANDSHAKE_PACKET);
    out.flush();

    DataInputStream in = new DataInputStream(s.getInputStream());
    int magic = 0;
    try {
      magic = in.readInt();
    } catch (SocketTimeoutException timeout) {
      // treated as a failed handshake below
    }

    if (magic != Server.HANDSHAKE_PACKET) {
      s.close();
      throw new IOException("Server does not support RSocket Shared Memory transport");
    }

    // The server sends its write file first: that is the file this client reads
    File r = new File(in.readUTF());
    File w = new File(in.readUTF());

    MappedFileSupport jfr = new MappedFileSupport(r);
    MappedFileSupport jfw = new MappedFileSupport(w);

    jfr.deleteFile();
    jfw.deleteFile();

    // Acknowledge that both files are mapped, then drop the TCP connection
    out.writeInt(0);
    out.flush();
    s.close();

    this.reader = jfr.reader();
    this.writer = jfw.writer();

    // ...
  }

  // ...

}

The server sends back the paths of two different files to separate the reader and writer sides. Each file carries traffic in one direction only, which simplifies the logic required for message exchange. To create and manage the MappedByteBuffer, we use MappedFileSupport. The APIs for sending messages via the underlying MappedByteBuffer look like the following (see GitHub here and here for the complete code):

public class ReaderBuffer {

  public void close() { … }

  public boolean isClosed() { … }

  public ByteBuf read() { … }

  public void advance(int packetsCount) { … }
}

public class WriterBuffer {

  public int claim(int size) { … }

  public void write(ByteBuf byteBuf) { … }

  public void commit() { … }

  public void close() { ... }

  public boolean isClosed() { … }
}

To send messages with these APIs, a sender first has to claim the space required for the message. If the space is acquired successfully (claim returns 1), the sender can write a ByteBuf. Once all the messages are written, the sender invokes commit to update the writer index, which makes the new messages visible to the receiver.

To receive messages, the receiver calls read. It returns the latest ByteBuf written into the MappedByteBuffer, or null if there is nothing new.
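The claim/write/commit/read protocol can be illustrated with a much simpler structure than the real ReaderBuffer and WriterBuffer. The sketch below is a hypothetical single-writer, single-reader packet log over a ByteBuffer, which could equally be a MappedByteBuffer. claim reserves room for a length-prefixed packet, write copies the bytes, and commit publishes the new writer index in a header, so the reader never sees a half-written packet. The sketch also simplifies several things: claim returns a boolean instead of the real API's int, the log does not wrap around, and plain byte arrays replace Netty's ByteBuf:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-writer/single-reader packet log illustrating claim -> write -> commit -> read.
final class ClaimCommitSketch {

    static final int HEADER = 4; // bytes 0..3 hold the committed writer index

    static final class Writer {
        private final ByteBuffer buf;
        private int position = HEADER; // end of the written (possibly uncommitted) data
        private int claimedAt = -1;    // start of the currently claimed packet
        private int claimedSize;

        Writer(ByteBuffer buf) {
            this.buf = buf;
            buf.putInt(0, HEADER); // nothing committed yet
        }

        /** Reserves space for one length-prefixed packet; false means "wait and retry". */
        boolean claim(int size) {
            if (position + 4 + size > buf.capacity()) return false;
            claimedAt = position;
            claimedSize = size;
            return true;
        }

        /** Copies the payload into the claimed slot; it stays invisible until commit(). */
        void write(byte[] payload) {
            if (claimedAt < 0 || payload.length > claimedSize) throw new IllegalStateException("claim first");
            buf.putInt(claimedAt, payload.length);
            ByteBuffer slot = buf.duplicate();
            slot.position(claimedAt + 4);
            slot.put(payload);
            position = claimedAt + 4 + payload.length;
            claimedAt = -1;
        }

        /** Publishes all written packets by updating the writer index in the header.
         *  Across processes this store would need release semantics (e.g. via a VarHandle). */
        void commit() {
            buf.putInt(0, position);
        }
    }

    static final class Reader {
        private final ByteBuffer buf;
        private int position = HEADER;

        Reader(ByteBuffer buf) { this.buf = buf; }

        /** Returns the next committed packet, or null if there is nothing new. */
        byte[] read() {
            int committed = buf.getInt(0);
            if (position >= committed) return null;
            byte[] packet = new byte[buf.getInt(position)];
            ByteBuffer slot = buf.duplicate();
            slot.position(position + 4);
            slot.get(packet);
            position += 4 + packet.length;
            return packet;
        }
    }

    static List<String> demo() {
        ByteBuffer shared = ByteBuffer.allocate(64); // stand-in for a MappedByteBuffer
        Writer writer = new Writer(shared);
        Reader reader = new Reader(shared);
        for (String msg : new String[] {"one", "two"}) {
            byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
            if (writer.claim(bytes.length)) writer.write(bytes);
        }
        if (reader.read() != null) throw new IllegalStateException("uncommitted packet was visible");
        writer.commit();
        List<String> received = new ArrayList<>();
        for (byte[] p = reader.read(); p != null; p = reader.read()) {
            received.add(new String(p, StandardCharsets.UTF_8));
        }
        return received; // [one, two]
    }
}
```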

There may be no free space in the MappedByteBuffer, or there may be no new messages. To handle these cases correctly, both sender and receiver have to apply a wait strategy. The simplest non-blocking strategy is to reschedule the read or write operation on a shared Scheduler, so that other senders and receivers can perform their reads and writes without allocating new threads. This logic is encapsulated in the FluxReceive and MonoSendMany classes, which you can find in the following GitHub repo. Examples of different RSocket interaction styles and implementations can be found here.
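The rescheduling idea can be sketched with the JDK alone. In this hypothetical example, a poll task tries to read. When nothing is available, it schedules itself again on a shared ScheduledExecutorService after a short pause instead of blocking a thread, so a few scheduler threads can serve many readers and writers. The real FluxReceive and MonoSendMany classes do this with Reactor Schedulers. Here the 50 microsecond retry delay is arbitrary, and a ConcurrentLinkedQueue stands in for the shared-memory buffer:

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RescheduleSketch {

    /** Completes with the first non-null value returned by poll, retrying on a shared scheduler. */
    static <T> CompletableFuture<T> pollUntilAvailable(Supplier<T> poll, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Runnable attempt = new Runnable() {
            @Override
            public void run() {
                T value = poll.get();
                if (value != null) {
                    result.complete(value);
                } else if (!result.isDone()) {
                    // Nothing yet: release the thread and retry shortly instead of blocking it.
                    scheduler.schedule(this, 50, TimeUnit.MICROSECONDS);
                }
            }
        };
        scheduler.execute(attempt);
        return result;
    }

    static String demo() throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Queue<String> buffer = new ConcurrentLinkedQueue<>(); // stand-in for the shared buffer
            CompletableFuture<String> received = pollUntilAvailable(buffer::poll, scheduler);
            // A "writer" on the same single scheduler thread publishes a frame a little later.
            scheduler.schedule(() -> { buffer.offer("frame"); }, 5, TimeUnit.MILLISECONDS);
            return received.get(1, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

The reader and the writer share one scheduler thread here, and both still make progress. A blocking wait in the reader would starve the writer in this setup.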

Below are benchmark results for RSocket with SharedMemoryTransport, with the communication files stored on disk and in RAM[3]:

Storage type    Execution time
Disk            20.778 μs
RAM             15.520 μs

Custom RSocket Transport Using Kafka

At first glance, using Kafka as an RSocket transport does not make much sense: both implement messaging, and both support streaming. However, they provide very different qualities of service. RSocket over the existing transports (TCP and WebSocket) requires temporal coupling between consumers and producers[4]. A Kafka-based transport, by contrast, is inherently decoupled, which makes it possible to deal effectively with network interruptions and long-running executions. Implementing a Kafka transport for RSocket lets you write the implementation once and choose the transport at deployment time, based on the deployment environment.

The implementation is based on Reactor Kafka, a reactive API for Kafka. It lets you publish to and consume from Kafka through functional APIs with non-blocking backpressure and very low overhead. Like RSocket Java, Reactor Kafka is built on Project Reactor, which significantly simplifies the overall implementation.

When designing this implementation, we decided to base the bidirectional client/server communication on a pair of topics. Only one topic is specified when defining the transport: the topic on which the server receives messages. The name of the second topic is generated automatically by appending the -reply suffix to that topic name[5].

For the server transport, the main topic carries incoming messages and the -reply topic carries replies; for the client transport, the roles of the topics are reversed. Additionally, topic names are used to generate the Kafka consumer group ID[6], which supports load-balanced Kafka consumers and producers. This design assumes that functionally identical clients communicate with a (potentially load-balanced) server over the same topic pair (a single connection). Different clients, by contrast, use different connections (topic pairs) to communicate with the same server.
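The naming scheme is simple enough to capture in a few lines. The following sketch derives the incoming and outgoing topics for each side from the single configured topic. The source does not show how the consumer group ID is built from the topic name and the "server"/"client" prefix passed to KafkaProducer and KafkaConsumer. The groupId method below is therefore a hypothetical illustration of the idea, not the project's actual code:

```java
final class TopicNamingSketch {

    static final String REPLY_SUFFIX = "-reply";

    /** Topic pair used by one side of a connection. */
    static final class Topics {
        final String incoming;
        final String outgoing;

        Topics(String incoming, String outgoing) {
            this.incoming = incoming;
            this.outgoing = outgoing;
        }
    }

    /** The server reads the configured topic and replies on "<topic>-reply"; the client is reversed. */
    static Topics topicsFor(String topic, boolean server) {
        String reply = topic + REPLY_SUFFIX;
        return server ? new Topics(topic, reply) : new Topics(reply, topic);
    }

    /**
     * Hypothetical group ID scheme: consumers that share a group ID split the topic's
     * partitions between them, so identical clients (or server instances) are load-balanced.
     */
    static String groupId(String prefix, String incomingTopic) {
        return prefix + "-" + incomingTopic;
    }

    public static void main(String[] args) {
        Topics server = topicsFor("orders", true);
        Topics client = topicsFor("orders", false);
        System.out.println(server.incoming + " / " + server.outgoing); // orders / orders-reply
        System.out.println(client.incoming + " / " + client.outgoing); // orders-reply / orders
        System.out.println(groupId("server", server.incoming));        // server-orders
    }
}
```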

The implementation consists of three main classes. KafkaDuplexConnection implements a full-duplex, Kafka-based connection using two internal classes, the first of which is KafkaProducer (see GitHub for the complete code):

private static class KafkaProducer {

   // Key type reconstructed as byte[] (an assumption): records are always sent with a null key
   private final KafkaSender<byte[], byte[]> sender;
   private final String topic;

   public KafkaProducer(String bootstrapServers, String topic, String prefix) {

     this.topic = topic;
     Map<String, Object> props = new HashMap<>();
     // ...
     SenderOptions<byte[], byte[]> senderOptions =
         SenderOptions.<byte[], byte[]>create(props).maxInFlight(128);
     sender = KafkaSender.create(senderOptions);
   }

   // Sends every RSocket frame as one Kafka record; the Mono completes once all are sent
   public Mono<Void> send(Publisher<ByteBuf> frames) {
     return sender.send(Flux.from(frames)
             .map(frame -> {
               byte[] bytes = new byte[frame.readableBytes()];
               frame.readBytes(bytes);
               return SenderRecord.create(new ProducerRecord<byte[], byte[]>(topic, null, bytes), 1);
             }))
             .then();
   }

   public void close() {
     sender.close();
   }
 }

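This class implements a Kafka producer using the Reactor Kafka APIs. All RSocket frames are sent as byte-array values with a null key, so the producer's default partitioner spreads the records across the partitions of a multi-partition topic. Older Kafka clients distribute them round-robin. Since Kafka 2.4, the default "sticky" partitioner instead fills a batch for one partition before switching to another. The second internal class is KafkaConsumer (see GitHub for the complete code):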

private static class KafkaConsumer {

   // Key type reconstructed as byte[] (an assumption); only the byte-array values are used
   private final ReceiverOptions<byte[], byte[]> receiverOptions;
   private final Flux<ReceiverRecord<byte[], byte[]>> kafkaFlux;

   public KafkaConsumer(String bootstrapServers, String topic, String prefix) {

     Map<String, Object> props = new HashMap<>();
     // ...
     receiverOptions = ReceiverOptions.create(props);

     ReceiverOptions<byte[], byte[]> options = receiverOptions.subscription(Collections.singleton(topic))
             .commitInterval(Duration.ZERO)
             .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
             .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
     kafkaFlux = KafkaReceiver.create(options).receive();
   }

   // Converts each record value into the ByteBuf that RSocket works with
   public Flux<ByteBuf> getKafkaFlux() {
     return kafkaFlux.map(receiverRecord -> copiedBuffer(receiverRecord.value()));
   }
 }

This class implements a Kafka consumer using the Reactor Kafka APIs. All RSocket frames are received as byte-array values and converted into the ByteBuf instances that RSocket uses internally.

These two support classes are used to implement KafkaDuplexConnection (see GitHub for the complete code):

final class KafkaDuplexConnection implements DuplexConnection {

 private String name;              // Name of the input server
 private KafkaConsumer consumer;   // Consumer
 private KafkaProducer producer;   // Producer

 private final ByteBufAllocator allocator;
 private final MonoProcessor<Void> onClose;

 // Client side: announce a fresh topic name on the server's topic, wait for the server
 // to echo it back on the reply topic, then connect over the new topic pair
 static Mono<DuplexConnection> create(String bootstrapServers, String name, ByteBufAllocator allocator) {
    final String topicName = UUID.randomUUID().toString();
    return new KafkaProducer(bootstrapServers, name, "client")
            .send(Mono.fromSupplier(() -> {
              final ByteBuf initFrame = allocator.buffer();
              initFrame.writeCharSequence(topicName, CharsetUtil.UTF_8);
              return initFrame;
            }))
            .then(
                    new KafkaConsumer(bootstrapServers, name + "-reply", "client").getKafkaFlux()
                    .filter(bb -> {
                      final String topicNameConfirmation = bb.toString(CharsetUtil.UTF_8);
                      bb.release();
                      return topicNameConfirmation.equals(topicName);
                    })
                    .next()
                    .then(Mono.<DuplexConnection>fromSupplier(() ->
                            new KafkaDuplexConnection(bootstrapServers, topicName, false, allocator)))
            );
  }

 // Server side: for every announced topic name, echo it back and create a connection
 static Flux<DuplexConnection> accept(String bootstrapServers, String name, ByteBufAllocator allocator) {
    final KafkaProducer producer = new KafkaProducer(bootstrapServers, name + "-reply", "server");
    return new KafkaConsumer(bootstrapServers, name, "server").getKafkaFlux()
            .concatMap(bb -> {
              final String topicName = bb.toString(CharsetUtil.UTF_8);
              return producer.send(Mono.just(bb))
                      .then(Mono.<DuplexConnection>fromSupplier(() ->
                              new KafkaDuplexConnection(bootstrapServers, topicName, true, allocator)));
            });
  }

  KafkaDuplexConnection(String bootstrapServers, String name, boolean server, ByteBufAllocator allocator) {
    this.allocator = Objects.requireNonNull(allocator, "allocator must not be null");
    this.name = name;
    this.onClose = MonoProcessor.create(); // final field must be initialized in the constructor
    if (server) {
      producer = new KafkaProducer(bootstrapServers, name + "-reply", "server");
      consumer = new KafkaConsumer(bootstrapServers, name, "server");
    } else {
      producer = new KafkaProducer(bootstrapServers, name, "client");
      consumer = new KafkaConsumer(bootstrapServers, name + "-reply", "client");
    }
  }
// ...

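This class creates a producer/consumer pair, on which the required DuplexConnection methods are built. The static create method implements the client side of connection setup. It generates a random topic name, sends it on the server's topic, and waits on the reply topic until the server echoes that name back. It then creates a connection over the new topic pair. The accept method is the server side: for every name it receives, it echoes the name back on the reply topic and creates a server-side connection over the corresponding topics.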
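The setup exchange performed by create and accept can be simulated without a Kafka broker. In the sketch below, in-memory queues stand in for topics. This is a hypothetical simplification, since real Kafka topics are durable, partitioned logs read by consumer groups. The client publishes a fresh UUID on the server's topic, and the server echoes it on the reply topic. The client waits until it sees its own UUID, and then both sides switch to the new topic pair (the UUID and the UUID with -reply appended):

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simulation only: in-memory queues play the role of Kafka topics.
final class KafkaSetupSketch {

    private static final Map<String, BlockingQueue<String>> TOPICS = new ConcurrentHashMap<>();

    static BlockingQueue<String> topic(String name) {
        return TOPICS.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Client side (cf. create): announce a fresh topic name and wait for its echo. */
    static String create(String name) throws InterruptedException {
        String connectionTopic = UUID.randomUUID().toString();
        topic(name).put(connectionTopic); // init frame on the server's topic
        while (true) {
            String confirmation = topic(name + "-reply").poll(1, TimeUnit.SECONDS);
            if (confirmation == null) throw new IllegalStateException("No confirmation from server");
            if (confirmation.equals(connectionTopic)) {
                return connectionTopic; // the client now writes to <id> and reads <id>-reply
            }
            // Otherwise the confirmation belongs to another client: skip it, as the filter does.
        }
    }

    /** Server side (cf. accept), handling one connection request. */
    static String acceptOne(String name) throws InterruptedException {
        String connectionTopic = topic(name).take();  // init frame from a client
        topic(name + "-reply").put(connectionTopic);  // echo it back as the confirmation
        return connectionTopic; // the server now reads <id> and writes to <id>-reply
    }

    static String[] demo() throws Exception {
        CompletableFuture<String> server = CompletableFuture.supplyAsync(() -> {
            try {
                return acceptOne("rsocket");
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        });
        String clientTopic = create("rsocket");
        return new String[] {clientTopic, server.get(1, TimeUnit.SECONDS)};
    }
}
```

After the exchange, both sides agree on the same UUID topic. In the real class, the server flag passed to the KafkaDuplexConnection constructor then decides which of the two topics each side reads.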

This connection is used to implement the client and server transports. The client transport implementation looks like the following (see GitHub for the complete code):

import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.transport.ClientTransport;
import java.util.Objects;
import reactor.core.publisher.Mono;

public class KafkaClientTransport implements ClientTransport {

    private final String bootstrapServers;
    private final String name;

    private KafkaClientTransport(String bootstrapServers, String name) {
        this.bootstrapServers = bootstrapServers;
        this.name = name;
    }

    public static KafkaClientTransport create(String bootstrapServers, String name) {
        Objects.requireNonNull(bootstrapServers, "bootstrap servers must not be null");
        Objects.requireNonNull(name, "name must not be null");

        return new KafkaClientTransport(bootstrapServers, name);
    }

    // Each connect() runs the topic handshake and yields a new Kafka-backed connection
    @Override
    public Mono<DuplexConnection> connect() {
        return KafkaDuplexConnection.create(bootstrapServers, name, ByteBufAllocator.DEFAULT);
    }
}

The most interesting method here is connect, which creates a Kafka duplex connection. The server transport implementation can be found in the GitHub repo.

Examples of different RSocket interaction styles and implementations can be found here. These examples leverage an embedded Kafka server for local testing.

Conclusion

We hope you enjoyed this three-part series. We started off by showing you how to implement RSocket-based ingress for Cloudflow. From there, we worked through how to use some of RSocket’s more advanced features.

In this last part, we showed how to use RSocket's multi-transport support and introduced two custom transports, one over shared memory and one over Kafka. We learned that different transports provide different quality-of-service characteristics, including performance and reliability. Multi-transport support lets us write an implementation once and then configure it for a different quality of service by choosing the transport best suited to the required service level.

As you can see, now’s an ideal time to get started with RSocket. It recently achieved its milestone 1.0.1 release. It’s also supported by the Reactive Foundation, an open source software foundation that enables developers to build Reactive applications on Cloud Native infrastructure. Learn more about the foundation’s charter, and feel free to contribute to RSocket on GitHub.


[1] RSocket was introduced with support for multiple transports, including TCP, WebSocket, and Aeron.

[2] This benchmark was performed on macOS with a 2.9 GHz 6-core Intel Core i9 CPU. Results may vary significantly depending on the hardware and OS, so the table shows reference numbers only.

[3] This benchmark was performed on macOS with a 2.9 GHz 6-core Intel Core i9 CPU. Results may vary significantly depending on the hardware and OS, so the table shows reference numbers only. Also note that SharedMemoryTransport is experimental, and these numbers may change over time.

[4] See resumability in the previous post of this series for some support for such decoupling, which requires local memory.

[5] See below.

[6] See here for more details on the Kafka consumer group ID.
