Knative is an open source project that extends Kubernetes with a set of middleware components that simplify deploying, running, and managing serverless applications. Each component in the Knative project implements a common pattern for solving a difficult task in building and operating such applications.

These components allow developers to use familiar idioms, languages, and frameworks to deploy functions, applications, or container workloads.

While there are many Knative examples for Knative serving and Knative eventing, it is hard to find examples written in Scala. In this blog post (and corresponding code on GitHub), we’ll fill in this gap by providing some Knative examples in Scala.

Install Knative

To install the latest Knative version (0.17), there’s an excellent tutorial on the Knative website. Installation is fairly simple and straightforward. The most important things to get right are:

  • When installing Istio, make sure that you create a cluster-local-gateway. Without this component in place, local Knative service invocations will not work.
  • Configuring DNS with Magic DNS creates Knative Serving with default DNS suffix xip.io. The caveats of using this DNS are:
    • Only the root of the serving is accessible through generated URLs, so url-based HTTP requests do not work.
    • The load balancer is using HTTP2, so make sure that your code is using HTTP2.

See here for the specific steps and commands.

Knative Serving examples

Knative Serving supports deploying and serving serverless applications and functions. It is built on top of Kubernetes and Istio, is easy to get started with, and scales to support advanced scenarios.

Knative Serving components

The main components of Knative Serving are shown below:

The Knative Service represents an instantiation of a single serverless container environment, which manages all other Knative components including:

  • A network address through which a service can be reached
  • Application code
  • Configuration

To capture application code changes, Knative uses revisions, each representing a stateless, autoscaling, point-in-time snapshot of application code and configuration. A configuration keeps track of all known revisions. Routing of HTTP requests to a specific revision (or set of revisions) is defined by a route.

Knative Hello World

In a nutshell, a simple Knative implementation is a web server listening for a service request. Akka HTTP and this project, part of Knative serving samples, provide such an implementation. Unfortunately, this implementation only supports HTTP1, and as a result does not work with the latest version of Knative and Magic DNS. According to Akka HTTP documentation, enabling this code to work with HTTP2 is fairly straightforward - you just need to add the akka-http2-support library to your build and modify the code as follows (for the full implementation refer to GitHub):

val binding = Http().bindAndHandleAsync(
 Route.asyncHandler(serviceRoute),
 host,
 port,
 connectionContext = HttpConnectionContext()) andThen {
 case Success(sb) =>
   log.info("Bound: {}", sb)
 case Failure(t) =>
   log.error(t, "Failed to bind to {}:{}—shutting down", host, port)
   system.terminate()
}

With this change in place, you can use this yaml file to deploy the service. Once the service is deployed, we can see what was created by running the command kubectl describe ksvc httpservice:

Name:         httpservice
Namespace:    default
…………………………...
Status:
  Address:
    URL:  http://httpservice.default.svc.cluster.local
…………………………...
  Latest Created Revision Name:  httpservice-7s2n7
  Latest Ready Revision Name:    httpservice-7s2n7
  Observed Generation:           1
  Traffic:
    Latest Revision:  true
    Percent:          100
    Revision Name:    httpservice-7s2n7
  URL:                http://httpservice.default.35.225.36.19.xip.io
Events:
  Type    Reason   Age   From                Message
  ----    ------   ----  ----                -------
  Normal  Created  39s   service-controller  Created Configuration "httpservice"
  Normal  Created  39s   service-controller  Created Route "httpservice"

Here we can see that service creation leads to assigning both local (http://httpservice.default.svc.cluster.local) and remote (http://httpservice.default.35.225.36.19.xip.io) addresses along with configuration (httpservice), revision (httpservice-7s2n7) and route (httpservice). It also specifies that 100% of the traffic gets routed to the latest (only one in this case) revision. By running the kubectl get deployments command, we can also see a deployment created for this revision:

NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
httpservice-7s2n7-deployment   0/0     0            0           37m

Note that this deployment does not have any pods associated with it at the moment. This is because a Knative Service is autoscaled automatically, and there is currently no traffic to the service, so Knative autoscaling has scaled it down to zero. When we send a request (using this client), the autoscaler creates pod(s) to process it. The Autoscaler component watches traffic flow to the application, and scales replicas up or down based on configured metrics.

gRPC services with Knative

In addition to supporting HTTP services, Knative also supports gRPC. When working in Scala, there are several ways to implement gRPC services. In this section we will look at two popular options: ScalaPB gRPC and Akka gRPC.

Regardless of the approach we start from the service protobuf definition. For this implementation we’ll use a simple “Hello World” definition. Once this is done, we can proceed with the service implementation.

Implementing gRPC service with ScalaPB

This implementation is fairly straightforward. It follows the ScalaPB documentation and contains two main classes:

  • GreeterImpl, which implements the service defined in the service definition.
  • HelloWorldServer, which manages the lifecycle of the gRPC server. It is essentially standard code (see here for additional details).

Now we can use this yaml file (very similar to this one) to deploy the service. Once the service is deployed, we can use GRPCClient (see here for implementation details) to invoke it.

Implementing gRPC service with Akka gRPC

The implementation follows this documentation and contains two main classes:

  • GreeterServiceImpl, which implements the service defined in the service definition.
  • GreeterServer, which manages the lifecycle of the gRPC server. It is essentially standard code (see here for additional details).

The yaml file (similar to this one) can be used to deploy the service. You can use this GRPCClient to access the service. The client implementation follows this documentation with one caveat: by default, the Akka gRPC client assumes a secured (HTTPS) endpoint, while in our case, the gRPC service assumes non-secure access. In order to enable non-secure access, it is necessary to explicitly disable TLS in the gRPC client setting as follows:

val clientSettings = GrpcClientSettings.connectToServiceAt(host, port).withTls(false)

Controlling traffic between Knative Service revisions

In the Knative Service examples we have looked at so far, we had a single revision of the service. Now that we have two implementations of the same gRPC service, let’s experiment with a service containing several revisions and look at how to control traffic.

Let’s start by creating a new service using the following yaml file. Note that here we have added metadata to the spec:

……………………...
spec:
 template:
   metadata:
     name: grpcversioned-grpc-example-first
   spec:
……………………….

Adding this metadata allows us to explicitly name the revision (in the previous examples, revision names were automatically generated).

Now let’s add a second revision to the created service by applying the following yaml file. When a new revision is added, all traffic is redirected to it. If we do not want this to happen - for example, we want to deploy a new revision but continue to serve requests using the previous revision - we can use this yaml file. Here we added a traffic section, which allows us to explicitly control how traffic is delivered:

………………………………….
traffic:
- tag: current
 revisionName: grpcversioned-grpc-example-first
 percent: 100
- tag: latest
 latestRevision: true
 percent: 0

Finally, by using this yaml file, we can split traffic between revisions (in this case evenly). This ability to control traffic distribution across multiple revisions is the foundation for different deployment strategies, including canary and blue-green deployments.
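To make the percentage semantics concrete, here is a minimal Scala sketch of a traffic block as data. It is only a model for reasoning about the yaml above, not how Knative's networking layer routes requests. The names TrafficSplit, Target, isValid and pick are hypothetical, and the second revision name is an assumption made for illustration.

```scala
// Illustrative model of a Knative traffic block; all names here are hypothetical.
object TrafficSplit {
  final case class Target(revisionName: String, percent: Int)

  // A traffic block is only meaningful if the percentages are non-negative and add up to 100.
  def isValid(targets: Seq[Target]): Boolean =
    targets.forall(_.percent >= 0) && targets.map(_.percent).sum == 100

  // Picks the revision for a roll in [0, 100) by walking the cumulative percentages.
  def pick(targets: Seq[Target], roll: Int): Option[String] = {
    require(roll >= 0 && roll < 100, "roll must be in [0, 100)")
    val cumulative = targets.scanLeft(0)(_ + _.percent).tail
    targets.zip(cumulative).collectFirst {
      case (target, upper) if roll < upper => target.revisionName
    }
  }
}
```

With an even split, rolls 0 to 49 select the first target and rolls 50 to 99 select the second. With a 100/0 split, the second target is never selected, which matches the "deploy but do not serve" configuration shown earlier.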

Knative Eventing examples

Knative Eventing allows passing events based on the CloudEvents specification from a producer to a consumer. An event consumer can be implemented as any type of code running on Kubernetes, for example, a “classic” Kubernetes deployment and service or a Knative Service.

Event producers can be:

  • Custom code
  • A Kafka message broker
  • A GitHub repository
  • A Kubernetes API server emitting cluster events
  • and many more

Additionally, Knative Eventing provides composable primitives to enable late-binding event sources and event consumers.

CloudEvents

CloudEvents is a vendor-neutral specification that defines the format of event data as a data record expressing an event occurrence and its context.

As described in the CloudEvents Primer, the specification defines interoperability of event systems, allowing services to produce and consume events while producers and consumers are developed and deployed independently. The specification focuses on event formats, not on how they appear on the wire in specific protocols.

At its core, CloudEvents defines a set of metadata about events transferred between systems - the minimal set of attributes needed to route the request to the proper component and to facilitate proper processing of the event by that component:

  • Id - identifies the event, for example: "A234-1234-1234".
  • Source - identifies the context in which an event happened, for example: "https://github.com/cloudevents", "mailto:cncf-wg-serverless@lists.cncf.io", "urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66", "cloudevents/spec/pull/123".
  • Specversion - identifies the version of the CloudEvents specification which the event uses, for example: "1.x-wip".
  • Type - describes the type of event related to the originating occurrence, for example: "com.github.pull_request.opened", "com.example.object.deleted.v2".
  • Datacontenttype - defines the content type of the data value which must adhere to the RFC2046 format, for example: "text/xml", "application/json", "image/png".
  • Dataschema - identifies the schema that data adheres to, for example: "$ref": "#/definitions/dataschemadef".
  • Subject - describes the subject of the event in the context of the event producer (identified by source), for example: "mynewfile.jpg".
  • Time - a timestamp of when the occurrence happened which must adhere to RFC 3339, for example: "2018-04-05T17:31:00Z".
  • Data - contains the event payload, for example: "".
  • Data_base64 - contains the base64 encoded event payload, which must adhere to RFC4648, for example: "Zm9vYg==".
  • Extensions - add a key/value map to the event content, where key is a string and value is any object. Extension attributes to the CloudEvent specification are meant to be additional metadata that needs to be included to help ensure proper routing and processing of the CloudEvent.

Here the attributes id, source, specversion and type are required, and the rest are optional.
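As a small illustration of the required/optional split, the following sketch checks a map of attribute names to values for the four required attributes. The object and method names are hypothetical and not part of the post's code or of any CloudEvents SDK.

```scala
// Hypothetical helper: reports which required CloudEvent attributes are absent or empty.
object RequiredAttributes {
  val Required: Seq[String] = Seq("id", "source", "specversion", "type")

  // Returns the names of required attributes that are missing or have an empty value.
  def missing(attributes: Map[String, String]): Seq[String] =
    Required.filter(name => attributes.get(name).forall(_.isEmpty))
}
```

An event is well formed in this simplified sense when missing returns an empty sequence. Optional attributes such as subject or time are not inspected.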

The event format encodings - for example, JSON and Avro - define how an event model is encoded for mapping it to header and payload elements of an application protocol.

The protocol bindings - for example HTTP, Kafka, and MQTT - define how the CloudEvent is bound to an application protocol's transport frame.

In the case of the HTTP protocol (the one that we are using), all CloudEvent attributes (except for data and data_base64), including extensions, are individually mapped to and from distinct HTTP message headers with the same name as the attribute name but prefixed with ce-.
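The header mapping described above can be sketched with plain Scala maps. This is a simplified illustration that treats every attribute value as a string. CeHeaders and its two functions are hypothetical names, not part of the post's code or of a CloudEvents SDK.

```scala
// Hypothetical helper, for illustration only: maps CloudEvent attributes to
// "ce-"-prefixed HTTP header names and back. data and data_base64 are excluded
// because they travel in the HTTP body rather than in headers.
object CeHeaders {
  private val Prefix = "ce-"
  private val BodyAttributes = Set("data", "data_base64")

  // Attribute map -> header map
  def toHeaders(attributes: Map[String, String]): Map[String, String] =
    attributes.collect {
      case (name, value) if !BodyAttributes.contains(name) => (Prefix + name) -> value
    }

  // Header map -> attribute map; headers without the prefix are ignored.
  def fromHeaders(headers: Map[String, String]): Map[String, String] =
    headers.collect {
      case (name, value) if name.toLowerCase.startsWith(Prefix) =>
        name.toLowerCase.stripPrefix(Prefix) -> value
    }
}
```

Header names are lowercased on the way back because HTTP header names are case-insensitive, so a sender may legitimately use Ce-Id instead of ce-id.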

For our implementation, we have defined CloudEvents as the following case class:

import java.net.URI
import java.time.ZonedDateTime
import java.util.Base64

// Fields are vars because the receiver fills them in header by header
case class CloudEvent(var id: String, var source: URI, var specversion: String, var `type`: String,
                      var datacontenttype: Option[String], var dataschema: Option[URI],
                      var subject: Option[String], var time: Option[ZonedDateTime],
                      var data: Option[String], var data_base64: Option[Array[Byte]],
                      var extensions: Option[Map[String, Any]]) {

  override def toString: String = {
    val builder = new StringBuilder()
    // Required attributes
    builder.append("CloudEvent{").append(s"id=$id,").append(s" source=$source,")
      .append(s" specversion=$specversion,").append(s" type=${`type`},")
    // Optional attributes are printed only when present
    datacontenttype.foreach(d => builder.append(s" datacontenttype = $d,"))
    dataschema.foreach(d => builder.append(s" dataschema = ${d.toString},"))
    subject.foreach(s => builder.append(s" subject=$s,"))
    time.foreach(t => builder.append(s" time=$t,"))
    data.foreach(d => builder.append(s" data=$d,"))
    // Binary payloads get their own label and are rendered as base64 text
    data_base64.foreach(d => builder.append(s" data_base64=${Base64.getEncoder.encodeToString(d)},"))
    extensions.foreach(e => builder.append(s" extensions=$e"))
    builder.append("}")
    builder.toString()
  }
…

Here, in addition to the content, we provide a toString method for printing of the cloud events.

Implementing event producers and consumers

In the case of the HTTP event binding that we are implementing, an event consumer has to:

  • Be addressable - able to receive and acknowledge an event delivered over HTTP to a well-defined address. Both Knative services and general Kubernetes services are addressable.
  • Be callable - able to receive an event delivered over HTTP and transform the event, returning 0 or 1 new events in the HTTP response. These returned events may be further processed in the same way that events from an external event source are processed.

As a result, Akka HTTP is a natural choice for implementing an event consumer. A CloudEventProcessing trait provides all the functionality for receiving cloud events:

import java.net.URI
import java.time.ZonedDateTime

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.{Directives, Route}

trait CloudEventProcessing extends Directives {

  def route(eventpath: String = ""): Route =
    path(eventpath) {
      post {
        extractRequest { request =>
          // Log all incoming headers
          request.headers.foreach { header =>
            println(s"Header ${header.name()} - ${header.value()}")
          }
          entity(as[Array[Byte]]) { payload =>
            val event = CloudEvent("", null, "", "", None, None, None, None, None, None, None)
            var extensions: Map[String, Any] = Map()
            request.headers.foreach { header =>
              // Header names are case-insensitive, so match on the lowercase name
              header.lowercaseName match {
                // Attributes
                case "ce-id" => event.id = header.value()
                case "ce-source" => event.source = URI.create(header.value())
                case "ce-specversion" => event.specversion = header.value()
                case "ce-type" => event.`type` = header.value()
                case "ce-dataschema" => event.dataschema = Some(URI.create(header.value()))
                case "ce-subject" => event.subject = Some(header.value())
                case "ce-time" => event.time = Some(ZonedDateTime.parse(header.value()))
                // Data: remember the content type and store the payload as text or bytes
                case "ce-datacontenttype" =>
                  event.datacontenttype = Some(header.value())
                  if (header.value().contains("json") || header.value().contains("javascript") || header.value().contains("text"))
                    event.data = Some(new String(payload))
                  else
                    event.data_base64 = Some(payload)
                // Extensions, sent as ce-<name>extension
                case name if name.startsWith("ce-") && name.contains("extension") =>
                  val nend = name.indexOf("extension")
                  val exname = name.substring(3, nend)
                  extensions = extensions + (exname -> header.value())
                case _ =>
              }
            }
            if (extensions.nonEmpty)
              event.extensions = Some(extensions)
            // We did not get a content type, so default to treating the payload as JSON text
            if (event.datacontenttype.isEmpty)
              event.data = Some(new String(payload))
            processEvent(event)
            complete(StatusCodes.OK)
          }
        }
      }
    }

  def processEvent(event: CloudEvent): Unit
}

The most important method in this trait is route, which implements the processing of an incoming request. It processes all received headers to build a cloud event out of them. This trait is used by the CloudEventsReciever class, which implements an HTTP receiver based on the Akka HTTP server.

A custom event producer is an application that sends events. The only requirement for this implementation is to use the K_SINK environment variable to determine where to send events (see below). This can easily be done with the Typesafe Config library. To simplify the producer implementation, I have added the toHttpRequest method to the CloudEvent class:

def toHttpRequest(uri: String): HttpRequest = {
 var headers: scala.collection.immutable.Seq[HttpHeader] = scala.collection.immutable.Seq(
   // Mandatory fields
   RawHeader("ce-id", id),
   RawHeader("ce-source", source.toString),
   RawHeader("ce-specversion", specversion),
   RawHeader("ce-type", `type`))
 // Optional fields
 datacontenttype match {
   case Some(c) => headers = headers :+ RawHeader("ce-datacontenttype", c)
   case _ => headers = headers :+ RawHeader("ce-datacontenttype", "application/json")
 }

 dataschema match {
   case Some(d) => headers = headers :+ RawHeader("ce-dataschema", d.toString)
   case _ =>
 }
 subject match {
   case Some(s) => headers = headers :+ RawHeader("ce-subject", s)
   case _ =>
 }
 time match {
   case Some(t) => headers = headers :+ RawHeader("ce-time", t.toString)
   case _ =>
 }
 extensions match {
   case Some(e) =>
     for ((key, value) <- e)
       headers = headers :+ RawHeader(s"ce-${key}extension", value.toString)
   case _ =>
 }

 // Entity
 val entity = datacontenttype match {
   case Some(dtype) =>
     if (dtype.contains("json") || dtype.contains("javascript") || dtype.contains("text")) {
       HttpEntity(ContentTypes.`application/json`, data.getOrElse(""))
     } else {
       HttpEntity(ContentTypes.`application/octet-stream`, data_base64.getOrElse(Array[Byte]())
       )
     }
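   // No content type set: the header above defaults to JSON, so send the text payload
   case _ => HttpEntity(ContentTypes.`application/json`, data.getOrElse(""))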
 }
 HttpRequest(
   method = HttpMethods.POST,
   uri = uri,
   entity = entity,
   headers = headers
 )
}

This method builds headers and data entities based on the content of the cloud event. With this method in place the implementation of CloudEventsSender is fairly straightforward.
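For the K_SINK lookup mentioned earlier, a minimal sketch using only the Scala standard library (instead of the Typesafe Config library the post uses) might look as follows. The SinkAddress object and the localhost fallback are assumptions made for illustration, not part of the post's code.

```scala
// Hypothetical helper: resolves the address to send events to from K_SINK.
object SinkAddress {
  // Assumed fallback for local runs, when Knative has not injected K_SINK.
  val DefaultSink = "http://localhost:8080"

  // env defaults to the process environment; passing a map makes the lookup easy to test.
  def resolve(env: Map[String, String] = sys.env): String =
    env.get("K_SINK").map(_.trim).filter(_.nonEmpty).getOrElse(DefaultSink)
}
```

A sender could then pass the result of SinkAddress.resolve() as the uri argument of toHttpRequest.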

Events delivery methods

Now that we have an implementation of both the cloud events sender and receiver, let’s take a look at different usage patterns for Knative Eventing:

  • Source to Sink
  • Channel and Subscription
  • Brokers and triggers

Source to Sink

In this case, the source sends a message directly to a sink, and there is no queuing or filtering. It is a one-to-one relationship with no delivery guarantees at all:

There are two options to connect source to sink - direct binding and using sinkbinding.

In order to directly connect producers to consumers, we first start a consumer Knative service using the following yaml file, and then start the producer using this yaml file. Here the producer is created as a ContainerSource, which allows us to add a sink that points to the consumer service:

…………………………………..
sink:
 ref:
   apiVersion: serving.knative.dev/v1
   kind: Service
   name: eventsreciever

This reference generates a K_SINK environment variable that is used as the address to send events to. Alternatively, when the event source deployment (a standard Kubernetes deployment) is unaware of the event consumer deployment (also a standard Kubernetes deployment), a SinkBinding links the two together:

apiVersion: sources.knative.dev/v1alpha1
kind: SinkBinding
metadata:
 name: bind-cloud-events
spec:
 subject:
   apiVersion: apps/v1
   kind: Deployment
   selector:
     matchLabels:
       app: cloud-events-source
 sink:
   ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: eventsreciever

Note that the sink binding uses the same sink definition as a Knative event source and works the same way (i.e., it sets the K_SINK environment variable in the event source).

Using Kafka as a source

In addition to a custom source (shown above), Knative also provides several pre-built sources including Sugar Controller, GitHub Source, Apache Camel-K Source, Apache Kafka Source, GCP Sources, Apache CouchDB Source, and VMware Sources and Bindings [1]. Usage of these pre-built sources allows for a seamless integration with existing Knative event consumers. Here, following this example, we will demonstrate a direct connection between Kafka and an event listener (HTTP). We will use the same event listener as above (deployed using this yaml file). We will start by creating the topic used for events and then using it in the Kafka event source, which links the Kafka topic with the deployed service. Note that we are using the same sink as above:

…………………………………..
sink:
 ref:
   apiVersion: serving.knative.dev/v1
   kind: Service
   name: eventsreciever

With everything in place, we can use an approach as described here to make sure that everything works correctly.

In this example we were sending Kafka messages “manually”, but it is also possible to use Alpakka Kafka to publish to Kafka programmatically.

With Kafka source you can also use sinkbinding. Check out an example here.

Channel and Subscription

A Knative Channel is a custom resource that can persist and forward events to multiple destinations (via subscriptions). There are multiple channel implementations, such as InMemoryChannel and KafkaChannel. In this case an event producer writes to a channel and a subscription connects a channel to an event consumer. Each sink service needs its own subscription to a channel. This introduces an intermediary (channel) between event producers and consumers, which makes deployment simpler and more flexible.

Some of the characteristics of channel-based delivery are:

  • Persistence: Available only when using a persistent channel, for example Kafka.
  • No Ordering Guarantee: There is nothing enforcing an ordering, so two messages that arrive at the same time may go to subscribers in any order. Different downstream subscribers may see different orders.
  • No Redelivery Attempts: When a subscriber rejects a message, there are no attempts to retry sending it.
  • Dead Letter Sink: When a subscriber rejects a message, this message is sent to the dead letter sink (if present), otherwise it is dropped.
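The delivery characteristics listed above can be modeled in a few lines of Scala. This is a toy, in-process model of fan-out with an optional dead letter sink, not the Knative channel implementation. ChannelSketch, Subscriber and deliver are hypothetical names.

```scala
// Toy model of channel-based delivery; all names are hypothetical.
object ChannelSketch {
  type Event = String
  // A subscriber returns true when it accepts the event and false when it rejects it.
  type Subscriber = Event => Boolean

  // Delivers the event to every subscriber exactly once (no redelivery attempts).
  // A rejected event goes to the dead letter sink when one is configured and is
  // dropped otherwise. Returns the number of rejections.
  def deliver(event: Event,
              subscribers: Seq[Subscriber],
              deadLetterSink: Option[Event => Unit]): Int = {
    val rejected = subscribers.count(subscriber => !subscriber(event))
    deadLetterSink.foreach(sink => (1 to rejected).foreach(_ => sink(event)))
    rejected
  }
}
```

Each subscriber is modeled as one subscription on the channel, which mirrors the rule that every sink service needs its own subscription.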

To implement channel-based delivery, we will start by creating a channel. Here we are using the InMemoryChannel, which is “a best effort Channel.” InMemoryChannel is probably not appropriate for production, but it is very convenient for development. Now you can create an event receiver and a subscription connecting the receiver to the channel. Finally, we create an event publisher that publishes events to the channel:

…………………………………...
sink:
 ref:
   apiVersion: messaging.knative.dev/v1beta1
   kind: InMemoryChannel
   name: channel

If you need a persistent channel, you can use the Apache Kafka channel (example here).

Brokers and triggers

Although channel-based delivery provides greater flexibility than direct source-to-sink connectivity, it still has some pitfalls:

  • It is hard to maintain multiple channels, subscriptions and replies.
  • It has no concept of filtering, so services have to filter all messages themselves.

Many of these shortcomings are alleviated in the broker and trigger delivery models. A broker combines channel, reply, and filter functionality into a single resource, while a trigger provides a mechanism for declarative filtering of all events:

A broker is a Knative custom resource that is composed of at least two distinct objects, an ingress and a filter. Events are sent to the broker ingress, and the filter strips all metadata from the event data that is not part of the CloudEvent. Brokers typically use Knative Channels to deliver the events.

Under the covers, a trigger creates subscriptions. This means any service can reply to an incoming event. A reply event is routed back through the broker, and services interested in that type of event receive it. In addition, a trigger allows filtering of events based on their CloudEvent attributes. But triggers have some limitations. Although they allow filtering on multiple attributes, these attributes are always ANDed. We cannot define a trigger that filters on an OR of several attributes (multiple triggers are required for this). Additionally, a trigger can only define a single subscriber (service).
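The AND-only filtering rule can be illustrated with a short sketch. It is a simplified model that assumes exact string matching on attribute values and does not reproduce every detail of Knative's filter handling. TriggerFilter, matches and matchesAny are hypothetical names.

```scala
// Simplified model of trigger attribute filtering; all names are hypothetical.
object TriggerFilter {
  // An event passes the filter only if every filter attribute matches exactly (AND semantics).
  def matches(filter: Map[String, String], eventAttributes: Map[String, String]): Boolean =
    filter.forall { case (name, expected) => eventAttributes.get(name).contains(expected) }

  // OR semantics require several triggers: the event is delivered if any one of them matches.
  def matchesAny(filters: Seq[Map[String, String]], eventAttributes: Map[String, String]): Boolean =
    filters.exists(matches(_, eventAttributes))
}
```

Here each map in filters stands for the filter of one trigger, so an OR over two event types becomes two triggers pointing at the same subscriber.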

In order to use this interaction style, first create a broker using this yaml file. Now you can start the event consumer (a simple Kubernetes deployment with a service in this case) and the event producer (a container source). Note that the sink references the broker we just created:

………………………………………..
sink:
 ref:
   apiVersion: eventing.knative.dev/v1
   kind: Broker
   name: default

Finally, a trigger specifies event filtering information and connectivity to an event consumer.

…………………………….
spec:
 broker: default
 filter:
   attributes:
     source: https://com.lightbend.knative.eventing/CloudEventsSender
 subscriber:
   ref:
     apiVersion: v1
     kind: Service
     name: service-consumer

For information on using the Apache Kafka broker, please refer to an example here.

Events Registry

To help you discover all the events existing in your system, Knative introduces a simple Events registry, maintaining a catalog of the event types that can be consumed from different brokers. It also introduces a new EventType CRD in order to persist the event type’s information in the cluster’s data store. An Events registry supports two ways of populating data: manual and automatic.

In the case of manual registration, you create an event type yaml and deploy it to the cluster. Some of the Knative event sources, such as CronJobSource, ApiServerSource, GithubSource, GcpPubSubSource, KafkaSource and AwsSqsSource, support automatic registration of EventTypes.

Once the registry is populated, the event types can be used for the creation of triggers.

High-level eventing constructs

In addition to basic eventing mechanisms, described here, Knative eventing provides two high-level eventing constructs: Sequence and Parallel.

Sequence provides a way to define a list of (ordered) functions that will be invoked. Each function can modify, filter or create a new kind of an event. Sequence also creates Channels and Subscriptions under the hood. A definition of Sequence includes:

  • Steps - an ordered list of subscribers (addressable steps).
  • ChannelTemplate defining channels created between the steps.
  • Reply (Optional) referencing the results of the final step destination.

You can check out examples of Sequence usage here.
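Conceptually, a Sequence behaves like a chain of functions in which each step can pass on a (possibly modified) event or filter it out. The sketch below models that idea in process with plain Scala. It ignores the channels and subscriptions Knative creates between steps, and SequenceSketch, Step and run are hypothetical names.

```scala
// In-process model of a Sequence; all names are hypothetical.
object SequenceSketch {
  type Event = String
  // A step may transform the event (Some) or filter it out (None).
  type Step = Event => Option[Event]

  // Runs the steps in order and stops passing the event on once a step returns None.
  // The result corresponds to what would be sent to the optional reply destination.
  def run(steps: Seq[Step], event: Event): Option[Event] =
    steps.foldLeft(Option(event))((current, step) => current.flatMap(step))
}
```

For example, a step that uppercases the payload followed by a step that appends a suffix yields one transformed event, while a filtering step that returns None ends the chain with no output.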

Visibility of the Sequence execution is provided through status containing:

  • ChannelStatuses showing the status of underlying channels created as part of this Sequence.
  • SubscriptionStatuses showing the status of underlying subscriptions created as part of this sequence.
  • AddressStatus showing the status of steps executed.

Parallel provides a way to define a list of branches, each receiving the same CloudEvent sent to the Parallel ingress channel. Typically, each branch consists of a filter function guarding the execution of the branch. Parallel creates Channels and Subscriptions under the hood. A definition of Parallel includes:

  • Branches defining the list of filter and subscriber (addressable steps) pairs, one per branch, and optionally a reply object.
  • ChannelTemplate defining channels created between the steps.
  • Reply (Optional) referencing the results of each branch destination.

You can check out examples of Parallel usage here.
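A Parallel can be pictured as a list of filter and subscriber pairs that all see the same incoming event. The following in-process sketch models only that branching logic and leaves out the underlying channels, subscriptions and reply routing. ParallelSketch, Branch and run are hypothetical names.

```scala
// In-process model of a Parallel; all names are hypothetical.
object ParallelSketch {
  type Event = String
  // The filter guards the branch; the subscriber runs only when the filter passes.
  final case class Branch(filter: Event => Boolean, subscriber: Event => Event)

  // Every branch receives the same event; the results of the branches whose
  // filter passed are returned in branch order.
  def run(branches: Seq[Branch], event: Event): Seq[Event] =
    branches.collect { case branch if branch.filter(event) => branch.subscriber(event) }
}
```

Unlike a Sequence, the branches do not feed into each other: each result is independent and would go to that branch's reply destination (or the shared one) in a real deployment.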

Visibility of Parallel execution is provided through status containing:

  • ingressChannelStatus and branchesStatuses showing the status of underlying Channel and Subscription resources created as part of this Parallel.
  • AddressStatus showing the status of steps executed.

Usage of these high-level constructs allows you to build complex event processing implementations leveraging Knative without the introduction of additional tools.

Conclusion

In this post, I have explored both Knative serving and eventing, highlighting many provided interaction patterns. I have also shown how to use Scala and Akka for implementation of Knative Service, event consumers and custom event producers.

If you're developing cloud native applications, Akka Serverless can get you going quickly and easily by removing the complexity behind managing distributed state. Register your interest to participate in the Akka Serverless preview—and start building a new class of business applications.


[1] Note that these additional sources are not installed by default and require separate installation. For example, the Kafka source can be installed using the following command:
kubectl apply --filename https://github.com/knative/eventing-contrib/releases/download/v0.17.0/kafka-source.yaml
