New Name - Cloudflow is now Akka Data Pipelines

 

Does your organization need to enhance developer productivity when building analytic processes and pipelines? Contact us and explore how our experts can help.

Today’s customers expect to receive information in real time. In order to meet these expectations, many companies are now moving their data systems from batch to stream processing. In this post, we’ll discuss the role of model serving in building a real-time streaming application and reveal how to implement and deploy machine learning (ML) models with open-source frameworks Cloudflow and Seldon.

The Github project for this post can be found and cloned here.

Introduction

Examples of stream processing applications gaining popularity in the industry today include, but are not limited to:

  • Fraud detection: Correlate payment information with other historical data or known patterns to detect fraud before it happens. This typically needs very fast processing as you must decline a transaction before it is processing.
  • Cross-selling: Evaluate customer purchasing history to make context-specific, personal, customized offers or discounts before the customer leaves the store.
  • Predictive maintenance: Evaluate current operations to predict failure before it happens. This allows replacing parts before they break.

The key in all of these use cases is that you process data while it is in motion. You need to handle the event while it is occuring, not several hours after something has happened. Although such mission-critical, real-time applications have been built for years, usage of ML allows:

  • Building new innovative applications that are differentiated from competitors.
  • Applying ML to more “traditional scenarios” like fraud detection, cross selling, or predictive maintenance to enhance existing business processes and make better data-driven decisions.

The introduction of ML models also impacts the development cycle of streaming applications by adding model creation and management to the overall process, which includes the following steps:

  • Build: Use ML algorithms to find insights based on the historical data. This step includes data collection, cleansing, preparation, and building models.
  • Validate: Use different techniques to ensure that a created model works in the production requirements.
  • Operate: Deploy the created model in production to process new incoming events in real time.
  • Monitor: Watch the outcomes of the applied model. The two most important monitoring activities here are concept drift monitoring to make sure that the model still behaves correctly and model explainability—figuring out why a certain decision was made by a model to ensure trust in the model’s behavior.
  • Continuous Loop: Improve the model by rebuilding the model based on new data. This can be done on schedule (for example, weekly) or based on the results of concept drift monitoring.

Model Deployment Strategies

Once the model is created, there are a few alternatives to share and update models between data scientists and the teams productizing its usage:

  • Native Model: Directly deploy a model to the stream processing engine. This typically means exporting the model (typically as Python code) and using it directly for stream processing.
  • Model as data: A ML model consists of a given algorithm, weights, and hyper parameters. Weights and parameters are data, not code, while the algorithm of any particular model is a reference to a static library. From this perspective, treating models as data is a more natural fit than creating packaging semantics around weights and hyper parameters. Examples of model as data includes such approaches as:
    • Predictive Model Markup Language (PMML): An older XML standard with several limitations and drawbacks, but supported in some analytics tools.
    • Portable Format for Analytics (PFA): A modern standard, including preprocessing in addition to the model. It is not yet supported out-of-the-box in most analytics tools.
    • TensorFlow model represented as a computational graph of Tensors.
    • H2O Mojo export represents the H2O model in a specialized H2O format.

Once the model is exported, there are two basic approaches to deploy it:

  • Embedded model: In this approach, the model runs directly in the streaming application. This is mostly applicable for models as data approaches model export and is typically based on Java/Scala APIs for a specific export format.
  • External Server: Packaging of the exported model into a specialized server that allows inference of the stream data using HTTP or gRPC. There are two types of such model servers:
    • Basic servers implemented for serving a specific type of the model—for example TF Serving—working for any type of exported TF model.
    • Compound servers allowing you to serve graphs of deployed models (in different formats)—for example Seldon—along with evaluation of concept drift and support for a model’s explainability.

Implementing Stream Processing

