Lightbend Activator

Akka Streams with Scala!

Activator will be EOL-ed on May 24, 2017.

We’re making it easier for developers to get started with Lightbend technologies. Unfortunately, this means that future releases of Play, Akka and Scala will no longer include Activator support, and Lightbend’s Activator server will be decommissioned by the end of 2017. Instead of supporting Activator to create and set up development projects, we'll be supporting standard Giter8 templates for sbt users and Maven archetypes for Maven users. So going forward:

To create new Lightbend projects

Instead of using the Activator command, make sure you have sbt 0.13.13 (or higher), and use the “sbt new” command, providing the name of the template. For example, “$ sbt new akka/hello-akka.g8”. You can find a list of templates here.

Also, as a convenience, the Lightbend Project Starter allows you to quickly create a variety of example projects that you just unzip and run.

To create new templates

If you want to create new templates, you can now do that in Giter8.

To migrate templates from Activator to Giter8

If you created Activator templates in the past, please consider migrating them to Giter8 with this simple process.

Akka Streams with Scala!

December 29, 2015
akka scala sample

Demonstrates Akka Streams

How to get "Akka Streams with Scala!" on your computer

There are several ways to get this template.

Option 1: Choose akka-stream-scala in the Lightbend Activator UI.

Already have Lightbend Activator (get it here)? Launch the UI then search for akka-stream-scala in the list of templates.

Option 2: Download the akka-stream-scala project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for akka-stream-scala.

  1. Download the Template Bundle for "Akka Streams with Scala!"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Lightbend Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click the file named "activator.bat", select "Open", and if prompted with a warning, click to continue.

    Or from a command line:

     C:\Users\typesafe\akka-stream-scala> activator ui 
    This will start Lightbend Activator and open this template in your browser.

Option 3: Create an akka-stream-scala project from the command line

If you have Lightbend Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME akka-stream-scala on the command line.

Option 4: View the template source

The creator of this template maintains it at

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial


This tutorial contains a few samples that demonstrate Akka Streams.

Akka Streams is an implementation of Reactive Streams, which is a standard for asynchronous stream processing with non-blocking backpressure. Akka Streams is interoperable with other Reactive Streams implementations.

Akka Streams is currently under development and these samples use a preview release, i.e. changes can be expected. Please try it out and send feedback to the Akka mailing list.

Akka Streams provides a way to express and run a chain of asynchronous processing steps acting on a sequence of elements. Every step is processed by one actor to support parallelism. The user describes the “what” instead of the “how”, i.e. things like batching, buffering, thread-safety are handled behind the scenes.

The processing steps are declared with a DSL, a so-called Flow. A Flow may be connected to a Source and/or a Sink. It may also exist without either of these end points, as an "open" flow. Any open flow, when connected to a Source, itself becomes a Source, and likewise when connected to a Sink it becomes a Sink. A Flow with both a Source and a Sink is called a RunnableFlow and may be executed.

The Source can be constructed from a collection, an iterator, a future, or a function which is evaluated repeatedly.
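As a sketch, these construction options look roughly as follows (factory names follow the preview-era API this template targets; exact method names vary across Akka Streams versions):

```scala
import scala.concurrent.Future

import akka.stream.scaladsl.Source

// From a collection
val fromCollection = Source(List(1, 2, 3))

// From an iterator, wrapped in a function so the source can be materialized repeatedly
val fromIterator = Source(() => Iterator.from(0))

// From a future; emits the single value once the future completes
val fromFuture = Source(Future.successful("hello"))
```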

Each DSL element produces a new Flow that can be further transformed, building up a description of the complete transformation pipeline. In order to execute this pipeline the Flow must be runnable (i.e. have both Source and Sink endpoints), and it is materialized by calling one of the execution methods: .run, .runWith, .runForeach or .runFold.

Running a Flow involves a process called materialization, which requires a FlowMaterializer configured for an actor system. This FlowMaterializer can be (and usually is) an implicit value to streamline the calls.
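A typical setup matching the names in this tutorial might look like the following (preview-era API; later Akka Streams versions renamed FlowMaterializer):

```scala
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer

implicit val system = ActorSystem("Sys")
// Declared implicit so run/runWith/runForeach/runFold can pick it up automatically
implicit val materializer = FlowMaterializer()
```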

It should be noted that the streams modeled by this library are “hot”, meaning that they asynchronously flow through a series of processors without detailed control by the user. In particular it is not predictable how many elements a given transformation step might buffer before handing elements downstream, which means that transformation functions may be invoked more often than for corresponding transformations on strict collections like List. An important consequence is that elements that were produced into a stream may be discarded by later processors, e.g. when using the take combinator.

By default every operation is executed within its own Actor to enable full pipelining of the chained set of computations. This behavior is determined by the FlowMaterializer, which is required by those methods that materialize the Flow into a series of org.reactivestreams.Processor instances that are started and active. Synchronous compaction of steps is possible (but not yet implemented).

Basic transformation

What does a Flow look like?

Open BasicTransformation.scala

Here we use an Iterator over the Array produced by splitting the text on spaces as the input producer; note that the iterator is an Iterator[String] and this produces a Source[String, Unit]. The first type parameter is the type of the elements that this source is producing. The second type parameter is the type of the materialized value of this source. A value of this type will be returned after materialization when calling any of the run* methods. In this case the materialized value has type Unit, which means no value will be materialized.

The flow written to use this source must match the first type parameter, so we could not treat the source as a source of Int, for example.

In this sample we convert each read line to upper case and print it to the console. This is done in the lines map(_.toUpperCase) and runForeach(println).

The map(_.toUpperCase) takes Strings and produces Strings. When this is attached to the Source[String, Unit], the result is a new Flow that is also a Source[String, Unit]. If the map was over a function that converted, say, String to Int, the result would be a Source[Int, Unit] when attaching it to this Source[String, Unit].

The runForeach(println) constructs and attaches a Sink, in this case an implementation called Sink.foreach and again this is specifically a Sink[String, Unit] which matches the first type parameter of the Source[String, Unit]. The result of attaching this matching Sink to the Source creates a RunnableFlow which is then also run by the runForeach call.

Unlike foreach on a collection (which returns Unit), runForeach on a Flow returns a Future[Unit]. Because we get a Future back, we can use it to shut down the actor system once the flow is completed. This is accomplished by the final line in the flow: onComplete(_ => system.shutdown())
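Putting the pieces together, BasicTransformation.scala looks roughly like this (a hedged reconstruction from the description above, not the exact template source; it uses the preview-era FlowMaterializer API):

```scala
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source

object BasicTransformation extends App {
  implicit val system = ActorSystem("Sys")
  implicit val materializer = FlowMaterializer()
  import system.dispatcher

  val text = "some example text to transform"

  Source(() => text.split("\\s").iterator) // Source[String, Unit]
    .map(_.toUpperCase)                    // still a Source[String, Unit]
    .runForeach(println)                   // attaches Sink.foreach, returns Future[Unit]
    .onComplete(_ => system.shutdown())    // shut down once the stream completes
}
```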

Try to run the class by selecting it in the 'Main file' menu in the Run tab and click the 'Run' button.

Try to add additional steps in the flow, for example skip short lines:

filter(line => line.length > 3).

The API is intended to be familiar to anyone used to the collections API in Scala.

All stream manipulation operations can be found in the API documentation.


Backpressure

The mandatory non-blocking backpressure is a key feature of Reactive Streams.

Open WritePrimes.scala

In this sample we use a fast producer and several consumers with potentially different throughput capacities. To avoid out-of-memory problems it is important that the producer does not generate elements faster than they can be consumed. The speed of the slowest consumer must also be taken into account to avoid unbounded buffering in intermediate steps.

Here we use a random number generator as input. The input producer is a block of code which is evaluated repeatedly. It can generate elements very fast if needed.

We filter the numbers through two prime number checks and end up with a stream of prime numbers whose +2 neighbor is also a prime number (the lower member of a twin prime pair). These two filter steps can potentially be pipelined, i.e. executed in parallel.
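The prime checks themselves are plain functions, independent of the streaming API. As a sketch, the same logic can be sanity-checked on an ordinary collection (isPrime and twinLower are illustrative names, not from the template):

```scala
object TwinPrimes {
  // Simple trial-division primality test
  def isPrime(n: Int): Boolean =
    n > 1 && (2 to math.sqrt(n).toInt).forall(n % _ != 0)

  // Keep primes p where p + 2 is also prime, mirroring the two filter stages
  def twinLower(numbers: Seq[Int]): Seq[Int] =
    numbers.filter(isPrime).filter(p => isPrime(p + 2))
}
```

For example, TwinPrimes.twinLower(1 to 20) yields the twin-prime lower members 3, 5, 11 and 17.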

Then we connect that prime number producer to two consumers: one writing to a file and another printing to the console. To simulate a slow file writer we have added an additional sleep in a map stage right before the SynchronousFileSink.

Try to run the class by selecting it in the 'Main file' menu in the Run tab and click the 'Run' button.

Note that the speed of the output in the console is limited by the slow file writer, i.e. to one element per second. When running the app you will notice that elements are not requested precisely one by one, but in larger batches. This is because the file sink tries to optimise its request behaviour in order to write as many elements to the target file at once as possible. You can configure this behaviour by setting .withAttributes(Attributes.inputBuffer(initial, max)).

Open primes.txt to see the file output.

Stream of streams

Let us take a look at an example of more advanced stream manipulation.

Open GroupLogFile.scala

We want to read a log file and pipe entries of different log level to separate files. For this the groupBy operator is useful. It demultiplexes the incoming flow into separate source flows, one for each element key. The key is computed for each element using the given function. When a new key is encountered for the first time it is emitted to the downstream consumer together with a fresh producer that will eventually produce all the elements of the substream.

In this sample we group by a regular expression matching the log levels and then write the elements of each group to a separate file.
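The key-extraction function at the heart of groupBy is ordinary code. A collection-based sketch of the same grouping (the pattern and names here are illustrative; the template's actual regex may differ):

```scala
object LogGrouping {
  // Hypothetical log-level pattern; the template's actual regex may differ
  val LevelPattern = """.*(DEBUG|INFO|WARN|ERROR).*""".r

  // The key function: compute a log level for each line
  def levelOf(line: String): String = line match {
    case LevelPattern(level) => level
    case _                   => "OTHER"
  }

  // Collection analogue of the stream's groupBy: one group of lines per level
  def groupByLevel(lines: Seq[String]): Map[String, Seq[String]] =
    lines.groupBy(levelOf)
}
```

In the stream version each group becomes its own substream, which is then attached to its own file sink.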

Try to run the class by selecting it in the 'Main file' menu in the Run tab and click the 'Run' button.

Open the input logfile.txt and look at the resulting output log files in the target directory.

TCP Stream

Akka Streams also provides a stream based API on top of Akka I/O.

Open TcpEcho.scala

When you Run TcpEcho without parameters it starts both client and server in the same JVM and the client connects to the server over port 6000.

The server is started by calling bind on the extension. It returns a Source[Tcp.IncomingConnection, Future[StreamTcp.ServerBinding]]. Each new client connection is represented by a new IncomingConnection element produced by the source. From the connection the server can operate on the ByteString elements. This source materializes to a future that is completed with a ServerBinding when the server starts listening on the given address.
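A sketch of the server side, using the names above (preview-era API; the host, port and method details are assumptions and vary across Akka Streams versions):

```scala
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Flow, Tcp }
import akka.util.ByteString

implicit val system = ActorSystem("Server")
implicit val materializer = FlowMaterializer()

// Source[Tcp.IncomingConnection, Future[ServerBinding]]
val connections = Tcp().bind("127.0.0.1", 6000)

connections.runForeach { conn =>
  // Echo: the handler flow passes each ByteString through unchanged
  conn handleWith Flow[ByteString]
}
```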

In this sample the server sends back the same bytes as it receives.

conn handleWith Flow[ByteString]

You can add a transformation of the bytes using a Flow, for example to convert characters to upper case:

val toUpper = Flow[ByteString].map(b => ByteString.fromString(b.utf8String.toUpperCase()))
 conn handleWith toUpper

The connection from the client is established by calling outgoingConnection on the extension and attaching a corresponding source and sink to the returned flow.

val result = Source(testInput).via(Tcp().outgoingConnection(address, port)).
 runFold(ByteString.empty) { (acc, in) ⇒ acc ++ in }

In this sample the client sends a sequence of characters one-by-one to the server, aggregates the replies into a single ByteString, and finally prints that.

Try to run the class by selecting it in the 'Main file' menu in the Run tab and click the 'Run' button.

That runs the client and server in the same JVM process. It can be more interesting to run them in separate processes. Run the following commands in separate terminal windows.

<path to activator dir>/activator "run-main server 6001"
<path to activator dir>/activator "run-main client 6001"

You can also interact with the server with telnet:

telnet localhost 6001

Type a few characters in the telnet session and press enter to see them echoed back to the terminal.