New Name - Cloudflow is now Akka Data Pipelines
Does your organization need to enhance developer productivity when building analytic processes and pipelines? Contact us and explore how our experts can help.
Today’s customers expect to receive information in real time. In order to meet these expectations, many companies are now moving their data systems from batch to stream processing. In this post, we’ll discuss the role of model serving in building a real-time streaming application and reveal how to implement and deploy machine learning (ML) models with open-source frameworks Cloudflow and Seldon.
The Github project for this post can be found and cloned here.
Examples of stream processing applications gaining popularity in the industry today include, but are not limited to:
The key in all of these use cases is that you process data while it is in motion. You need to handle each event while it is occurring, not several hours after the fact. Although such mission-critical, real-time applications have been built for years, the use of ML allows:
The introduction of ML models also impacts the development cycle of streaming applications by adding model creation and management to the overall process, which includes the following steps:
Once the model is created, there are a few alternatives for sharing and updating models between data scientists and the teams that productize them:
Once the model is exported, there are two basic approaches to deploying it: embedding the model directly in the streaming application, or running it in a separate model server that the application invokes remotely.
There are multiple ways of implementing stream processing, including model serving. Virtually any serving framework or library can be used for such an implementation. An even better approach is to leverage a specialized streaming framework called Cloudflow, which enables you to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes. Cloudflow allows you to build streaming applications as a set of small, composable components wired together with schema-based contracts. This approach can significantly improve reuse and dramatically accelerate streaming application development. Cloudflow supports multiple streaming runtimes for implementing these components, including Akka Streams, Apache Spark, and Apache Flink.
Seldon Core is an ML deployment framework that allows data scientists to convert their ML code or artifacts into full-fledged microservices through flexible and extensible inference graphs, which can be scaled to thousands of production models. The core components of Seldon include:
The lifecycle of Seldon begins once your model has been trained (whether it is trained offline or continuously re-trained online). The steps you would carry out when productionizing your ML model with Seldon are:
In this example, we will be building a one-node inference graph, using the prepackaged TensorFlow server to containerize our model.
The overall implementation of model serving in Cloudflow is based on the dynamically controlled streams pattern1.
At the heart of the implementation is the model server abstract class:
abstract class TensorFlowBase(descriptor: ModelDescriptor) extends Serializable {

  protected val startTime = System.currentTimeMillis()

  /**
   * Actual scoring.
   * @param record - Record to serve
   * @return Either error or invocation result.
   */
  def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput]

  /**
   * Cleanup when a model is not used anymore.
   */
  def cleanup(): Unit

  /**
   * Score a record with the model.
   *
   * @param record - Record to serve
   * @return RecommenderResult, including the scoring result and scoring metadata
   *         (possibly including an error string).
   */
  def score(record: RecommenderRecord): RecommenderResult = {
    val start = System.currentTimeMillis()
    val (errors, modelOutput) = invokeModel(record) match {
      case Left(errors)  ⇒ (errors, RecommenderServingOutput(Seq.empty, Seq.empty))
      case Right(output) ⇒ ("", output)
    }
    val duration = System.currentTimeMillis() - start
    val resultMetadata = ModelResultMetadata(errors, descriptor.modelName, descriptor.modelSourceLocation, startTime, duration)
    RecommenderResult(record, modelOutput, resultMetadata)
  }
}
This base class can have different implementations, depending on whether we are using an embedded model or a model server. Here, ModelDescriptor is a generic definition of the model, defined as an Avro message2. Its schema is not shown in the post, but based on how the descriptor is used, the generated class might look roughly like the hypothetical sketch below (the actual schema in the project may define additional fields):
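// Hypothetical sketch of the case class generated from the ModelDescriptor Avro schema.
// Field names are inferred from how the descriptor is used in this post and may differ
// from the actual schema in the project.
case class ModelDescriptor(
  modelName: String,                 // name of the model, used in result metadata
  modelSourceLocation: Seq[String])  // file path(s) or serving endpoint(s) for the model

With this class in place, a specific class—for example, using an internal TF model—can be implemented as follows: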
class TensorFlowBundleModel(descriptor: ModelDescriptor, directory: String) extends TensorFlowBase(descriptor) {

  // Get tags. We assume here that the first tag is the one we use.
  private val tags: Seq[String] = getTags(directory)
  private val bundle = SavedModelBundle.load(directory, tags(0))
  private val graph = bundle.graph
  // Get the metagraph and signature
  private val metaGraphDef = MetaGraphDef.parseFrom(bundle.metaGraphDef)
  private val signatureMap = metaGraphDef.getSignatureDefMap.asScala
  // Parse the signature, so that we can use its definitions (if necessary) programmatically in the score method
  private val signatures = parseSignatures(signatureMap)
  // Create the TensorFlow session
  private val session = bundle.session

  /**
   * Convert an incoming recommender record to tensors.
   */
  private def toTensor(record: RecommenderRecord): Seq[Tensor[_]] = {
    val products = Tensor.create(record.products.map(p ⇒ Array(p.toFloat)).toArray)
    val users = Tensor.create(record.products.map(_ ⇒ Array(record.user.toFloat)).toArray)
    Seq(users, products)
  }

  /**
   * Usage of the TensorFlow bundled model for recommendation scoring.
   */
  override def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput] = {
    try {
      // Create record tensors
      val modelInput = toTensor(record)
      // Serve the model using TensorFlow APIs
      val signature = signatures.head._2
      val tinputs = signature.inputs.map(inp ⇒ inp._2.name).toSeq
      val toutput = signature.outputs.head._2.name
      val result = session.runner
        .feed(tinputs(0), modelInput(0))
        .feed(tinputs(1), modelInput(1))
        .fetch(toutput).run().get(0)
      // Process the result
      val rshape = result.shape
      val rMatrix = Array.ofDim[Float](rshape(0).asInstanceOf[Int], rshape(1).asInstanceOf[Int])
      result.copyTo(rMatrix)
      val prediction = rMatrix.map(arrayV ⇒ arrayV(0).toDouble)
      val predictions = prediction.zip(record.products).map(r ⇒ (r._2.toString, r._1)).unzip
      Right(RecommenderServingOutput(predictions._1, predictions._2.toSeq))
    } catch {
      case t: Throwable ⇒ Left(t.getMessage)
    }
  }

  /**
   * Cleanup when a model is not used anymore.
   */
  override def cleanup(): Unit = {
    try {
      session.close
    } catch {
      case t: Throwable ⇒
        println(s"WARNING: in TensorFlowBundleModel.cleanup(), call to session.close threw $t. Ignoring")
    }
    try {
      graph.close
    } catch {
      case t: Throwable ⇒
        println(s"WARNING: in TensorFlowBundleModel.cleanup(), call to graph.close threw $t. Ignoring")
    }
  }
  // ...
}
Here we are using the TensorFlow Java APIs to load the model and use it for inference. An equivalent implementation that calls a TensorFlow Serving instance over its REST API looks like this:
class TensorFlowModelRPCServer(descriptor: ModelDescriptor) extends TensorFlowBase(descriptor) {

  private val gson = new Gson
  private val server = descriptor.modelSourceLocation(0)

  override def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput] = {
    // Build the request JSON
    val products = record.products.map(p ⇒ Array(p.toDouble)).toArray
    val users = record.products.map(_ ⇒ Array(record.user.toDouble)).toArray
    val request = Request("", RequestInputs(products, users))
    try {
      val response = Http(server).postData(gson.toJson(request)).header("content-type", "application/json").asString
      response.code match {
        case code if code == 200 ⇒ // Got a successful response
          val prediction = gson.fromJson(response.body, classOf[RecommendationOutputs])
          val predictions = prediction.outputs.map(_(0))
            .zip(record.products).map(r ⇒ (r._2.toString, r._1)).unzip
          Right(RecommenderServingOutput(predictions._1, predictions._2))
        case _ ⇒ // Got an error response
          Left(response.body)
      }
    } catch {
      case t: Throwable ⇒
        println(s"Error accessing HTTP server $server")
        t.printStackTrace()
        Left(t.getMessage)
    }
  }

  /**
   * Cleanup when a model is not used anymore. Nothing to do in this case.
   */
  override def cleanup(): Unit = {}
}
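The Request, RequestInputs, and RecommendationOutputs classes referenced above are simple containers that Gson converts to and from the JSON expected by the TensorFlow Serving REST predict API. A minimal sketch of what they might look like is shown below; the field names follow the TensorFlow Serving request and response format, but the actual classes in the project may be defined differently:

// Hypothetical sketch of the JSON (de)serialization classes used with Gson above.
// Field names mirror the TensorFlow Serving REST predict API; the real project
// classes may differ.
case class RequestInputs(products: Array[Array[Double]], users: Array[Array[Double]])
case class Request(signature_name: String, inputs: RequestInputs)
case class RecommendationOutputs(outputs: Array[Array[Double]])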
Finally, for implementing gRPC we followed the steps outlined in this article to pick the required protobuf definitions:
class TensorFlowModelGRPCServer(descriptor: ModelDescriptor) extends TensorFlowBase(descriptor) {

  import SeldonTFGRPCExecutor._

  // Headers
  val headers = new Metadata()
  headers.put(SELDON_KEY, "grpc-tfserving")
  headers.put(NAMESPACE_KEY, "seldon")
  // Create a stub
  val channelbuilder = ManagedChannelBuilder.forAddress(host, port)
  channelbuilder.usePlaintext()
  val channel = channelbuilder.build()
  val server = PredictionServiceGrpc.blockingStub(channel)
  val serverWithHeaders = MetadataUtils.attachHeaders(server, headers)
  // Model spec
  val model = ModelSpec(name = modelName, signatureName = "serving_default")

  /**
   * Actual scoring.
   *
   * @param record - Record to serve
   * @return Either error or invocation result.
   */
  override def invokeModel(record: RecommenderRecord): Either[String, RecommenderServingOutput] = {
    // Build the products and users protos
    val tensorshape = Some(TensorShapeProto(Seq(TensorShapeProto.Dim(record.products.size.toLong), TensorShapeProto.Dim(1L))))
    val productProto = TensorProto(dtype = DataType.DT_FLOAT, tensorShape = tensorshape, floatVal = record.products.map(_.toFloat))
    val userProto = TensorProto(dtype = DataType.DT_FLOAT, tensorShape = tensorshape, floatVal = record.products.map(_ ⇒ record.user.toFloat))
    // Create the request
    val request = PredictRequest(modelSpec = Some(model), inputs = Map("products" -> productProto, "users" -> userProto))
    try {
      val response = serverWithHeaders.predict(request)
      val probabilities = response.outputs.get("predictions").get.floatVal
      val predictions = probabilities.map(_.toDouble)
        .zip(record.products).map(r ⇒ (r._2.toString, r._1)).unzip
      Right(RecommenderServingOutput(predictions._1, predictions._2))
    } catch {
      case t: Throwable ⇒ Left(t.getMessage)
    }
  }

  /**
   * Cleanup when a model is not used anymore.
   */
  override def cleanup(): Unit = {
    val _ = channel.shutdown()
  }
}
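The values imported from SeldonTFGRPCExecutor (the metadata keys, host, port, and model name) are not shown in the post. A hypothetical sketch of that object is given below; the header key names and connection details are assumptions based on how Seldon and Ambassador route gRPC requests, and the real project may define them differently, typically reading them from configuration:

// Hypothetical sketch of the companion object imported above. The key names, host,
// port, and model name are assumptions; in the real project they would come from
// configuration and from the Seldon deployment being addressed.
object SeldonTFGRPCExecutor {
  val SELDON_KEY: Metadata.Key[String] =
    Metadata.Key.of("seldon", Metadata.ASCII_STRING_MARSHALLER)
  val NAMESPACE_KEY: Metadata.Key[String] =
    Metadata.Key.of("namespace", Metadata.ASCII_STRING_MARSHALLER)
  val host = "ambassador"       // Ambassador service that load balances gRPC to Seldon
  val port = 80
  val modelName = "recommender" // name of the served model
}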
Once we have all the executors in place, the implementation of the dynamically controlled streams pattern requires state that holds the current model (in our case, the executor). Because the overall implementation is based on Akka Streams, the simplest way to implement such state is with Akka actors.
abstract class BaseModelServingActor(label: String) extends Actor {

  // Log
  val log = Logging(context.system, this)
  log.info(s"Creating ModelServingActor for $label")

  // Current model
  protected var currentModelState: Option[ModelState] = None
  protected var scorer: Option[TensorFlowBase] = None
  // Current execution state
  protected var currentExecutionState: Option[ModelToServeStats] = None

  // Restore previously saved state (if it exists)
  override def preStart(): Unit = {
    // ...
  }

  // Receive messages
  override def receive: PartialFunction[Any, Unit] = {
    case modelState: ModelWithState ⇒ // Model update
      // Update the model
      currentModelState = Some(modelState.state)
      // Clean up the current model
      scorer.foreach(_.cleanup())
      scorer = createScorer(modelState.descriptor, modelState.state)
      // Persist it
      StatePersistence.saveState(label, currentModelState.get)
      // Update the execution state
      currentExecutionState = Some(ModelToServeStats(modelState))
      // Reply
      sender() ! Done
    case record: RecommenderRecord ⇒ // Serve data
      // Check whether we have a scorer
      scorer match {
        case Some(server) ⇒
          val result = server.score(record)
          currentExecutionState.get.incrementUsage(result.modelResultMetadata.duration)
          sender() ! result
        case _ ⇒ // Model server is not defined
          sender() ! RecommenderResult(
            record,
            RecommenderServingOutput(Seq.empty, Seq.empty), ModelResultMetadata("No model server defined"))
      }
    case _: GetState ⇒ // State query
      sender() ! currentExecutionState.getOrElse(ModelToServeStats())
    case unknown ⇒ // Unknown message
      log.error(s"ModelServingActor: Unknown actor message received: $unknown")
  }

  // Abstract method to be implemented by concrete actors
  def createScorer(modelDescriptor: ModelDescriptor, state: ModelState): Option[TensorFlowBase]
}
With the actor in place, the streamlet implementation looks like the following (we are using an Akka Streams-based implementation):
abstract class ModelProcessingStreamletBase extends AkkaStreamlet {

  val dtype = "recommender"

  val in0 = AvroInlet[RecommenderRecord]("in-0")
  val in1 = AvroInlet[ModelDescriptor]("in-1")
  val out = AvroOutlet[RecommenderResult]("out", _.inputRecord.user.toString)
  final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  // Declare the volume mount used to persist the current model state
  private val persistentDataMount = VolumeMount("persistence-data-mount", "/data", ReadWriteMany)
  override def volumeMounts: Vector[VolumeMount] = Vector(persistentDataMount)

  // Create logic
  override final def createLogic = new RunnableGraphStreamletLogic() {

    implicit val askTimeout: Timeout = Timeout(30.seconds)

    // Model serving actor
    val modelserver: ActorRef = createModelServingActor(system)

    def runnableGraph() = {
      // Set up persistence
      FilePersistence.setGlobalMountPoint(getMountedPath(persistentDataMount).toString)
      FilePersistence.setStreamletName(streamletRef)
      // Run the model-update stream as a side effect and return the data-scoring graph
      sourceWithOffsetContext(in1).via(modelFlow).runWith(sinkWithOffsetContext)
      sourceWithOffsetContext(in0).via(dataFlow).to(sinkWithOffsetContext(out))
    }

    // Score incoming records by asking the model serving actor
    protected def dataFlow =
      FlowWithOffsetContext[RecommenderRecord].mapAsync(1) { record ⇒
        modelserver.ask(record).mapTo[RecommenderResult]
      }

    // Forward model updates to the model serving actor
    protected def modelFlow =
      FlowWithOffsetContext[ModelDescriptor].mapAsync(1) { descriptor ⇒
        modelserver.ask(descriptorToState(descriptor)).mapTo[Done]
      }
  }

  // Abstract methods implemented by the concrete streamlet
  def createModelServingActor(system: ActorSystem): ActorRef
  def descriptorToState(descriptor: ModelDescriptor): ModelWithState
  def getModelTopic(): String
}
In addition to this streamlet, we also implemented three additional streamlets: one for generating data load, a second for generating model updates, and a third for printing the results to the terminal.
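These additional streamlets are not shown in the post, but as an illustration, a console egress that prints scoring results could look roughly like the sketch below (a hypothetical example only; the streamlet in the project may differ):

// A minimal sketch of a console egress streamlet (not taken from the project):
// it consumes RecommenderResult messages and prints them to the terminal.
class RecommenderConsoleEgress extends AkkaStreamlet {
  val in = AvroInlet[RecommenderResult]("in")
  final override val shape = StreamletShape.withInlets(in)

  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph() =
      sourceWithOffsetContext(in)
        .map { result ⇒ println(result); result }
        .to(sinkWithOffsetContext)
  }
}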
Once we have all the streamlets in place, we need to define the connectivity between them. Cloudflow uses the HOCON configuration format for this. An example pipeline definition (blueprint) looks like the following:
blueprint {
  streamlets {
    recommender-data-ingress = ….
    recommender-model-data-ingress = ….
    console-egress = ….
    model-serving = …..
  }
  connections {
    recommender-data-ingress.out = [model-serving.in-0]
    recommender-model-data-ingress.out = [model-serving.in-1]
    model-serving.out = [console-egress.in]
  }
}
This configuration contains two main parts: the streamlets section, which declares the streamlet instances that make up the application, and the connections section, which wires streamlet outlets to inlets.
Once the pipeline definition is complete, we can build the complete application and test it locally.
Once it works correctly, we can deploy it to the cluster and monitor its execution3:
Now that we have a Cloudflow implementation in place, let’s take a look at the overall deployment for this implementation. As a minimum, we need Seldon’s model server and a Cloudflow-based stream processor. In reality, because we want to use gRPC to invoke the Seldon model server, we also need to properly load balance gRPC requests. As explained in this document, there are two main approaches to gRPC load balancing: client-side load balancing and proxy-based (server-side) load balancing.
As described above, Seldon provides Istio/Ambassador ingress integration, so for our implementation we decided to use Ambassador for load balancing gRPC requests. An additional advantage of using Ambassador here is the ability to update the model deployment without any changes to the Cloudflow implementation. The final deployment architecture is shown below:
This deployment includes three layers: the Cloudflow-based streaming application, the Ambassador proxy that load balances gRPC requests, and the Seldon-managed model servers.
Because every layer in this architecture can scale independently, such an architecture can provide quite a scalable model serving solution.
We did initial measurements of the model serving duration from inside the model serving streamlet; it turned out to be around 5-6 ms on average. Without scaling any of the components, this latency limits throughput to roughly 170-180 inferences per second (about 1,000 ms divided by 5-6 ms). But by appropriately scaling all layers, we achieved more than a 10-fold improvement, processing roughly 2,000 messages per second.
Different model serving approaches have their strengths and weaknesses. The figure below provides a basic comparison:
As the complexity of the implementation grows (by adding more components), raw performance goes down, while functionality and capabilities increase.
Cloudflow allows you to implement any of these approaches, but you have to decide what is more important: Raw performance or all the additional functionality that you can leverage with Seldon and Ambassador.
If you’d like guidance on how your company can use ML models to deliver scalable insights at speed, please contact us and explore how our experts can help.
1 Note that a dynamically controlled streams pattern is only required in the case of an embedded model or a basic model server, when the application has to track model or endpoint changes. In the case of a compound server—Seldon or KFServing—the URL does not change when the model is updated, and the pattern is not required.
2 Remember that Cloudflow is strongly typed and currently uses Avro schemas to define inputs and outputs.
3 Monitoring is currently only available in the enterprise version of Cloudflow.