There are multiple ways of implementing stream processing, including model serving. Virtually any serving framework or library can be used for such an implementation. An even better approach is to leverage a specialized streaming framework called Cloudflow, which enables you to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes. Cloudflow allows you to build streaming applications as a set of small, composable components wired together with schema-based contracts. This approach can significantly improve reuse and dramatically accelerate streaming application development. Cloudflow supports:

  • Development: By generating a lot of boilerplate code, it allows developers to focus on business logic.
  • Build: It provides all the tooling for going from business logic to a deployable Docker image.
  • Deploy: It provides Kubernetes tooling to deploy your distributed application with a single command.
  • Operate: Additional commercial add-on provides all the tools you need to get insights, observability, and lifecycle management for your distributed streaming application.

Model Serving with Seldon

Seldon Core is a ML Deployment framework that allows data scientists to convert their ML code or artifacts into full-fledged microservices through flexible and extensible inference graphs, which can be scaled to thousands of production models. The core components of Seldon include:

  • Prepackaged model servers: Optimised docker containers for popular libraries such as TensorFlow, XGBoost, and H2O which can load model artifacts/binaries and serve them as Seldon Deployed model microservices.
  • Language wrappers: Tools to enable more custom ML models to be wrapped using a set of CLIs which allow data scientists to convert a Python File or a Java Jar into a fully fledged microservice.
  • Inference graph: With Seldon, it’s possible to containerize multiple model artifacts into separate re-usable inference containers, which can be linked using the Seldon inference graph. This allows Seldon users to build inference pipelines that can consist of models, transformers, combiners, and routers.
  • Standardized API: Every model that is deployed comes with an out-of-the-box API which can be REST or gRPC.
  • Out-of-the-box observability: Each Seldon Deployment comes with monitoring metrics and auditable request logs through a standardised format that allows for consistency of monitoring during the scaling of deployed models.
  • Advanced ML insights: Seldon abstracts complex ML concepts such as Explainers, Outlier Detectors and Adversarial Attack Detectors into infrastructural components that can be extended by the Seldon users to leverage model-specific or reusable techniques.
Figure 1. Seldon inference graph example.

The lifecycle of Seldon begins once your model has been trained (whether it’s offline or online/continously being re-trained). The steps you would carry out when productionizing your ML model with Seldon are:

  1. Design your inference graph:
    • In this step, you will decide what components you want your inference graph to consist of—whether it’s going to be a simple model, or whether you’ll add a model with a set of transformers, and potentially add explainers and outlier detectors.
  2. Containerize your processing steps: You can build each step of your inference graph with either of the two below options:
    • Use a pre-packaged model server to load your model artifacts/binaries and avoid the need to have to build a docker container every time your model changes (you can also build your own pre-packaged model server).
    • Use a language wrapper to containerize a file you can write to expose a high level interface to your model’s logic. This can be used for more complex cases, even use-cases that may require custom operating-system-specific or even external-system dependencies.
  3. Locally test your Seldon implementation:
    • Test your model locally using the seldon-core Python library, running it in docker locally as well as running it in a Kubernetes Development client such as KIND (Kubernetes in Docker) - you can follow this in the “Testing your SeldonDeployment section in the documentation
  4. Choose relevant Seldon Core extensions:
    • You can choose to set up seldon core extensions including Jaeger tracing integration, ELK Request Logging integration, Seldon Core Analytics integration, Istio/Ambassador Ingress integration, or any of the other Seldon Core integrations available.
  5. Deploy to your Kubernetes cluster:
    • In this step, you have to just define your Kubernetes configuration file that will allow you to deploy your model and send requests.
  6. Build a scalable CI pipeline:
    • The Seldon components allow you to integrate it without much complexity into your CI/CD workflows, which ensures that you can use your CI of choice to connect all your preferred model sources into Seldon Core for deployment.

In this example, we will be building a one-node inference graph, using the prepackaged TensorFlow server to containerize our model.

Implementing Streaming Applications

Overall implementation of the model serving in Cloudflow is based on a dynamically controlled stream pattern: 1

