
Get Productive with Akka Streams

Janik Dotzel Professional Services, Lightbend, Inc.

Intro

Akka Streams is a powerful and flexible toolkit for building highly concurrent, distributed and fault-tolerant event-driven applications. It provides a high-level API geared towards efficient processing. This article is aimed at developers who have attended the Akka Streams course from the Akkademy or finished the Quickstart Guide. It is designed as a follow-up read in which you learn the concepts and best practices for production-ready streaming applications. I hope you enjoy this guide and that it gets you up to speed quickly.

Tips and Tricks

Operators

In Akka Streams, operators are the functional components that perform specific data processing tasks within your streaming pipeline. They enable you to transform, filter, aggregate, or manipulate the data as it flows through an Akka stream. Operators are combined with Sources, Flows and Sinks to create a comprehensive data processing pipeline.

  • Source: An operator emitting data elements to exactly one output.
  • Flow: An operator transforming data elements with exactly one input and output.
  • Sink: An operator receiving data elements from exactly one input.

For a complete overview of the built-in processing operators, have a look at the operator index. That page provides examples and is a resource I use a lot.

Many operators are valuable, but some of them you are going to need far more often than others. I want to explain a few of those and show you when you should use them.

  • Logging with Log
  • Transforming elements with Map
  • Transforming elements with MapConcat
  • Emitting multiple elements from one element with FlatMapConcat
  • Stream enrichment with ZipWith

Logging with Log

The log operator enables the logging of elements passing through the stream, making it easier to monitor and debug data flow.

            
List<Integer> elements = Arrays.asList(1, 2, 3);
Source<Integer, NotUsed> source = Source.from(elements);
source
    .map(e -> {
        if (e == 3) {
            throw new RuntimeException();
        } else {
            return e;
        }
    })
    .log("ClassName.MethodName")
    .withAttributes(
        Attributes.createLogLevels(
            Attributes.logLevelOff(),
            Attributes.logLevelInfo(),
            Attributes.logLevelError()))
    .runForeach(System.out::println, system);

/* Prints:
1
2
[ERROR] [example] Upstream failed.
java.lang.RuntimeException ...
*/  
    
            
val source = Source(List(1, 2, 3))

source
    .map(e => if (e == 3) throw new RuntimeException() else e)
    .log("ClassName.MethodName")
    .addAttributes(
    Attributes.logLevels(
        onElement = Attributes.LogLevels.Off,
        onFinish = Attributes.LogLevels.Info,
        onFailure = Attributes.LogLevels.Error))
    .runForeach(println)

/* Prints:
1
2
[ERROR] [example] Upstream failed.
java.lang.RuntimeException ...
*/  
            
        

The log operator is applied to a Source of integers and logs under the name "ClassName.MethodName" as elements pass through the stream. The stream is then run with runForeach, which prints each element to the console. The attributes in this example switch element logging off, log completion at info level and log failures at error level. By default, elements and completion are logged at debug level and failures at error level, so the attributes (withAttributes in Java, addAttributes in Scala) are only needed if you want a different behavior.

The log operator is particularly useful during the development and debugging phase of an Akka Streams application. It allows developers to track the flow of data through the stream, helping to identify unexpected behavior, which in turn, facilitates quick resolution of issues in the streaming pipeline.

Transforming elements with Map

The map operator is the basic transformation operator of the reactive programming world. It takes each element emitted by the upstream and applies a function to it, passing the result downstream. It always transforms one element into exactly one element.

            
Source.range(1,4)
    .map(i -> i * 2)
    .runForeach(System.out::println, system);

/* Prints:
2
4
6
8
*/        
            
        
            
val source = Source(1 to 4)
def double(i: Int): Int = i * 2
source.map(i => double(i).toString).runForeach(println)

/* Prints:
2
4
6
8
*/
            
        

Transforming elements with MapConcat

The mapConcat operator is a transformation operator as well. It is very similar to the map operator, except that it transforms each element into zero or more elements.

                
Source.range(1,3)
    .mapConcat(num -> Arrays.asList(num, num * 2))
    .runForeach(System.out::println, system);

/* Prints:
1
2
2
4
3
6
*/
                
            
                
Source(1 to 3)
    .mapConcat(num => List(num, num * 2))
    .runForeach(println)

/* Prints:
1
2
2
4
3
6
*/
                
            

In this example, the source emits the numbers 1, 2, and 3. The mapConcat operator maps each value to a List containing the value and the value times two. Conceptually, that results in List(List(1, 2), List(2, 4), List(3, 6)). The sub-lists are then flattened and concatenated, so the resulting stream emits the numbers 1, 2, 2, 4, 3, and 6.

Understanding the mapConcat operator is a stepping stone toward understanding the flatMapConcat operator. Instead of mapping elements to a List of elements, it transforms elements into Sources of elements. The Sources get flattened in the same way the List got flattened in the previous example.
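The "map, then flatten" idea can be tried out without Akka. The following is a minimal sketch that uses only java.util.stream from the JDK. The class MapConcatSketch and its mapConcat helper are hypothetical names made up for this illustration. They mirror the semantics of the operator, not its implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-JDK analogy for mapConcat (hypothetical helper, not part of Akka).
final class MapConcatSketch {

    // Applies f to every element and concatenates the resulting lists in order.
    static <A, B> List<B> mapConcat(List<A> in, Function<A, List<B>> f) {
        return in.stream()
            .map(f)                 // one list per input element
            .flatMap(List::stream)  // flatten while keeping the order
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> out = mapConcat(Arrays.asList(1, 2, 3), n -> Arrays.asList(n, n * 2));
        System.out.println(out); // [1, 2, 2, 4, 3, 6]
    }
}
```

Returning an empty list from the function drops the element, which is the "zero" case of "zero or more elements".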

Emitting multiple elements from one element with FlatMapConcat

The flatMapConcat operator in Akka Streams transforms each element of the input stream into a new nested stream (a Source) and concatenates these nested streams sequentially into a single output stream.

            
Source.range(1,3)
    .flatMapConcat(num -> Source.from(Arrays.asList(num, num * 2)))
    .runForeach(System.out::println, system);

/* Prints:
1
2
2
4
3
6
*/
            
        
            
Source(1 to 3)
    .flatMapConcat(num => Source(List(num, num * 2)))
    .runForeach(println)

/* Prints:
1
2
2
4
3
6
*/
            
        

The flatMapConcat operator is applied to a Source stream containing integers. It maps each input element to a new Source containing the original element and its double. These nested streams are then concatenated into a single output stream. Using runForeach, the resulting stream is processed and each element is printed to the console.

The flatMapConcat operator is particularly useful if a transformation produces multiple output elements per input element and you need to maintain a strict sequential order of the output elements. For example, this could be useful when expanding a stream of events into multiple related sub-events while preserving their original order for processing or analysis.
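To make the sequential order visible, here is a minimal plain-JDK sketch. It is an analogy only: FlatMapConcatSketch and its flatMapConcat helper are hypothetical names, and plain iterators stand in for the nested Sources. The sketch records when each nested "source" is opened and when its elements are emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-JDK analogy for flatMapConcat (hypothetical helper, not part of Akka).
final class FlatMapConcatSketch {

    // Records the order in which sub-sources are opened and their elements emitted.
    static List<String> flatMapConcat(List<Integer> in, Function<Integer, Iterator<Integer>> toSub) {
        List<String> log = new ArrayList<>();
        for (Integer n : in) {
            log.add("open " + n);
            Iterator<Integer> sub = toSub.apply(n); // created only after the previous one is drained
            while (sub.hasNext()) {
                log.add("emit " + sub.next());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = flatMapConcat(
            Arrays.asList(1, 2, 3),
            n -> Arrays.asList(n, n * 2).iterator());
        System.out.println(log);
        // [open 1, emit 1, emit 2, open 2, emit 2, emit 4, open 3, emit 3, emit 6]
    }
}
```

Each nested source is fully drained before the next one is opened, which is why the output order follows the input order.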

Stream enrichment with ZipWith

The zipWith operator combines elements from multiple sources, applying a user-defined combine function to each pair of elements from the input streams and passing the returned value downstream.

            
public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("system");
    List<Double> sourceTemperature = Arrays.asList(31.1, 31.4, 31.5);
    List<Integer> sourceHumidity = Arrays.asList(44, 48, 51);

    Source<Double, NotUsed> temperatureSource = Source.from(sourceTemperature);
    Source<Integer, NotUsed> humiditySource = Source.from(sourceHumidity);

    temperatureSource
        .zipWith(humiditySource, (temperature, humidity) -> new WeatherData(temperature, humidity.toString()))
        .runForeach(System.out::println, system);
}

private static class WeatherData {
    private final Double temperature;
    private final String humidity;

    private WeatherData(Double temperature, String humidity) {
        this.temperature = temperature;
        this.humidity = humidity;
    }

    @Override
    public String toString() {
        return "WeatherData(" + temperature + "," + humidity + ")";
    }
}

/* Prints:
WeatherData(31.1,44)
WeatherData(31.4,48)
WeatherData(31.5,51)
*/ 
            
        
            
case class WeatherData(temperature: Double, humidity: String)

val sourceTemperature = Source(List(31.1, 31.4, 31.5))
val sourceHumidity = Source(List(44, 48, 51))

sourceTemperature
    .zipWith(sourceHumidity) { (temperature, humidity) => WeatherData(temperature, humidity.toString) }
    .runForeach(println)

/* Prints:
WeatherData(31.1,44)
WeatherData(31.4,48)
WeatherData(31.5,51)
*/
            
        

The zipWith operator is used to combine two Source streams, sourceTemperature and sourceHumidity. The operator applies a function that creates a WeatherData case class instance for each pair of temperature and humidity values. The resulting stream contains WeatherData instances, which are then printed to the console using runForeach.

The zipWith operator is particularly useful if you need to merge two streams of correlated data. You can merge the elements into one data object and continue further processing.
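One detail worth knowing is what happens when the two inputs have different lengths: zipWith completes as soon as either upstream completes, so surplus elements are never paired. The following plain-JDK sketch illustrates that pairing rule. ZipWithSketch and its zipWith helper are hypothetical names for this illustration and are not part of Akka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Plain-JDK analogy for zipWith (hypothetical helper, not part of Akka).
final class ZipWithSketch {

    // Combines the n-th element of left with the n-th element of right.
    // Stops as soon as either input runs out.
    static <A, B, C> List<C> zipWith(List<A> left, List<B> right, BiFunction<A, B, C> combine) {
        List<C> out = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            out.add(combine.apply(l.next(), r.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> temperatures = Arrays.asList(31.1, 31.4, 31.5);
        List<Integer> humidities = Arrays.asList(44, 48); // one reading short
        System.out.println(zipWith(temperatures, humidities, (t, h) -> t + "/" + h));
        // [31.1/44, 31.4/48]
    }
}
```

The third temperature reading has no partner and is not emitted, so keep this in mind when the two streams are not guaranteed to stay in step.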

Actor Integration

Akka Streams and Akka Actors are two powerful tools within the Akka toolkit, each with its unique strengths. While Akka Streams excels at handling back-pressure and building complex data processing pipelines, Akka Actors shine at managing state and handling concurrent, distributed, and fault-tolerant computing.

In this part, we will explore how to integrate actors into Akka Streams, combining their capabilities to create robust and flexible applications. Integrating actors into Akka Streams can be achieved through built-in operators that facilitate communication between streams and actors. The three primary operators for this purpose are ActorFlow.ask, Source.queue and ActorSink.actorRefWithBackpressure.

ActorFlow.ask Operator: The ask operator allows you to send messages from a stream to an actor and expect a response from the actor. The operator takes a function that builds the message for the actor from a stream element and a reply-to reference, plus a timeout that specifies the maximum wait time for a response (an implicit Timeout in Scala, a Duration in Java). The ask operator returns a Flow that emits the actor's responses.

Here's a simple example:

            
public static void main(String[] args) {
    ActorSystem<Void> system = ActorSystem.create(PrinterMain.create(), "system");
}

record Print(String msg, ActorRef<Printed> replyTo) {}
record Printed(String msg, ActorRef<Print> from) {}

static class Printer {
    static Behavior<Print> create() {
        return Behaviors.setup(
            context ->
            Behaviors.receiveMessage(print -> {
                System.out.println(print.msg);
                print.replyTo.tell(new Printed(print.msg(), context.getSelf()));
                return Behaviors.same();
            })
        );
    }
}

static class PrinterMain {
    static Behavior<Void> create() {
        return Behaviors.setup(context -> {
            ActorRef<Print> printerRef = context.spawn(Printer.create(), "printer");

            Flow<String, Printed, NotUsed> askFlow = ActorFlow.ask(
                printerRef,
                Duration.ofSeconds(1),
                Print::new);

            var numbers = IntStream.rangeClosed(1, 50).boxed();
            CompletionStage<Printed> done = Source.fromJavaStream(() -> numbers)
                .map(Object::toString)
                .via(askFlow)
                .runWith(Sink.last(), context.getSystem());

            done.thenRun(context.getSystem()::terminate);
            return Behaviors.empty();
        });
    }
}

/* Prints:
1
2
...
49
50
*/
        
        
            
object Printer {
    final case class Print(msg: String, replyTo: ActorRef[Printed])
    final case class Printed(msg: String, from: ActorRef[Print])

    def apply(): Behavior[Print] = Behaviors.receive { (context, print) =>
        println(print.msg)
        print.replyTo ! Printed(print.msg, context.self)
        Behaviors.same
    }
}

object PrinterMain {
    def apply(): Behavior[Void] =
        Behaviors.setup { context =>
            val printerRef = context.spawn(Printer(), "printer")

            implicit val timeout: Timeout = 1.second
            val askFlow: Flow[String, Printed, NotUsed] = ActorFlow.ask(printerRef)(Print.apply)

            implicit val system = context.system
            val done = Source(1 to 50)
                .map(_.toString)
                .via(askFlow)
                .map(_.msg)
                .runWith(Sink.seq)

            implicit val ex = context.executionContext
            done.onComplete(_ => system.terminate())
            // Behaviors.same is not allowed as an initial behavior, so return an empty one
            Behaviors.empty
        }
}

object Main extends App {
    val system = ActorSystem(PrinterMain(), "system")
}

/* Prints:
1
2
...
49
50
*/
            
        

In this example, we create a Printer actor that receives a Print message, prints its content to the console, and replies with a Printed message. The ask operator is used to send messages to this actor and receive its responses. The resulting stream contains the actor's Printed replies.

Source.queue Operator: The queue operator is useful when you want to start a stream and send elements to it at a later point in time. You offer elements to the queue, and they are emitted to the stream if there is demand from downstream. Otherwise, they are buffered until demand arrives.

                
public static void main(String[] args) {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "system");

int bufferSize = 20;
int elementsToProcess = 5;

SourceQueueWithComplete<Integer> queue = Source.<Integer>queue(bufferSize, OverflowStrategy.backpressure())
    .throttle(elementsToProcess, Duration.ofSeconds(3))
    .map(x -> x * x)
    .toMat(Sink.foreach(x -> System.out.println("completed " + x)), Keep.left())
    .run(system);

Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10).boxed())
    .mapAsync(1, x -> queue.offer(x)
        .thenApply(result -> {
            if (result instanceof QueueOfferResult.Enqueued$) {
                System.out.println("enqueued " + x);
            } else if (result instanceof QueueOfferResult.Dropped$) {
                System.out.println("dropped " + x);
            } else if (result instanceof QueueOfferResult.Failure) {
                System.out.println("Offer failed " + ((QueueOfferResult.Failure) result).cause().getMessage());
            } else if (result instanceof QueueOfferResult.QueueClosed$) {
                System.out.println("Source Queue closed");
            }
            return NotUsed.getInstance();
        }))
    .runWith(Sink.ignore(), system);
}
    
/* Prints:
enqueued 1
enqueued 2
enqueued 3
enqueued 4
enqueued 5
enqueued 6
enqueued 7
enqueued 8
enqueued 9
enqueued 10
completed 1
completed 4
completed 9
completed 16
completed 25
completed 36
completed 49
completed 64
completed 81
completed 100
*/
                
            
                
object Main extends App {

    implicit val system = ActorSystem(Behaviors.empty, "system")
    implicit val executionContext = system.executionContext

    val bufferSize = 20
    val elementsToProcess = 5

    val queue = Source
        .queue[Int](bufferSize)
        .throttle(elementsToProcess, 3.seconds)
        .map(x => x * x)
        .toMat(Sink.foreach(x => println(s"completed $x")))(Keep.left)
        .run()

    val source = Source(1 to 10)

    source.map(x => {
        queue.offer(x) match {
        case QueueOfferResult.Enqueued    => println(s"enqueued $x")
        case QueueOfferResult.Dropped     => println(s"dropped $x")
        case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
        case QueueOfferResult.QueueClosed => println("Source Queue closed")
        }
    })
    .runWith(Sink.ignore)
}

/* Prints:
enqueued 1
enqueued 2
enqueued 3
enqueued 4
enqueued 5
enqueued 6
enqueued 7
enqueued 8
enqueued 9
enqueued 10
completed 1
completed 4
completed 9
completed 16
completed 25
completed 36
completed 49
completed 64
completed 81
completed 100
*/
                
            

A Source queue of Integers is created with a buffer size of 20. The throttle operator is used to control the rate of the stream. It allows the processing of 5 elements every 3 seconds.

Each element is offered to the queue and the result of the offer is handled accordingly:

  • Enqueued: The element was successfully added to the queue
  • Dropped: The element could not be added because the queue was full
  • Failure: An error occurred when trying to add the element
  • QueueClosed: The queue was closed when trying to add the element
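The example above never fills its buffer, so only Enqueued shows up in the output. To see the other results, here is a small plain-JDK model of a bounded queue. It is a sketch under stated assumptions: BoundedOfferSketch, OfferResult, poll and complete are hypothetical names for this illustration, and in a real stream the outcome of an offer also depends on the overflow strategy and on downstream demand.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-JDK model of a bounded queue that reports what happened to each offer.
// The result names mirror the list above, but this is not Akka's implementation.
final class BoundedOfferSketch {

    enum OfferResult { ENQUEUED, DROPPED, QUEUE_CLOSED }

    private final Deque<Integer> buffer = new ArrayDeque<>();
    private final int capacity;
    private boolean closed = false;

    BoundedOfferSketch(int capacity) {
        this.capacity = capacity;
    }

    OfferResult offer(int element) {
        if (closed) return OfferResult.QUEUE_CLOSED;
        if (buffer.size() >= capacity) return OfferResult.DROPPED; // buffer full, element is discarded
        buffer.addLast(element);
        return OfferResult.ENQUEUED;
    }

    // Models downstream demand: takes the oldest buffered element, or null if empty.
    Integer poll() {
        return buffer.pollFirst();
    }

    void complete() {
        closed = true;
    }

    public static void main(String[] args) {
        BoundedOfferSketch queue = new BoundedOfferSketch(2);
        System.out.println(queue.offer(1)); // ENQUEUED
        System.out.println(queue.offer(2)); // ENQUEUED
        System.out.println(queue.offer(3)); // DROPPED
        System.out.println(queue.poll());   // 1
        System.out.println(queue.offer(4)); // ENQUEUED
        queue.complete();
        System.out.println(queue.offer(5)); // QUEUE_CLOSED
    }
}
```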

ActorSink.actorRefWithBackpressure Operator: The actorRefWithBackpressure operator creates a Sink that sends the elements of a stream to a given ActorRef with backpressure. The next element is only sent after the actor has acknowledged the previous one. Therefore, the actor cannot be flooded with messages.

            
enum Ack {
    INSTANCE;
}

interface Protocol {}

record Init(ActorRef<Ack> ack) implements Protocol {}
record Message(ActorRef<Ack> ackTo, String msg) implements Protocol {}
record Complete() implements Protocol {}
record Fail(Throwable ex) implements Protocol {}

final ActorRef<Protocol> actorRef = // spawned actor

final Complete completeMessage = new Complete();

final Sink<String, NotUsed> sink =
    ActorSink.actorRefWithBackpressure(
        actorRef,
        (responseActorRef, element) -> new Message(responseActorRef, element),
        (responseActorRef) -> new Init(responseActorRef),
        Ack.INSTANCE,
        completeMessage,
        (exception) -> new Fail(exception));

Source.single("msg1").runWith(sink, system);
            
        
            
trait Ack
object Ack extends Ack

trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
    ref = actor,
    messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element),
    onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef),
    ackMessage = Ack,
    onCompleteMessage = Complete,
    onFailureMessage = (exception) => Fail(exception))

Source.single("msg1").runWith(sink)
            
        

The Protocol trait defines the messages which are used to communicate between the stream and the actor.

  • Init: A message sent when the stream starts, containing a reference to the actor that will receive the acknowledgment messages.
  • Message: A message containing the actual data element from the stream and a reference to the actor that will receive the acknowledgment messages.
  • Complete: A message sent when the stream is successfully completed.
  • Fail: A message sent when the stream encounters a failure, containing the exception.

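The target actor is obtained by calling a targetActor() function in the Scala snippet (marked as "spawned actor" in the Java snippet). It is not defined in the code provided and stands for whatever code spawns your actor.

The Sink named sink is then defined using the ActorSink.actorRefWithBackpressure method, which takes the following parameters: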

  • ref: The actor reference to which the stream will send messages.
  • messageAdapter: A function that converts the stream element into a Message with the appropriate acknowledgment actor reference.
  • onInitMessage: A function that creates the Init message containing the acknowledgment actor reference.
  • ackMessage: The acknowledgment message that the actor sends back to the stream after processing an element.
  • onCompleteMessage: The message to send when the stream is successfully completed.
  • onFailureMessage: A function that converts a stream failure (exception) into a Fail message.

Finally, we create a Source with a single element "msg1" and connect it to the sink. This source will send the "msg1" string to the actor through the sink.

When the stream is executed, it will send an Init message to the actor, followed by a Message containing the "msg1" element. The actor will process the message and send back an Ack message to the stream, indicating that it is ready to receive the next message. In this case, since there is only one element in the source, the stream will complete and send a Complete message to the actor. If an error occurs during the stream processing, the Fail message with the exception will be sent to the actor.
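The handshake can be traced with a small plain-JDK model of the stream side. This is a sketch, not Akka code: AckHandshakeSketch, onAck and delivered are hypothetical names, and the model makes the simplifying assumption that Complete is sent only after the last Ack has arrived.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-JDK model of the stream side of the handshake (hypothetical, not Akka code).
final class AckHandshakeSketch {

    private final Deque<String> pending = new ArrayDeque<>(); // elements not yet sent
    private final List<String> delivered = new ArrayList<>(); // messages the actor has received
    private boolean completed = false;

    AckHandshakeSketch(List<String> elements) {
        pending.addAll(elements);
        delivered.add("Init"); // sent once when the stream starts
    }

    // The actor replied with Ack: only now is the next message sent.
    void onAck() {
        if (!pending.isEmpty()) {
            delivered.add("Message(" + pending.pollFirst() + ")");
        } else if (!completed) {
            delivered.add("Complete"); // assumption of this sketch: sent after the last Ack
            completed = true;
        }
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        AckHandshakeSketch stream = new AckHandshakeSketch(Arrays.asList("msg1", "msg2"));
        System.out.println(stream.delivered()); // [Init]
        stream.onAck();
        System.out.println(stream.delivered()); // [Init, Message(msg1)]
        stream.onAck();
        stream.onAck();
        System.out.println(stream.delivered()); // [Init, Message(msg1), Message(msg2), Complete]
    }
}
```

As long as the actor does not acknowledge, nothing further is delivered. That is the whole backpressure mechanism: a slow actor slows down the stream instead of piling up messages.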

Another great read on actor integration with Akka Streams is this article from RockTheJvm.

Akka Alpakka - Simple Integration to External Services

Akka Alpakka is a powerful library that extends the capabilities of Akka Streams, enabling seamless integration with various external systems and technologies such as databases, message brokers, cloud storage providers, and more. By providing a diverse set of connectors, Alpakka simplifies the process of exchanging data between Akka Streams applications and other platforms.

Side Note: Akka Alpakka embodies the notion of adaptability and compatibility, as alpacas are known for their ability to thrive in diverse environments.

The following three connectors are very popular. I will briefly describe them and then name the benefits of using Akka Alpakka.

  • Apache Kafka Connector: This connector enables seamless integration with the widely-used Apache Kafka distributed streaming platform, simplifying the process of producing and consuming messages in Akka Streams applications.
  • Apache Cassandra Connector: The Cassandra Connector empowers Akka Streams applications to interact with Apache Cassandra, a highly scalable and distributed NoSQL database. It enables efficient and reliable data storage and retrieval through CassandraSessions.
  • AWS S3 Connector: The S3 Connector streamlines uploading files to and downloading files from AWS S3. Additionally, it includes utility functions for bucket management such as creating, listing, and deleting buckets.

Why Akka Alpakka is Invaluable to Developers: Akka Alpakka's pre-built connectors save developers countless hours of effort by eliminating the need to create custom solutions for data exchange between Akka Streams applications and external systems. With Alpakka, developers can focus on their core business logic, knowing that the library's connectors are designed with robustness, performance, and scalability in mind. Furthermore, the connectors are built to adhere to the Reactive Streams specification, ensuring seamless integration with Akka Streams' back-pressure mechanism, and providing an end-to-end reactive pipeline.

TestKit for easy testing

The Akka Streams TestKit provides a powerful and flexible way to test your Akka Streams applications. With its suite of built-in tools and utilities, the TestKit makes it easy to verify the correctness and performance of your stream components. Its two main components are TestSource and TestSink, which provide sources and sinks that materialize to probes with a fluent API for driving the stream and asserting on it.

            
Source<Integer, NotUsed> sourceUnderTest = Source.fromJavaStream(() -> IntStream.rangeClosed(1, 4))
    .filter(num -> num % 2 == 0)
    .map(num -> num * 2);

sourceUnderTest.runWith(TestSink.probe(testKit.system()), testKit.system())
    .request(2)
        .expectNext(4, 8)
        .expectComplete();
            
        
            
val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)
sourceUnderTest.runWith(TestSink[Int]()).request(2).expectNext(4, 8).expectComplete()
            
        

This code snippet tests a Source that takes a sequence of integers from 1 to 4, filters out odd numbers, and multiplies the even numbers by 2.

            
Sink<Integer, NotUsed> sinkUnderTest = Sink.cancelled();
TestSource.<Integer>probe(testKit.system())
    .toMat(sinkUnderTest, Keep.left())
    .run(testKit.system())
    .expectCancellation();
            
        
            
val sinkUnderTest = Sink.cancelled
TestSource[Int]().toMat(sinkUnderTest)(Keep.left).run().expectCancellation()
            
        

This code snippet tests the behavior of a Sink that cancels the stream immediately upon materialization.

As you can see, each component can be tested separately. You can break a stream into multiple flows and unit test each of them individually. This is incredibly useful for verifying that your application behaves correctly, and it simplifies the creation of complex tests.

Because of this reusability, you can create your stream components once and reuse them any number of times. And because of their composability, you can create arbitrarily complex data pipelines by connecting individual components. A stream component is independent: if it runs in one place, it will run everywhere.

Conclusion

In this blog article, I've introduced you to the tools that will allow you to quickly become productive, from the set of built-in operators to integrating with external services using Akka Alpakka, combining your streams with Akka Actors and testing them with the TestKit. As you continue to delve into the world of Akka Streams, you will discover that its ability to manage back-pressure, provide fault tolerance, and enable distributed processing makes it an invaluable asset for building scalable and efficient data-driven applications. By following the tips, tricks, and best practices outlined in this article, you will be well on your way to mastering Akka Streams and leveraging its full potential to develop high-performance, resilient applications. Now it's your turn: start experimenting with Akka Streams today and watch your productivity soar!