Knative is an open source project that extends Kubernetes with a set of middleware components that simplify deploying, running, and managing serverless applications. Each of the components included in the Knative project implements a common pattern for solving such difficult tasks as deploying a container, routing and managing traffic, autoscaling workloads based on demand, and binding running services to eventing ecosystems.
These components allow developers to use familiar idioms, languages, and frameworks to deploy functions, applications, or container workloads.
While there are many examples for Knative Serving and Knative Eventing, it is hard to find examples written in Scala. In this blog post (and the corresponding code on GitHub), we’ll fill this gap by providing some Knative examples in Scala.
To install the latest Knative version (0.17), there’s an excellent tutorial on the Knative website. Installation is fairly simple and straightforward; see here for the specific steps and commands. A rough sketch of the core commands is shown below.
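As a sketch only (assuming version 0.17.0 and the file naming used on the Knative release pages; the installation tutorial linked above is authoritative), installing the Serving component looks something like this:

kubectl apply --filename https://github.com/knative/serving/releases/download/v0.17.0/serving-crds.yaml
kubectl apply --filename https://github.com/knative/serving/releases/download/v0.17.0/serving-core.yaml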
Knative Serving supports deploying and serving serverless applications and functions. It is built on top of Kubernetes and Istio, is easy to get started with, and scales to support advanced scenarios.
The main components of Knative Serving are shown below:
The Knative Service represents an instantiation of a single serverless container environment, and it manages all other Knative Serving components: configurations, revisions, and routes.
To capture application code changes, Knative uses revisions, each representing a stateless, autoscalable snapshot-in-time of application code and configuration. A configuration keeps track of all known revisions, and a route defines how HTTP requests are distributed to a specific revision (or set of revisions).
In a nutshell, a simple Knative implementation is a web server listening for service requests. Akka HTTP and this project, part of the Knative Serving samples, provide such an implementation. Unfortunately, that implementation only supports HTTP/1 and, as a result, does not work with the latest version of Knative and Magic DNS. According to the Akka HTTP documentation, enabling this code to work with HTTP/2 is fairly straightforward: you just need to add the akka-http2-support library to your build and modify the code as follows (for the full implementation refer to GitHub):
val binding = Http().bindAndHandleAsync(
  Route.asyncHandler(serviceRoute),
  host,
  port,
  // HttpConnectionContext enables serving HTTP/2 without TLS (h2c)
  connectionContext = HttpConnectionContext()) andThen {
    case Success(sb) =>
      log.info("Bound: {}", sb)
    case Failure(t) =>
      log.error(t, "Failed to bind to {}:{}—shutting down", host, port)
      system.terminate()
  }
With this change in place, you can use this yaml file to deploy the service. Once the service is deployed, we can see what was created by running the command kubectl describe ksvc httpservice:
Name: httpservice
Namespace: default
…………………………...
Status:
Address:
URL: http://httpservice.default.svc.cluster.local
…………………………...
Latest Created Revision Name: httpservice-7s2n7
Latest Ready Revision Name: httpservice-7s2n7
Observed Generation: 1
Traffic:
Latest Revision: true
Percent: 100
Revision Name: httpservice-7s2n7
URL: http://httpservice.default.35.225.36.19.xip.io
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Created 39s service-controller Created Configuration "httpservice"
Normal Created 39s service-controller Created Route "httpservice"
Here we can see that service creation leads to assigning both a local (http://httpservice.default.svc.cluster.local) and a remote (http://httpservice.default.35.225.36.19.xip.io) address, along with a configuration (httpservice), revision (httpservice-7s2n7), and route (httpservice). It also specifies that 100% of the traffic gets routed to the latest (in this case, the only) revision. By running the kubectl get deployments command, we can also see a deployment created for this revision:
NAME READY UP-TO-DATE AVAILABLE AGE
httpservice-7s2n7-deployment 0/0 0 0 37m
Note that this deployment does not have any pods associated with it at the moment. This is because a Knative Service is autoscaled automatically, and there is currently no traffic to the service, so Knative autoscaling has scaled it down to zero. When we send a request (using this client), the autoscaler will create pod(s) to process it. The Autoscaler component watches traffic flow to the application and scales replicas up or down based on configured metrics. A sketch of such a client is shown below.
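As an illustration, a minimal Akka HTTP client for waking up the service might look like the following sketch (the object name is hypothetical, and the URL is the sample one from the output above; substitute the URL reported by your own installation):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.unmarshalling.Unmarshal

import scala.concurrent.Await
import scala.concurrent.duration._

object HttpServiceClient {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("client")
    import system.dispatcher

    // External URL assigned to the service by Knative (see the describe output above)
    val url = "http://httpservice.default.35.225.36.19.xip.io"

    // The first request will cause the autoscaler to spin up a pod
    val response = Http()
      .singleRequest(HttpRequest(uri = url))
      .flatMap(resp => Unmarshal(resp.entity).to[String])
    println(Await.result(response, 1.minute))
    system.terminate()
  }
}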
In addition to supporting HTTP services, Knative also supports gRPC. When working in Scala, there are several ways to implement gRPC services. In this section we will look at two popular options: ScalaPB gRPC and Akka gRPC.
Regardless of the approach, we start with the service’s protobuf definition. For this implementation we’ll use a simple “Hello World” definition. Once this is done, we can proceed with the service implementation.
The ScalaPB implementation is fairly straightforward: it follows the ScalaPB documentation and contains two main classes, the gRPC service implementation and the server bootstrap that exposes it. A sketch of both is shown below.
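A minimal sketch of these two classes, assuming the standard “Hello World” proto (the generated package and message names are assumptions; they depend on your proto options):

import scala.concurrent.{ExecutionContext, Future}
import io.grpc.ServerBuilder
// Generated by ScalaPB from the "Hello World" proto definition
import io.grpc.examples.helloworld.helloworld.{GreeterGrpc, HelloReply, HelloRequest}

// Class 1: the service implementation of the generated trait
class GreeterImpl extends GreeterGrpc.Greeter {
  override def sayHello(request: HelloRequest): Future[HelloReply] =
    Future.successful(HelloReply(message = s"Hello, ${request.name}"))
}

// Class 2: the server bootstrap binding the service to a port
object GreeterServer {
  def main(args: Array[String]): Unit = {
    val server = ServerBuilder
      .forPort(sys.env.getOrElse("PORT", "8080").toInt) // Knative injects PORT
      .addService(GreeterGrpc.bindService(new GreeterImpl, ExecutionContext.global))
      .build()
      .start()
    server.awaitTermination()
  }
}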
Now we can use this yaml file (very similar to this one) to deploy the service. Once the service is deployed, we can use GRPCClient (see here for implementation details) to invoke it.
The Akka gRPC implementation follows this documentation and likewise contains two main classes, the service implementation and the server that binds it; a sketch follows.
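A minimal sketch under the same assumptions (the generated package name is an assumption; Akka gRPC generates a GreeterService trait and a GreeterServiceHandler from the proto, and the binding mirrors the HTTP/2 pattern shown earlier):

import akka.actor.ActorSystem
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}

import scala.concurrent.{ExecutionContext, Future}
// Generated by Akka gRPC from the same proto definition
import com.lightbend.grpc.helloworld.{GreeterService, GreeterServiceHandler, HelloReply, HelloRequest}

// Class 1: the service implementation of the generated trait
class GreeterServiceImpl extends GreeterService {
  override def sayHello(request: HelloRequest): Future[HelloReply] =
    Future.successful(HelloReply(s"Hello, ${request.name}"))
}

// Class 2: the server, exposing the service over HTTP/2 without TLS (h2c)
object GreeterServer {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("GreeterServer")
    implicit val ec: ExecutionContext = system.dispatcher

    // The handler turns the generated service into a plain request handler function
    val service: HttpRequest => Future[HttpResponse] =
      GreeterServiceHandler(new GreeterServiceImpl())

    Http().bindAndHandleAsync(
      service,
      interface = "0.0.0.0",
      port = sys.env.getOrElse("PORT", "8080").toInt,
      connectionContext = HttpConnectionContext())
  }
}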
The yaml file (similar to this one) can be used to deploy the service. You can use this GRPCClient to access the service. The client implementation follows this documentation with one caveat: by default, the Akka gRPC client assumes a secured (HTTPS) endpoint, while in our case the gRPC service assumes non-secure access. In order to enable non-secure access, it is necessary to explicitly disable TLS in the gRPC client settings as follows:
val clientSettings = GrpcClientSettings.connectToServiceAt(host, port).withTls(false)
In the Knative Service examples we have looked at so far, we had a single revision of the service. Now that we have two implementations of the same gRPC service, let’s experiment with a service containing several revisions and look at how to control traffic between them.
Let’s start by creating a new service using the following yaml file. Note that here we have added metadata to the spec:
……………………...
spec:
  template:
    metadata:
      name: grpcversioned-grpc-example-first
    spec:
……………………….
Adding this metadata allows us to explicitly name the revision (in the previous examples, revision names were automatically generated).
Now let’s add a second revision to the created service by applying the following yaml file. When a new revision is added, all traffic is redirected to it. If we do not want this to happen - for example, we want to deploy a new revision but continue to serve requests using the previous one - we can use this yaml file. Here we added a traffic tag, which allows us to explicitly control how traffic is delivered:
………………………………….
traffic:
- tag: current
  revisionName: grpcversioned-grpc-example-first
  percent: 100
- tag: latest
  latestRevision: true
  percent: 0
Finally, by using this yaml file, we can split traffic between revisions (in this case evenly, as sketched below). This ability to control traffic distribution across multiple revisions is the foundation for different deployment strategies, including canary and blue-green deployments.
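For illustration, an even split might look like the following sketch (the second revision name is an assumption following the naming convention used above):

traffic:
- tag: current
  revisionName: grpcversioned-grpc-example-first
  percent: 50
- tag: candidate
  revisionName: grpcversioned-grpc-example-second
  percent: 50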
Knative Eventing allows passing events based on the CloudEvents specification from a producer to a consumer. An event consumer can be implemented as any type of code running on Kubernetes, for example, a “classic” Kubernetes deployment and service or a Knative Service.
Event producers can be custom applications or any of the pre-built event sources described later in this post.
Additionally, Knative Eventing provides composable primitives to enable late-binding event sources and event consumers.
CloudEvents is a vendor-neutral specification that defines the format of event data as a data record expressing an event occurrence and its context.
As defined in the CloudEvents Primer, the specification defines interoperability of event systems, allowing services to produce and consume events while consumers and producers are developed and deployed independently. The specification focuses on event formats, not on how events appear on the wire when using specific protocols.
At its core, CloudEvents defines a set of metadata about events transferred between systems - the minimal set of attributes needed to route a request to the proper component and to facilitate proper processing of the event by that component: id, source, specversion, type, datacontenttype, dataschema, subject, time, and the event data itself. The attributes id, source, specversion, and type are required; the rest are optional.
The event format encodings - for example, JSON and Avro - define how an event model is encoded for mapping to the header and payload elements of an application protocol.
The protocol bindings - for example HTTP, Kafka, and MQTT - define how the CloudEvent is bound to an application protocol's transport frame.
In the case of the HTTP protocol (the one that we are using), all CloudEvent attributes (except for data and data_base64), including extensions, are individually mapped to and from distinct HTTP message headers with the same name as the attribute name but prefixed with ce-.
For our implementation, we have defined CloudEvents as the following case class:
case class CloudEvent(var id: String, var source: URI, var specversion: String, var `type`: String,
                      var datacontenttype: Option[String], var dataschema: Option[URI],
                      var subject: Option[String], var time: Option[ZonedDateTime],
                      var data: Option[String], var data_base64: Option[Array[Byte]],
                      var extensions: Option[Map[String, Any]]) {

  override def toString: String = {
    val builder = new StringBuilder()
    builder.append("CloudEvent{").append(s"id=$id,").append(s" source=${source.toString},").
      append(s" specversion=$specversion,").append(s" type=${`type`},")
    datacontenttype match {
      case Some(d) => builder.append(s" datacontenttype=$d,")
      case _ =>
    }
    dataschema match {
      case Some(d) => builder.append(s" dataschema=${d.toString},")
      case _ =>
    }
    subject match {
      case Some(s) => builder.append(s" subject=$s,")
      case _ =>
    }
    time match {
      case Some(t) => builder.append(s" time=$t,")
      case _ =>
    }
    data match {
      case Some(d) => builder.append(s" data=$d,")
      case _ =>
    }
    data_base64 match {
      // Printing the raw byte array is not useful, so report its size instead
      case Some(d) => builder.append(s" data_base64=${d.length} bytes,")
      case _ =>
    }
    extensions match {
      case Some(e) => builder.append(s" extensions=$e")
      case _ =>
    }
    builder.append("}")
    builder.toString()
  }
  …
Here, in addition to the event content, we provide a toString method for printing cloud events.
In the case of the HTTP event binding that we are implementing, an event consumer has to run an HTTP server that accepts POST requests, rebuild the event attributes from the ce- prefixed headers, and read the event data from the request body.
As a result, Akka HTTP is a natural choice for implementing an event consumer. A CloudEventProcessing trait provides all the functionality for receiving cloud events:
trait CloudEventProcessing extends Directives {

  def route(eventpath: String = ""): Route =
    path(eventpath) {
      post {
        extractRequest { request =>
          // Log the incoming headers - useful when debugging event delivery
          request.headers.foreach(header => println(s"Header ${header.name()} - ${header.value()}"))
          entity(as[Array[Byte]]) { entity =>
            // Rebuild the cloud event from the ce- prefixed HTTP headers
            val event = CloudEvent("", null, "", "", None, None, None, None, None, None, None)
            var extensions: Map[String, Any] = Map()
            request.headers.foreach { header =>
              header.name() match {
                // Attributes
                case "ce-id" => event.id = header.value()
                case "ce-source" => event.source = URI.create(header.value())
                case "ce-specversion" => event.specversion = header.value()
                case "ce-type" => event.`type` = header.value()
                case "ce-dataschema" => event.dataschema = Some(URI.create(header.value()))
                case "ce-subject" => event.subject = Some(header.value())
                case "ce-time" => event.time = Some(ZonedDateTime.parse(header.value()))
                // Extensions are encoded as ce-<name>extension headers
                case name if name.startsWith("ce-") && name.contains("extension") =>
                  val exname = name.substring(3, name.indexOf("extension"))
                  extensions = extensions + (exname -> header.value())
                // Data: textual content types go to data, everything else to data_base64
                case "ce-datacontenttype" =>
                  event.datacontenttype = Some(header.value())
                  if (header.value().contains("json") || header.value().contains("javascript") || header.value().contains("text"))
                    event.data = Some(new String(entity))
                  else
                    event.data_base64 = Some(entity)
                case _ =>
              }
            }
            if (extensions.nonEmpty)
              event.extensions = Some(extensions)
            // If we did not get a content type, default the data to JSON
            if (event.datacontenttype.isEmpty)
              event.data = Some(new String(entity))
            processEvent(event)
            complete(StatusCodes.OK)
          }
        }
      }
    }

  def processEvent(event: CloudEvent): Unit
}
The most important method in this trait is route, which implements the processing of an incoming request: it processes all received headers to build a cloud event out of them. This trait is used by the CloudEventsReciever class, which implements an HTTP receiver based on the Akka HTTP server; a sketch follows.
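A minimal sketch of such a receiver based on the trait above (the binding details are assumptions; since event delivery arrives as plain HTTP POST requests, an HTTP/1 binding suffices):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http

object CloudEventsReciever extends CloudEventProcessing {

  // Print every received event; a real consumer would do its processing here
  override def processEvent(event: CloudEvent): Unit =
    println(s"Received event: $event")

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("CloudEventsReciever")
    // Listen on the port injected by Knative (default 8080)
    Http().bindAndHandle(route(), "0.0.0.0", sys.env.getOrElse("PORT", "8080").toInt)
  }
}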
A custom event producer is an application that sends events. The only requirement on this implementation is the usage of the K_SINK environment variable to determine where to send events (see below). This can easily be achieved using the Typesafe Config library. In order to simplify the producer implementation, I have added a toHttpRequest method to the CloudEvent case class:
def toHttpRequest(uri: String): HttpRequest = {
  var headers: scala.collection.immutable.Seq[HttpHeader] = scala.collection.immutable.Seq(
    // Mandatory attributes
    RawHeader("ce-id", id),
    RawHeader("ce-source", source.toString),
    RawHeader("ce-specversion", specversion),
    RawHeader("ce-type", `type`))
  // Optional attributes
  datacontenttype match {
    case Some(c) => headers = headers :+ RawHeader("ce-datacontenttype", c)
    case _ => headers = headers :+ RawHeader("ce-datacontenttype", "application/json")
  }
  dataschema match {
    case Some(d) => headers = headers :+ RawHeader("ce-dataschema", d.toString)
    case _ =>
  }
  subject match {
    case Some(s) => headers = headers :+ RawHeader("ce-subject", s)
    case _ =>
  }
  time match {
    case Some(t) => headers = headers :+ RawHeader("ce-time", t.toString)
    case _ =>
  }
  extensions match {
    case Some(e) =>
      for ((key, value) <- e)
        headers = headers :+ RawHeader(s"ce-${key}extension", value.toString)
    case _ =>
  }
  // Entity: textual content types are sent as JSON, anything else as binary
  val entity = datacontenttype match {
    case Some(dtype) =>
      if (dtype.contains("json") || dtype.contains("javascript") || dtype.contains("text"))
        HttpEntity(ContentTypes.`application/json`, data.getOrElse(""))
      else
        HttpEntity(ContentTypes.`application/octet-stream`, data_base64.getOrElse(Array[Byte]()))
    // No content type: default to JSON, matching the default header above
    case _ => HttpEntity(ContentTypes.`application/json`, data.getOrElse(""))
  }
  HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity, headers = headers)
}
This method builds the headers and the data entity based on the content of the cloud event. With this method in place, the implementation of CloudEventsSender is fairly straightforward; a minimal sketch follows.
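A sketch of the sender, assuming the CloudEvent class above (the event type value and the message payload are illustrative assumptions):

import java.net.URI
import java.time.ZonedDateTime
import java.util.UUID

import akka.actor.ActorSystem
import akka.http.scaladsl.Http

object CloudEventsSender {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("CloudEventsSender")
    import system.dispatcher

    // K_SINK is injected by Knative (ContainerSource or SinkBinding) and
    // points at the event consumer
    val sink = sys.env.getOrElse("K_SINK", "http://localhost:8080")

    val event = CloudEvent(
      id = UUID.randomUUID().toString,
      source = URI.create("https://com.lightbend.knative.eventing/CloudEventsSender"),
      specversion = "1.0",
      `type` = "com.lightbend.knative.eventing.sample", // illustrative event type
      datacontenttype = Some("application/json"),
      dataschema = None,
      subject = None,
      time = Some(ZonedDateTime.now()),
      data = Some("""{"message": "hello knative"}"""),
      data_base64 = None,
      extensions = None)

    // toHttpRequest (shown above) maps the event to ce- headers and an entity
    Http().singleRequest(event.toHttpRequest(sink))
      .foreach(response => println(s"Sink responded with ${response.status}"))
  }
}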
Now that we have implementations of both the cloud events sender and receiver, let’s take a look at the different usage patterns for Knative Eventing: source to sink, channels and subscriptions, and brokers and triggers.
Source to sink is the simplest case: the source sends a message directly to a sink, and there is no queuing or filtering. It is a one-to-one relationship with no delivery guarantees at all.
There are two options for connecting a source to a sink: direct binding and using SinkBinding.
In order to directly connect producers to consumers, we first start a consumer Knative service using the following yaml file, and then start the producer using this yaml file. Here the producer is created as a ContainerSource, which allows us to add a sink pointing to the consumer service:
…………………………………..
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: eventsreciever
This reference generates the K_SINK environment variable, which is used as the address to send events to. When an event source deployment (a standard Kubernetes deployment) is unaware of the event consumer deployment (also a standard Kubernetes deployment), a SinkBinding can link the two together:
apiVersion: sources.knative.dev/v1alpha1
kind: SinkBinding
metadata:
  name: bind-cloud-events
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    selector:
      matchLabels:
        app: cloud-events-source
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: eventsreciever
Note that SinkBinding uses the same sink definition as a Knative event source and works the same way (i.e., it sets the K_SINK environment variable in the event source).
In addition to a custom source (shown above), Knative also provides several pre-built sources, including Sugar Controller, GitHub Source, Apache Camel-K Source, Apache Kafka Source, GCP Sources, Apache CouchDB Source, and VMware Sources and Bindings¹. Usage of these pre-built sources allows for seamless integration with existing Knative event consumers. Here, following this example, we will demonstrate a direct connection between Kafka and an event listener (HTTP). We will use the same event listener as above (deployed using this yaml file). We start by creating the topic used for events and then use it in the Kafka event source, which links the Kafka topic with the deployed service. Note that we are using the same sink as above:
…………………………………..
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: eventsreciever
With everything in place, we can use the approach described here to make sure that everything works correctly.
In this example we were sending Kafka messages “manually”, but it is also possible to use Alpakka Kafka to publish to Kafka programmatically, as sketched below.
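A minimal Alpakka Kafka publisher sketch (the bootstrap address and topic name are assumptions; use the values from your Kafka installation and your KafkaSource definition):

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object KafkaEventsPublisher {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("KafkaEventsPublisher")

    // Producer configuration; the bootstrap address is an assumption
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("my-cluster-kafka-bootstrap.kafka:9092")

    // Publish a few JSON messages; the KafkaSource turns each of them into a CloudEvent
    Source(1 to 10)
      .map(i => new ProducerRecord[String, String]("knative-demo-topic", s"""{"message": "event $i"}"""))
      .runWith(Producer.plainSink(producerSettings))
  }
}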
With the Kafka source you can also use SinkBinding; check out an example here.
A Knative Channel is a custom resource that can persist and forward events to multiple destinations (via subscriptions). There are multiple channel implementations, such as InMemoryChannel and KafkaChannel. In this case, an event producer writes to a channel, and a subscription connects the channel to an event consumer. Each sink service needs its own subscription to a channel. This introduces an intermediary (the channel) between event producers and consumers, which makes deployment simpler and more flexible.
Channel-based delivery decouples event producers from consumers and supports fan-out of the same event to multiple subscribers, with delivery and persistence characteristics depending on the channel implementation.
To implement channel-based delivery, we start by creating a channel. Here we are using the InMemoryChannel, which is “a best effort Channel”; it is probably not appropriate for production, but it is very convenient for development. Next, create an event receiver and a subscription connecting the receiver to the channel (a sketch of the subscription follows the snippet below). Finally, we create an event publisher that publishes events to the channel:
…………………………………...
sink:
  ref:
    apiVersion: messaging.knative.dev/v1beta1
    kind: InMemoryChannel
    name: channel
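The subscription mentioned above might look like the following sketch (the subscription name is an assumption, and the exact API version depends on your Knative release):

apiVersion: messaging.knative.dev/v1beta1
kind: Subscription
metadata:
  name: events-subscription
spec:
  channel:
    apiVersion: messaging.knative.dev/v1beta1
    kind: InMemoryChannel
    name: channel
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: eventsreciever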
If you need a persistent channel, you can use the Apache Kafka channel (example here).
Although channel-based delivery provides higher flexibility than direct source-to-sink connectivity, it still has some pitfalls: every consumer requires its own subscription, there is no way to filter events, and replying to events requires additional wiring.
Many of these shortcomings are alleviated in the broker and trigger delivery models. A broker combines channel, reply, and filter functionality into a single resource, while a trigger provides a mechanism for declarative filtering of all events.
A broker is a Knative custom resource that is composed of at least two distinct objects, an ingress and a filter. Events are sent to the broker ingress, and the filter strips all metadata from the event data that is not part of the CloudEvent. Brokers typically use Knative Channels to deliver the events.
Under the covers, a trigger creates subscriptions. This means any service can reply back to an incoming event: a reply event is routed back through the broker, and services interested in that type of event receive it. In addition, triggers allow filtering on specific events based on their CloudEvent attributes. But triggers have some limitations. Although they allow filtering on multiple attributes, these attributes are always ANDed; we cannot define a trigger that filters on an OR of several attributes (multiple triggers are required for this). Additionally, a trigger can only define a single subscriber (service).
In order to use this interaction style, first create a broker using this yaml file. Now you can start the event consumer (a simple Kubernetes deployment with a service in this case) and the event producer (a container source). Note that the sink references the broker we just created:
………………………………………..
sink:
  ref:
    apiVersion: eventing.knative.dev/v1
    kind: Broker
    name: default
Finally, a trigger specifies event filtering information and connectivity to an event consumer:
…………………………….
spec:
  broker: default
  filter:
    attributes:
      source: https://com.lightbend.knative.eventing/CloudEventsSender
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: service-consumer
For information on using the Apache Kafka broker, please refer to an example here.
To help you discover all the events existing in your system, Knative introduces a simple event registry, maintaining a catalog of the event types that can be consumed from different brokers. It also introduces a new EventType CRD in order to persist the event type information in the cluster’s data store. The event registry supports two ways of populating data: manual and automatic.
In the case of manual registration, you create an event type yaml file (sketched below) and deploy it to the cluster. Some of the Knative event sources, such as CronJobSource, ApiServerSource, GithubSource, GcpPubSubSource, KafkaSource, and AwsSqsSource, support automatic registration of EventTypes.
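For manual registration, an event type yaml might look like the following sketch (the metadata name, type, and source values are illustrative, reusing the sender attributes from earlier in this post):

apiVersion: eventing.knative.dev/v1beta1
kind: EventType
metadata:
  name: cloud-events-sender-type
  namespace: default
spec:
  type: com.lightbend.knative.eventing.sample
  source: https://com.lightbend.knative.eventing/CloudEventsSender
  broker: default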
Once the registry is populated, the event types can be used for the creation of triggers.
In addition to the basic eventing mechanisms described here, Knative Eventing provides two higher-level eventing constructs: Sequence and Parallel.
Sequence provides a way to define an ordered list of functions that will be invoked. Each function can modify, filter, or create a new kind of event. Under the hood, Sequence creates the necessary Channels and Subscriptions. A definition of Sequence includes the ordered list of steps (subscribers) to invoke, a channelTemplate specifying which channel implementation to use between steps, and an optional reply destination for the result of the last step.
You can check out examples of Sequence usage here.
Visibility into the Sequence execution is provided through its status, which contains the statuses of the underlying channels and subscriptions, along with the address of the Sequence ingress.
Parallel provides a way to define a list of branches, each receiving the same CloudEvent sent to the Parallel ingress channel. Typically, each branch consists of a filter function guarding the execution of the branch. Like Sequence, Parallel creates Channels and Subscriptions under the hood. A definition of Parallel includes the list of branches (each with an optional filter and a subscriber), a channelTemplate, and an optional reply destination.
You can check out examples of Parallel usage here.
Visibility into Parallel execution is provided through its status, which contains the statuses of the underlying channels and subscriptions for each branch, along with the address of the Parallel ingress.
Usage of these high-level constructs allows you to build complex event processing implementations leveraging Knative without the introduction of additional tools.
In this post, I have explored both Knative Serving and Knative Eventing, highlighting many of the provided interaction patterns. I have also shown how to use Scala and Akka to implement a Knative Service, event consumers, and custom event producers.
If you're developing cloud native applications, Akka Serverless can get you going quickly and easily by removing the complexity behind managing distributed state. Register your interest to participate in the Akka Serverless preview—and start building a new class of business applications.
¹ Note that these additional sources are not installed by default and require separate installation. For example, the Kafka source can be installed using the following command:
kubectl apply --filename https://github.com/knative/eventing-contrib/releases/download/v0.17.0/kafka-source.yaml