TensorFlow Serving Implementation

In the heart of the implementation is the model server abstract class:

abstract class TensorFlowBase(descriptor: ModelDescriptor) extends Serializable {

 protected val startTime = System.currentTimeMillis()

 /**
  * Actual scoring.
  * @param record - Record to serve
  * @return Either error or invocation result.
  */
 def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput]

 /**
  * Cleanup when a model is not used anymore
  */
 def cleanup(): Unit

 /**
  * Score a record with the model
  *
  * @param record - Record to serve
  * @return RecommenderResult, including the result, scoring metadata (possibly
  *  including an error string), and some scoring metadata.
  */
 def score(record: RecommenderRecord): RecommenderResult = {
   val start = System.currentTimeMillis()
   val (errors, modelOutput) = invokeModel(record) match {
     case Left(errors)  ⇒ (errors, RecommenderServingOutput(Seq.empty, Seq.empty))
     case Right(output) ⇒ ("", output)
   }
   val duration = (System.currentTimeMillis() - start)
   val resultMetadata = ModelResultMetadata(errors, descriptor.modelName, descriptor.modelSourceLocation, startTime, duration)
   RecommenderResult(record, modelOutput, resultMetadata)
 }
}

This base class can have different implementations, depending on whether we are using embedded models or a server. Here ModelDescriptor is a generic definition of the model defined as an Avro message2. With this class in place, a specific class—for example, using an internal TF model—can be implemented as follows:

class TensorFlowBundleModel(descriptor: ModelDescriptor, directory: String) extends TensorFlowBase(descriptor) {

 // get tags. We assume here that the first tag is the one we use
 private val tags: Seq[String] = getTags(directory)
 private val bundle = SavedModelBundle.load(directory, tags(0))
 private val graph = bundle.graph
 // get metatagraph and signature
 private val metaGraphDef = MetaGraphDef.parseFrom(bundle.metaGraphDef)
 private val signatureMap = metaGraphDef.getSignatureDefMap.asScala
 //  parse signature, so that we can use definitions (if necessary) programmatically in score method
 private val signatures = parseSignatures(signatureMap)
 // Create TensorFlow session
 private val session = bundle.session

 /**
  * Convert incoming wine record to Tensor.
  */
 private def toTensor(record: RecommenderRecord): Seq[Tensor[_]] = {
   val products = Tensor.create(record.products.map(p ⇒ Array(p.toFloat)).toArray)
   val users = Tensor.create(record.products.map(_ ⇒ Array(record.user.toFloat)).toArray)
   Seq(users, products)
 }

 /**
  * Usage of TensorFlow bundled model for Wine scoring.
  */
 override def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput] = {
   try {
     // Create record tensor
     val modelInput = toTensor(record)
     // Serve model using TensorFlow APIs
     val signature = signatures.head._2
     val tinputs = signature.inputs.map(inp ⇒ inp._2.name).toSeq
     val toutput = signature.outputs.head._2.name
     val result = session.runner
       .feed(tinputs(0), modelInput(0))
       .feed(tinputs(1), modelInput(1))
       .fetch(toutput).run().get(0)
     // process result
     val rshape = result.shape
     val rMatrix = Array.ofDim[Float](rshape(0).asInstanceOf[Int], rshape(1).asInstanceOf[Int])
     result.copyTo(rMatrix)
     val prediction = rMatrix.map(arrayV ⇒ arrayV(0).toDouble)
     val predictions = prediction.zip(record.products).map(r ⇒ (r._2.toString, r._1)).unzip
     Right(RecommenderServingOutput(predictions._1, predictions._2.toSeq))
   } catch {
     case t: Throwable ⇒ Left(t.getMessage)
   }
 }

 /**
  * Cleanup when a model is not used anymore
  */
 override def cleanup(): Unit = {
   try {
     session.close
   } catch {
     case t: Throwable ⇒
       println(s"WARNING: in TensorFlowBundleModel.cleanup(), call to session.close threw $t. Ignoring")
   }
   try {
     graph.close
   } catch {
     case t: Throwable ⇒
       println(s"WARNING: in TensorFlowBundleModel.cleanup(), call to graph.close threw $t. Ignoring")
   }
 }
