From 11b6fd38d66b9cb8975e3f0631c49bc2c363332d Mon Sep 17 00:00:00 2001 From: FerdinandWard Date: Tue, 16 Aug 2022 11:08:04 +0800 Subject: [PATCH] add mxnet job --- job-template/job/mxnet/Dockerfile | 33 +++ job-template/job/mxnet/README.md | 0 job-template/job/mxnet/build.sh | 8 + job-template/job/mxnet/demo.yaml | 42 ++++ job-template/job/mxnet/launcher.py | 352 +++++++++++++++++++++++++++++ 5 files changed, 435 insertions(+) create mode 100644 job-template/job/mxnet/Dockerfile create mode 100644 job-template/job/mxnet/README.md create mode 100644 job-template/job/mxnet/build.sh create mode 100644 job-template/job/mxnet/demo.yaml create mode 100644 job-template/job/mxnet/launcher.py diff --git a/job-template/job/mxnet/Dockerfile b/job-template/job/mxnet/Dockerfile new file mode 100644 index 00000000..af2855a7 --- /dev/null +++ b/job-template/job/mxnet/Dockerfile @@ -0,0 +1,33 @@ +FROM ubuntu:18.04 +RUN apt-get update && apt-get -y install gcc g++ libjpeg-dev zlib1g-dev cmake + +# 安装运维工具 +RUN apt install -y --force-yes --no-install-recommends vim apt-transport-https gnupg2 ca-certificates-java rsync jq wget git dnsutils iputils-ping net-tools curl mysql-client locales zip +# 安装python +RUN apt install -y python3.6-dev python3-pip libsasl2-dev libpq-dev \ + && ln -s /usr/bin/python3 /usr/bin/python \ + && ln -s /usr/bin/pip3 /usr/bin/pip + +RUN wget https://github.com/stern/stern/releases/download/v1.21.0/stern_1.21.0_linux_amd64.tar.gz && tar -zxvf stern_1.21.0_linux_amd64.tar.gz && rm stern_1.21.0_linux_amd64.tar.gz && chmod +x stern && mv stern /usr/bin/stern + +# 安装中文 +RUN apt install -y --force-yes --no-install-recommends locales ttf-wqy-microhei ttf-wqy-zenhei xfonts-wqy && locale-gen zh_CN && locale-gen zh_CN.utf8 +ENV LANG zh_CN.UTF-8 +ENV LC_ALL zh_CN.UTF-8 +ENV LANGUAGE zh_CN.UTF-8 + +# 便捷操作 +RUN echo "alias ll='ls -alF'" >> /root/.bashrc && \ + echo "alias la='ls -A'" >> /root/.bashrc && \ + echo "alias vi='vim'" >> /root/.bashrc && \ + /bin/bash -c "source /root/.bashrc" + +RUN pip install kubernetes==18.20.0 pysnooper psutil +COPY job/mxnet/* /app/ +COPY job/pkgs /app/job/pkgs +WORKDIR /app +ENV PYTHONPATH=/app:$PYTHONPATH + +ENTRYPOINT ["python3", "launcher.py"] + + diff --git a/job-template/job/mxnet/README.md b/job-template/job/mxnet/README.md new file mode 100644 index 00000000..e69de29b diff --git a/job-template/job/mxnet/build.sh b/job-template/job/mxnet/build.sh new file mode 100644 index 00000000..d310086c --- /dev/null +++ b/job-template/job/mxnet/build.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +set -ex + +docker build -t ccr.ccs.tencentyun.com/cube-studio/mxnet:20221010 -f job/mxnet/Dockerfile . +docker push ccr.ccs.tencentyun.com/cube-studio/mxnet:20221010 + + diff --git a/job-template/job/mxnet/demo.yaml b/job-template/job/mxnet/demo.yaml new file mode 100644 index 00000000..184b7b13 --- /dev/null +++ b/job-template/job/mxnet/demo.yaml @@ -0,0 +1,42 @@ +apiVersion: "kubeflow.org/v1" +kind: "MXJob" +metadata: + name: "mxnet-job" + namespace: pipeline +spec: + runPolicy: + cleanPodPolicy: None + jobMode: MXTrain + mxReplicaSpecs: + Scheduler: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: mxnet + image: mxjob/mxnet:gpu + Server: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: mxnet + image: mxjob/mxnet:gpu + Worker: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: mxnet + image: mxjob/mxnet:gpu + command: ["python"] + args: ["/incubator-mxnet/example/image-classification/train_mnist.py","--num-epochs","10","--num-layers","2","--kv-store","dist_device_sync","--gpus","0"] + resources: + limits: + nvidia.com/gpu: 1 + ports: + - containerPort: 9991 + name: mxjob-port \ No newline at end of file diff --git a/job-template/job/mxnet/launcher.py b/job-template/job/mxnet/launcher.py new file mode 100644 index 00000000..ff0bbfd7 --- /dev/null +++ b/job-template/job/mxnet/launcher.py @@ -0,0 +1,352 @@ + +import os,sys +base_dir = os.path.split(os.path.realpath(__file__))[0] +sys.path.append(base_dir) + +import argparse +import datetime +import json +import time +import uuid +import os +import pysnooper +import os,sys +import re +import threading +import psutil +import copy + +from kubernetes import client + +# print(os.environ) +from job.pkgs.k8s.py_k8s import K8s +k8s_client = K8s() + +KFJ_NAMESPACE = os.getenv('KFJ_NAMESPACE', '') +KFJ_TASK_ID = os.getenv('KFJ_TASK_ID', '') +KFJ_TASK_NAME = os.getenv('KFJ_TASK_NAME', '') +task_node_selectors = re.split(',|;|\n|\t', os.getenv('KFJ_TASK_NODE_SELECTOR', 'cpu=true,train=true')) +KFJ_TASK_NODE_SELECTOR = {} +for task_node_selector in task_node_selectors: + KFJ_TASK_NODE_SELECTOR[task_node_selector.split('=')[0]] = task_node_selector.split('=')[1] + +KFJ_PIPELINE_ID = os.getenv('KFJ_PIPELINE_ID', '') +KFJ_RUN_ID = os.getenv('KFJ_RUN_ID', '') +KFJ_CREATOR = os.getenv('KFJ_CREATOR', '') +KFJ_RUNNER = os.getenv('KFJ_RUNNER','') +KFJ_PIPELINE_NAME = os.getenv('KFJ_PIPELINE_NAME', '') +KFJ_TASK_IMAGES = os.getenv('KFJ_TASK_IMAGES', '') +KFJ_TASK_VOLUME_MOUNT = os.getenv('KFJ_TASK_VOLUME_MOUNT', '') +KFJ_TASK_RESOURCE_CPU = os.getenv('KFJ_TASK_RESOURCE_CPU', '') +KFJ_TASK_RESOURCE_MEMORY = os.getenv('KFJ_TASK_RESOURCE_MEMORY', '') +NUM_WORKER = 3 +INIT_FILE='' +crd_info={ + "group": "kubeflow.org", + "version": "v1", + 'kind': 'MXJob', + "plural": "mxjobs", + "timeout": 60 * 60 * 24 * 2 +} + + +k8s_volumes, k8s_volume_mounts = k8s_client.get_volume_mounts(KFJ_TASK_VOLUME_MOUNT,KFJ_CREATOR) + +print(k8s_volumes) +print(k8s_volume_mounts) + +GPU_TYPE= os.getenv('KFJ_GPU_TYPE', 'NVIDIA') +GPU_RESOURCE= os.getenv('KFJ_TASK_RESOURCE_GPU', '0') +print(GPU_TYPE,GPU_RESOURCE) + + + +def default_job_name(): + name = "mxjob-" + KFJ_PIPELINE_NAME.replace('_','-')+"-"+uuid.uuid4().hex[:4] + return name[0:54] + + + + +import subprocess +# @pysnooper.snoop() +def run_shell(shell): + print('begin run shell: %s'%shell,flush=True) + cmd = subprocess.Popen(shell, stdin=subprocess.PIPE, stderr=subprocess.PIPE, + stdout=subprocess.PIPE, universal_newlines=True, shell=True, bufsize=1) + # 实时输出 + while True: + line = cmd.stdout.readline() + status = subprocess.Popen.poll(cmd) + if status: + print(status,line,end='', flush=True) + else: + print(line, end='', flush=True) + if status == 0: # 判断子进程是否结束 + print('shell finish %s'%status,flush=True) + break + if status==-9 or status==-15 or status==143: # 外界触发kill + print('shell finish %s'%status,flush=True) + break + + return cmd.returncode + + + + + + +# 监控指定名称的mxjob +def monitoring(crd_k8s,name,namespace): + time.sleep(10) + # 杀掉stern 进程 + def get_pid(name): + ''' + 作用:根据进程名获取进程pid + ''' + pids = psutil.process_iter() + print("[" + name + "]'s pid is:", flush=True) + back=[] + for pid in pids: + if name in pid.name(): + print(pid.pid, flush=True) + back.append(pid.pid) + return back + check_time = datetime.datetime.now() + while(True): + mxjob = crd_k8s.get_one_crd(group=crd_info['group'],version=crd_info['version'],plural=crd_info['plural'],namespace=namespace,name=name) + if mxjob: + print('mxjob status %s'%mxjob['status'], flush=True) + else: + print('mxjob not exist', flush=True) + if mxjob and (mxjob['status']=="Succeeded" or mxjob['status']=="Failed"): # Created, Running, Restarting, Succeeded, or Failed + pids = get_pid("stern") + if pids: + for pid in pids: + pro = psutil.Process(int(pid)) + pro.terminate() + print('kill process %s'%pid, flush=True) + break + if (datetime.datetime.now()-check_time).seconds>3600: + pids = get_pid("stern") + if pids: + for pid in pids: + pro = psutil.Process(int(pid)) + pro.terminate() + print('kill process %s'%pid, flush=True) + check_time=datetime.datetime.now() + time.sleep(60) + + + +# @pysnooper.snoop() +def make_mxjob(name,num_ps,num_workers,image,working_dir,command): + pod_spec={ + "replicas": 1, + "restartPolicy": "Never", + "template": { + "metadata": { + "labels": { + "pipeline-id": KFJ_PIPELINE_ID, + "pipeline-name": KFJ_PIPELINE_NAME, + "task-id": KFJ_TASK_ID, + "task-name": KFJ_TASK_NAME, + 'rtx-user': KFJ_RUNNER, + "component": name, + "type": "mxjob", + "run-id": KFJ_RUN_ID, + } + }, + "spec": { + # "schedulerName": "kube-batch", + "restartPolicy": "Never", + "volumes": k8s_volumes, + "imagePullSecrets": [ + { + "name": "hubsecret" + } + ], + "affinity": { + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": node_selector_key, + "operator": "In", + "values": [ + KFJ_TASK_NODE_SELECTOR[node_selector_key] + ] + } for node_selector_key in KFJ_TASK_NODE_SELECTOR + ] + } + ] + } + }, + "podAntiAffinity": { + "preferredDuringSchedulingIgnoredDuringExecution": [ + { + "weight": 5, + "podAffinityTerm": { + "topologyKey": "kubernetes.io/hostname", + "labelSelector": { + "matchLabels": { + "component": name, + "type": "mxjob" + } + } + } + } + ] + } + }, + "containers": [ + { + "name": "mxnet", + "image": image if image else KFJ_TASK_IMAGES, + "imagePullPolicy": "Always", + "workingDir":working_dir if working_dir else None, + "command": ['bash','-c',command] if command else None, + "volumeMounts": k8s_volume_mounts, + "resources": { + "requests": { + "cpu": KFJ_TASK_RESOURCE_CPU, + "memory": KFJ_TASK_RESOURCE_MEMORY, + }, + "limits": { + "cpu": KFJ_TASK_RESOURCE_CPU, + "memory": KFJ_TASK_RESOURCE_MEMORY + } + } + } + ] + } + } + } + + + if GPU_TYPE=='NVIDIA' and GPU_RESOURCE: + pod_spec['template']['spec']['containers'][0]['resources']['requests']['nvidia.com/gpu'] = GPU_RESOURCE.split(',')[0] + pod_spec['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = GPU_RESOURCE.split(',')[0] + + worker_pod_spec = copy.deepcopy(pod_spec) + worker_pod_spec['replicas']=int(num_workers) + + scheduler_pod_spec={ + "replicas": 1, + "restartPolicy": "Never", + "template": { + "metadata": { + "labels": { + "pipeline-id": KFJ_PIPELINE_ID, + "pipeline-name": KFJ_PIPELINE_NAME, + "task-id": KFJ_TASK_ID, + "task-name": KFJ_TASK_NAME, + 'rtx-user': KFJ_RUNNER, + "component": name, + "type": "mxjob", + "run-id": KFJ_RUN_ID, + } + }, + "spec": { + "containers": [ + { + "name": "mxnet", + "image": image + } + ] + } + } + } + server_pod_spec = copy.deepcopy(scheduler_pod_spec) + server_pod_spec['replicas']=int(num_ps) + + mx_deploy = { + "apiVersion": "kubeflow.org/v1", + "kind": "MXJob", + "metadata": { + "namespace": KFJ_NAMESPACE, + "name": name, + "labels":{ + "run-id":KFJ_RUN_ID, + "run-rtx":KFJ_RUNNER, + "pipeline-rtx": KFJ_CREATOR, + "pipeline-id": KFJ_PIPELINE_ID, + "pipeline-name": KFJ_PIPELINE_NAME, + "task-id": KFJ_TASK_ID, + "task-name": KFJ_TASK_NAME, + } + }, + "spec": { + "jobMode":'MXTrain', + "runPolicy":{ + "cleanPodPolicy": "None", + }, + "mxReplicaSpecs": { + "Scheduler":scheduler_pod_spec, + "Server":server_pod_spec, + "Worker":worker_pod_spec + } + } + } + + return mx_deploy + + +# @pysnooper.snoop() +def launch_mxjob(name, num_ps,num_workers, image,working_dir, worker_command): + if KFJ_RUN_ID: + print('delete old mx, run-id %s'%KFJ_RUN_ID, flush=True) + k8s_client.delete_crd(group=crd_info['group'],version=crd_info['version'],plural=crd_info['plural'],namespace=KFJ_NAMESPACE,labels={"run-id":KFJ_RUN_ID}) + time.sleep(10) + # 删除旧的mx + k8s_client.delete_crd(group=crd_info['group'], version=crd_info['version'], plural=crd_info['plural'],namespace=KFJ_NAMESPACE, name=name) + time.sleep(10) + # 创建新的mx + mxjob_json = make_mxjob(name=name,num_ps=num_ps,num_workers= num_workers,image = image,working_dir=working_dir,command=worker_command) + print('create new mx %s' % name, flush=True) + k8s_client.create_crd(group=crd_info['group'],version=crd_info['version'],plural=crd_info['plural'],namespace=KFJ_NAMESPACE,body=mxjob_json) + time.sleep(10) + + print('begin start monitoring thread', flush=True) + # # 后台启动监控脚本 + monitoring_thread = threading.Thread(target=monitoring,args=(k8s_client,name,KFJ_NAMESPACE)) + monitoring_thread.start() + while True: + # 实时打印日志 + line='>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>' + print('begin follow log\n%s'%line, flush=True) + command = '''stern %s --namespace %s --exclude-container init-mx --since 10s --template '{{.PodName}} {{.Message}} {{"\\n"}}' '''%(name,KFJ_NAMESPACE) + + print(command, flush=True) + run_shell(command) + print('%s\nend follow log'%line, flush=True) + time.sleep(10) + + mxjob = k8s_client.get_one_crd(group=crd_info['group'], version=crd_info['version'],plural=crd_info['plural'], namespace=KFJ_NAMESPACE, name=name) + if mxjob and (mxjob['status'] == "Succeeded" or mxjob['status'] == "Failed"): + break + + mxjob = k8s_client.get_one_crd(group=crd_info['group'],version=crd_info['version'],plural=crd_info['plural'],namespace=KFJ_NAMESPACE,name=name) + print("mxJob %s finished, status %s"%(name, mxjob['status'])) + + if mxjob['status']!='Succeeded': + exit(1) + + +if __name__ == "__main__": + arg_parser = argparse.ArgumentParser("mxjob launcher") + arg_parser.add_argument('--working_dir', type=str, help="运行job的工作目录", default='/mnt/') + arg_parser.add_argument('--command', type=str, help="运行job的命令", default='python3 mnist.py') + arg_parser.add_argument('--num_ps', type=int, help="运行ps的pod数目", default=0) + arg_parser.add_argument('--num_worker', type=int, help="运行worker的pod数目", default=3) + arg_parser.add_argument('--image', type=str, help="运行job的镜像", default='') + + args = arg_parser.parse_args() + print("{} args: {}".format(__file__, args)) + + + launch_mxjob(name=default_job_name(),num_ps=int(args.num_ps),num_workers=int(args.num_worker),image=args.image,working_dir=args.working_dir,worker_command=args.command) + +