Kubeflow Pipeline - 上传一个 Pipeline

时间:2022-07-22
本文章向大家介绍Kubeflow Pipeline - 上传一个 Pipeline,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1 Overview

Pipeline 提供了几个内置的 Pipline…有点绕口,但是真正使用的时候,但是默认提供的几个 Pipeline 都要基于 GCP Google 的云平台,但是我们的目的是在自己的集群部署,自然是访问不到 GCP 的,所以根据官网,总结了一些构建 Pipeline 的流程。

2 理解 Pipeline

https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/

从官网文档了解,什么是 Pipeline,看看定义还是挺让人兴奋的。

首先,数据科学家本身就是在提数据,训练,保存模型,部署模型几个重要环节中工作,Pipeline 提供了一个很友好的 UI 来给数据科学家来定义整个过程,而且整个过程是运行在 K8S 集群上的。这对于一些对资源利用率有要求的公司,统一一层 K8S 来服务在线的应用和这些机器学习,还是很不错的。

通过定义这个 Pipeline,就可以定义环环相扣的机器学习 Workflow,市面是有很多类似的产品的,例如阿里云,腾讯云都有,但是都不全是基于 K8S 来做的。然后 Pipeline 也提供了相关的工具来定义这个 Pipeline,不过都是 Python 的,当然这个对于数据科学家来说,不会是什么问题。最后就是,Pipeline 在 Kubeflow 的生态内,结合 Notebook,数据科学家甚至都可以不用跳出去 Kubeflow 来做其他操作,一站式 e2e 的就搞定了。

2 Upload Pipeline

通过?这个 Link,学习一下如何构建自己的 Pipeine 并且上传。

https://www.kubeflow.org/docs/pipelines/tutorials/build-pipeline/

主要包括几个步骤。

  1. 安装专门的 SDK
  2. Python 定义好 Pipeline
  3. SDK 构建 pipeline 的包,最后通过 UI 上传

请理解?脚本每一步的含义。

# 1 下载官方的示例 python 代码来构建
git clone https://github.com/kubeflow/pipelines.git
# 2 实例代码在这里
cd pipelines/samples/core

分析一下这个例子的代码,留意一下里面的注释。

# 省略了 License
import kfp
from kfp import dsl

def random_num_op(low, high):
    """Generate a random number between low and high."""
    return dsl.ContainerOp(
        name='Generate random number',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; print(random.randint($0, $1))" | tee $2', str(low), str(high), '/tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def flip_coin_op():
    """Flip a coin and output heads or tails randomly."""
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = 'heads' if random.randint(0,1) == 0 '
                  'else 'tails'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

@dsl.pipeline(
    # 这是你 Pipeline 的名字和描述
    # 为了和默认的 Condition 那个例子有所区别,这里更改了 name 和 description。
    name='test test name',
    description='just test'
)
def flipcoin_pipeline():
    # 这里面就是从上到下来设计这个 Pipeline 了,先做什么后做什么
    # 或者有条件限制的话就做跳转
    flip = flip_coin_op()
    # 如果 flip 这个值等于 heads 就走这个分支
    # 这是 SDK 定义的 API,想用好 Pipeline,除了简单的按顺序写方法
    # 还有像条件控制这样的魔法
    with dsl.Condition(flip.output == 'heads'):
        # 又是一个条件,就不赘述了
        random_num_head = random_num_op(0, 9)
        with dsl.Condition(random_num_head.output > 5):
            print_op('heads and %s > 5!' % random_num_head.output)
        with dsl.Condition(random_num_head.output <= 5):
            print_op('heads and %s <= 5!' % random_num_head.output)

    # 如果 flip 这个值等于 tails 就走这个分支
    with dsl.Condition(flip.output == 'tails'):
        random_num_tail = random_num_op(10, 19)
        with dsl.Condition(random_num_tail.output > 15):
            print_op('tails and %s > 15!' % random_num_tail.output)
        with dsl.Condition(random_num_tail.output <= 15):
            print_op('tails and %s <= 15!' % random_num_tail.output)

if __name__ == '__main__':
    # 最后就是保存这个 Pipeline 了
    kfp.compiler.Compiler().compile(flipcoin_pipeline, __file__ + '.zip')

了解了如何通过 Pipeline 提供的 SDK 来构建工作流之后,还需要通过 pip 来下载一些工具,方便直接转换你写的 pipeline 文件。假设你已经有 Python3 环境了那么就装包就行了。

https://www.kubeflow.org/docs/pipelines/sdk/install-sdk/#install-the-kubeflow-pipelines-sdk

# 1 安装
pip install https://storage.googleapis.com/ml-pipeline/release/0.1.20/kfp.tar.gz --upgrade
# 2 检查一下安装成功了没
which dsl-compile

确定一下,所在的目录,然后就可以搞起来了。

然后就是上传了。

3 Summary

如果有需要深度使用 Pipeline 的同学,建议看看其 SDK。本质上,构建出来的 Pipeline 文件是一个 基于 Argo 的一个定义 Workflow 的 YAML 文件。