……………………..
}

Here we are using TensorFlow Java APIs to load the model and use it for inference. The same class using TensorFlow serving Rest APIs can be implemented as follows:

class TensorFlowModelRPCServer(descriptor: ModelDescriptor) extends TensorFlowBase(descriptor) {

 private val gson = new Gson
 private val server = descriptor.modelSourceLocation(0)

 override def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput] = {

   // Build request json
   val products = record.products.map(p ⇒ Array(p.toDouble)).toArray
   val users = record.products.map(_ ⇒ Array(record.user.toDouble)).toArray
   val request = Request("", RequestInputs(products, users))

   try {
     val response = Http(server).postData(gson.toJson(request)).header("content-type", "application/json").asString
     response.code match {
       case code if code == 200 ⇒ // Got successful response
         val prediction = gson.fromJson(response.body,
 						classOf[RecommendationOutputs])
         val predictions = prediction.outputs.map(_(0))
           .zip(record.products).map(r ⇒ (r._2.toString, r._1)).unzip
         Right(RecommenderServingOutput(predictions._1, predictions._2))
       case _ ⇒ // Got error response
         Left(response.body)
     }
   } catch {
     case t: Throwable ⇒
       println(s"Error accessing HTTP server $server")
       t.printStackTrace()
       Left(t.getMessage)
   }
 }

 /**
  * Cleanup when a model is not used anymore. Nothing to do in this case
  */
 override def cleanup(): Unit = {}
}

Finally, for implementing gRPC we followed the steps outlined in this article to pick the required protobuf definitions:

class TensorFlowModelRPCServer(descriptor: ModelDescriptor) extends TensorFlowBase(descriptor) {

 import SeldonTFGRPCExecutor._

 // Headers
 val headers = new Metadata()
 headers.put(SELDON_KEY, "grpc-tfserving")
 headers.put(NAMESPACE_KEY, "seldon")

 // create a stub
 val channelbuilder = ManagedChannelBuilder.forAddress(host, port)
 channelbuilder.usePlaintext()
 val channel = channelbuilder.build()
 val server = PredictionServiceGrpc.blockingStub(channel)
 val serverWithHeaders = MetadataUtils.attachHeaders(server, headers)

 // Model spec
 val model = ModelSpec(name = modelName, signatureName = "serving_default")

 /**
  * Actual scoring.
  *
  * @param record - Record to serve
  * @return Either error or invocation result.
  */
 override def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput] = {
   // Build products and users proto
   val tensorshape = Some(TensorShapeProto(Seq(TensorShapeProto.Dim(record.products.size.toLong), TensorShapeProto.Dim(1l))))
   val productProto = TensorProto(dtype = DataType.DT_FLOAT, tensorShape = tensorshape, floatVal = record.products.map(_.toFloat))
   val userProto = TensorProto(dtype = DataType.DT_FLOAT, tensorShape = tensorshape, floatVal = record.products.map(_ ⇒ record.user.toFloat))

   // Create request
   val request = PredictRequest(modelSpec = Some(model), inputs = Map("products" -> productProto, "users" -> userProto))
   try {
     val response = serverWithHeaders.predict(request)
     //    println(s"Response ${response.toString}")
     val probabilities = response.outputs.get("predictions").get.floatVal
     val predictions = probabilities.map(_.toDouble)
       .zip(record.products).map(r ⇒ (r._2.toString, r._1)).unzip
     Right(RecommenderServingOutput(predictions._1, predictions._2))
   } catch {
     case t: Throwable ⇒ Left(t.getMessage)
   }
 }

 /**
  * Cleanup when a model is not used anymore
  */
 override def cleanup(): Unit = {
   val _ = channel.shutdown()
 }
}

