In Part 7 of “How To Deploy And Use Kubeflow On OpenShift”, we looked at model serving with Kubeflow. In this part, we now look at deploying Kubeflow pipelines.
Kubeflow ML Pipelines is a set of tools designed to help you build and share models and ML workflows within your organization and across teams. The most important concepts in the Kubeflow ML Pipelines service include:
The Kubeflow ML Pipelines platform consists of:
Although ML Pipelines is based on Argo, it reuses only the Argo CRD, while providing its own implementations of both the workflow controller and the UI.
The biggest difference from a deployment point of view is that the workflow controller does not run under the default service account, but under the pipeline-runner service account. So in order to run ML pipelines, it is also necessary to grant that account the required privileges with the following command:
$ oc adm policy add-scc-to-user privileged -n kubeflow -z pipeline-runner
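This grants the privileged security context constraint to the service account by appending it to the SCC's users list. As a rough sketch of the expected result, running `oc get scc privileged -o yaml` afterwards should show an entry like the following (fragment only):

```yaml
# Fragment of the privileged SecurityContextConstraints after the grant
users:
- system:serviceaccount:kubeflow:pipeline-runner
```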
Kubeflow comes with a few prepackaged pipelines¹:
By clicking on a specific pipeline, you can get a view of the pipeline’s details:
You can additionally get a source file for a pipeline and run it.
After your pipeline runs, you can see the execution result in the ML Pipelines UI:
Note that because ML Pipelines uses the Argo CRD, you can also see the result of the pipeline execution in the Argo UI:
There are multiple ways to build custom pipelines; see this project and the blog posts here and here. For this post, I will show how to build a custom pipeline using a notebook.
The following example is based on this pipeline sample. In a new Jupyter notebook, enter the following Python code:
# Install the SDK
EXPERIMENT_NAME = 'lightweight python components'
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade

import kfp.components as comp

# Define a Python function and convert it to a pipeline operation
def add(a: float, b: float) -> float:
    '''Calculates the sum of two arguments'''
    return a + b

add_op = comp.func_to_container_op(add)

# Advanced function:
# demonstrates imports, helper functions and multiple outputs
from typing import NamedTuple

def my_divmod(dividend: float, divisor: float) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float)]):
    '''Divides two numbers and calculates the quotient and remainder'''
    # Imports inside a component function:
    import numpy as np

    # This function demonstrates how to use nested functions inside a component function:
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
    return divmod_output(quotient, remainder)

# Test it
print(my_divmod(100, 7))

# ... and convert it to a pipeline operation
divmod_op = comp.func_to_container_op(my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')

# Define the pipeline.
# The pipeline function has to be decorated with the @dsl.pipeline decorator,
# and the parameters must have default values of type dsl.PipelineParam
import kfp.dsl as dsl

@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a=dsl.PipelineParam('a'),
    b=dsl.PipelineParam('b', value='7'),
    c=dsl.PipelineParam('c', value='17'),
):
    # Passing a pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4)  # Returns a dsl.ContainerOp class instance.

    # Passing a task output reference as an operation argument.
    # For an operation with a single return value, the output reference can be
    # accessed using the `task.output` or `task.outputs['output_name']` syntax
    divmod_task = divmod_op(add_task.output, b)

    # For an operation with multiple return values, the output references can be
    # accessed using the `task.outputs['output_name']` syntax
    result_task = add_op(divmod_task.outputs['quotient'], c)

# Compile the pipeline
pipeline_func = calc_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'

import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

# Submit the pipeline for execution.
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

# Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
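To sanity-check what the pipeline will compute, you can trace the same dataflow in plain Python, with no KFP or cluster required. This sketch mirrors the two component functions (using the built-in divmod rather than np.divmod) and wires them together the same way calc_pipeline does:

```python
from collections import namedtuple

def add(a, b):
    '''Same logic as the add component.'''
    return a + b

def my_divmod(dividend, divisor):
    '''Same logic as the my_divmod component, using the built-in divmod.'''
    MyDivmodOutput = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
    return MyDivmodOutput(*divmod(dividend, divisor))

# Trace calc_pipeline with the submitted arguments a=7, b=8 (c keeps its default, 17)
add_task = add(7, 4)                   # 11
divmod_task = my_divmod(add_task, 8)   # MyDivmodOutput(quotient=1, remainder=3)
result_task = add(divmod_task.quotient, 17)
print(result_task)                     # 18
```

So with the arguments above, the run's final add step should produce 18.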
Running this notebook produces this output:
Experiment link here
Run link here
The links point to the UI for the pipeline run.
That’s all for this part. Check out the next post on Model Management, and thanks for reading!
p.s. If you’d like to get professional guidance on best-practices and how-tos with Machine Learning, simply contact us to learn how Lightbend can help.
¹ Note that the first 5 samples are generic, while the last 2 will run only on GKE. If you try to run them on OpenShift, they will fail. ↩