Building a Model Serving Streamlet

Once we have all the executors in place, the implementation of a dynamically controlled streams pattern requires a state that keeps the model (in our case executor). Because an overall implementation is based on Akka Streams, the simplest way to implement such a state is by using Akka Actors.

abstract class BaseModelServingActor(label: String) extends Actor {

 // Log
 val log = Logging(context.system, this)
 log.info(s"Creating ModelServingActor for $label")

 // Current model
 protected var currentModelState: Option[ModelState] = None
 protected var scorer: Option[TensorFlowBase] = None

 // Current execution state
 protected var currentExecutionState: Option[ModelToServeStats] = None

 // Restore state (if exists)
 override def preStart {
	………………………...
 }

 // Recieve message
 override def receive: PartialFunction[Any, Unit] = {
   case modelState: ModelWithState ⇒ // Model update
     // update model
     currentModelState = Some(modelState.state)
     // Cleanup current model
     scorer.map(s ⇒ s.cleanup())
     scorer = createScorer(modelState.descriptor, modelState.state)
     // Persist it
     StatePersistence.saveState(label, currentModelState.get)
     // Update execution state
     currentExecutionState = Some(ModelToServeStats(modelState))
     // Reply
     sender() ! Done

   case record: RecommenderRecord ⇒ // Serve data
     // Check whether we have a scorer
     scorer match {
       case Some(server) ⇒
         val result = server.score(record)
         currentExecutionState.get.incrementUsage(result.modelResultMetadata.duration)
         sender() ! result
       case _ ⇒ // Model server is not defined
         sender() ! RecommenderResult(
           record,
           RecommenderServingOutput(Seq.empty, Seq.empty), ModelResultMetadata("No model server defined"))
     }
   case _: GetState ⇒ // State query
     sender() ! currentExecutionState.getOrElse(ModelToServeStats())
   case unknown ⇒ // Unknown message
     log.error(s"ModelServingActor: Unknown actor message received: $unknown")
 }

 // Abstract methods to be implemented
 def createScorer(modelDescriptor: ModelDescriptor, state: ModelState): Option[TensorFlowBase]
}

With the actor in place, the streamlet implementation looks like the following (we are using an Akka Streams-based implementation):

abstract class ModelProcessingStreamletBase extends AkkaStreamlet {

val dtype = "recommender"
val in0 = AvroInlet[RecommenderRecord]("in-0")
val in1 = AvroInlet[ModelDescriptor]("in-1")
val out = AvroOutlet[RecommenderResult]("out", _.inputRecord.user.toString)
final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

 // Declare the volume mount
 private val persistentDataMount = VolumeMount("persistence-data-mount", "/data", ReadWriteMany)
 override def volumeMounts: Vector[VolumeMount] = Vector(persistentDataMount)

 // Create logic
 override final def createLogic = new RunnableGraphStreamletLogic() {

   implicit val askTimeout: Timeout = Timeout(30.seconds)

   // Model serving actor
   val modelserver: ActorRef = createModelServingActor(system)

def runnableGraph() = {
 // Set persistence
 FilePersistence.setGlobalMountPoint(getMountedPath(persistentDataMount).toString)
 FilePersistence.setStreamletName(streamletRef)

 sourceWithOffsetContext(in1).via(modelFlow).runWith(sinkWithOffsetContext)
 sourceWithOffsetContext(in0).via(dataFlow).to(sinkWithOffsetContext(out))
}

protected def dataFlow =
 FlowWithOffsetContext[RecommenderRecord].mapAsync(1) { record ⇒
   modelserver.ask(record).mapTo[Model.ModelReturn[ModelKeyDoubleValueArrayResult]]
     .map { modelReturn ⇒
       RecommenderResult(record, modelReturn.modelOutput, modelReturn.modelResultMetadata)
     }
 }

protected def modelFlow =
 FlowWithOffsetContext[ModelDescriptor].mapAsync(1) {
   descriptor ⇒ modelserver.ask(descriptor).mapTo[Done]
 }

 def createModelServingActor(system: ActorSystem): ActorRef
 def descriptorToState(descriptor: ModelDescriptor): ModelWithState
 def getModelTopic(): String
}

In addition to this one, we also implemented three additional streamlets: one for generating data load, a second one for generating model updates, and a third for printing the results into the terminal.

Putting This All Together

Once we have all the streamlets in place, we need to define connectivity between them. Cloudflow is leveraging an HOCON configuration format. An example of pipeline definition looks like the following:

blueprint {
 streamlets {
   recommender-data-ingress = ….
   recommender-model-data-ingress = ….
   console-egress = ….
   model-serving = …..
 }
 connections {
   recommender-data-ingress.out = [model-serving.in-0]
   recommender-model-data-ingress.out = [model-serving.in-1]
   model-serving.out = [console-egress.in]
 }
}

This configuration contains two main parts:

  • Streamlets - defining an implementation class for streamlets.
  • Connections - defining how streamlets are connected to each other.

Once the pipeline definition is complete, we can build a complete application and test it locally.

Once it works correctly we can deploy it to the cluster and monitor an execution3:

Deploying and Scaling Model Serving with Seldon

Now that we have a Cloudflow implementation in place, let’s take a look at overall deployment for this implementation. As a minimum, we need Seldon’s model server and a Cloudflow-based stream processor. In reality, as we want to use gRPC to invoke the Seldon model server, we also need to properly load balance gRPC requests. As explained in this document, there are two main approaches to gRPC load balancing:

  1. Client load balancing. In this case the client’s implementation is responsible for all load balancing solutions. One of the drawbacks of this approach is writing and maintaining the load balancing policies in multiple languages and/or versions of the clients.
  2. Proxy load balancing. In this case the implementation of the load balancing is done by proxy, invoked by the client. Proxies typically require more resources to operate since they have temporary copies of the RPC request and response. This model also increases latency to the RPCs.

As described above, Seldon provides Istio/Ambassador Ingress integration, so for our implementation we decided to use Ambassador for load balancing gRPC requests. An additional advantage of using Ambassador here is the ability to update model deployment without any changes to the Cloudlow implementation. The final deployment architecture is shown below:

This deployment includes three layers:

  • Cloudflow-based stream processing invoking model serving for every element of the stream.
  • A Seldon model server doing the actual model inference.
  • Ambassador providing load balancing for inference gRPC requests.

Because every layer in this architecture can scale independently, such an architecture can provide quite a scalable model serving solution.

We did initial measurements of model serving duration from inside the model serving streamlet, which turns out to be around 5-6 ms, on average. Without scaling components, this latency will provide a throughput of roughly 170 - 180 inferences per second. But by appropriately scaling all layers, we managed to achieve 10-fold throughput—or 2,000 messages processing per second.

Conclusion

Different model serving approaches have their strengths and weaknesses. The figure below provides a base comparison:

As the complexity of implementation (adding more components) grows, the performance goes down, while functionality and capabilities increase.

Cloudflow allows you to implement any of these approaches, but you have to decide what is more important: Raw performance or all the additional functionality that you can leverage with Seldon and Ambassador.

If you’d like guidance on how your company can use ML models to deliver scalable insights at speed, please contact us and explore how our experts can help.


1 Note that a dynamically controlled streams pattern is only required in the case embedded model or basic model server, when the application has to track a model or endpoint change. In the case of a compound server—Seldon or KF serving—the URL does not change when the model is updated and the pattern is not required.

2 Remember that Cloudflow is strongly typed and is currently using Avro schemas to define inputs/outputs.

3 Monitoring is currently only available in the enterprise version of Cloudflow.

Share



Comments


Filter by Tag