mirror of
https://github.com/tencentmusic/cube-studio.git
synced 2024-11-21 01:16:33 +08:00
fix ray sklearn
This commit is contained in:
parent
dbd71e06f9
commit
6de19d29b0
@ -2,7 +2,6 @@
|
||||
FROM ccr.ccs.tencentyun.com/cube-studio/ray:gpu
|
||||
USER root
|
||||
|
||||
|
||||
# 安装调试相关工具
|
||||
RUN apt update && apt install -y --force-yes --no-install-recommends vim apt-transport-https gnupg2 ca-certificates-java rsync jq wget git dnsutils iputils-ping net-tools curl mysql-client locales zip software-properties-common
|
||||
|
||||
@ -28,7 +27,7 @@ WORKDIR /app
|
||||
RUN pip3 install kubernetes==12.0.1 pysnooper psutil
|
||||
RUN pip3 install scikit-learn==0.23.2 pandas numpy joblib
|
||||
|
||||
COPY job/sklearn_estimator/* /app/
|
||||
COPY job/ray_sklearn/sklearn_estimator/* /app/
|
||||
COPY job/pkgs /app/job/pkgs
|
||||
ENV PYTHONPATH=/app:$PYTHONPATH
|
||||
|
||||
|
38
job-template/job/ray_sklearn/sklearn_estimator/Dockerfile
Normal file
38
job-template/job/ray_sklearn/sklearn_estimator/Dockerfile
Normal file
@ -0,0 +1,38 @@
|
||||
#FROM rayproject/ray:nightly
|
||||
FROM ccr.ccs.tencentyun.com/cube-studio/ray:gpu
|
||||
USER root
|
||||
|
||||
#COPY job/pkgs/config/pip.conf /root/.pip/pip.conf
|
||||
#COPY job/pkgs/config/ubuntu-sources.list /etc/apt/sources.list
|
||||
|
||||
# 安装调试相关工具
|
||||
RUN apt update && apt install -y --force-yes --no-install-recommends vim apt-transport-https gnupg2 ca-certificates-java rsync jq wget git dnsutils iputils-ping net-tools curl mysql-client locales zip software-properties-common
|
||||
|
||||
ENV TZ 'Asia/Shanghai'
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
# 安装开发相关工具
|
||||
RUN apt install -y python3-dev gcc automake autoconf libtool make ffmpeg build-essential
|
||||
|
||||
# 安装pip库
|
||||
RUN pip install pysnooper cython
|
||||
RUN pip install polaris-python --index-url https://mirrors.cloud.tencent.com/pypi/simple/ --extra-index-url https://mirrors.tencent.com/repository/pypi/tencent_pypi/simple/
|
||||
|
||||
# 安装stern
|
||||
RUN wget https://github.com/stern/stern/releases/download/v1.21.0/stern_1.21.0_linux_amd64.tar.gz && tar -zxvf stern_1.21.0_linux_amd64.tar.gz && rm stern_1.21.0_linux_amd64.tar.gz && chmod +x stern && mv stern /usr/bin/stern
|
||||
|
||||
# 便捷操作
|
||||
RUN echo "alias ll='ls -alF'" >> /root/.bashrc && \
|
||||
echo "alias la='ls -A'" >> /root/.bashrc && \
|
||||
echo "alias vi='vim'" >> /root/.bashrc && \
|
||||
/bin/bash -c "source /root/.bashrc"
|
||||
|
||||
WORKDIR /app
|
||||
RUN pip3 install kubernetes==12.0.1 pysnooper psutil
|
||||
RUN pip3 install scikit-learn==0.23.2 pandas numpy joblib
|
||||
|
||||
COPY job/sklearn_estimator/* /app/
|
||||
COPY job/pkgs /app/job/pkgs
|
||||
ENV PYTHONPATH=/app:$PYTHONPATH
|
||||
|
||||
ENTRYPOINT ["python3", "launcher.py"]
|
7
job-template/job/ray_sklearn/sklearn_estimator/build.sh
Normal file
7
job-template/job/ray_sklearn/sklearn_estimator/build.sh
Normal file
@ -0,0 +1,7 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -ex
|
||||
|
||||
docker build --network=host -t ccr.ccs.tencentyun.com/cube-studio/sklearn_estimator:v1 -f job/sklearn_estimator/Dockerfile .
|
||||
docker push ccr.ccs.tencentyun.com/cube-studio/sklearn_estimator:v1
|
||||
|
30
job-template/job/ray_sklearn/sklearn_estimator/common.py
Normal file
30
job-template/job/ray_sklearn/sklearn_estimator/common.py
Normal file
@ -0,0 +1,30 @@
|
||||
import subprocess
|
||||
import fcntl
|
||||
import os,sys
|
||||
import time
|
||||
|
||||
import logging
|
||||
BASE_LOGGING_CONF = '[%(levelname)s] [%(asctime)s] %(message)s'
|
||||
logging.basicConfig(level=logging.INFO,format=BASE_LOGGING_CONF)
|
||||
|
||||
def nonBlockRead(output):
|
||||
fd = output.fileno()
|
||||
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
|
||||
try:
|
||||
return output.readline()
|
||||
except:
|
||||
return ''
|
||||
|
||||
class HiddenPrints:
|
||||
def __enter__(self):
|
||||
self._original_stdout = sys.stdout
|
||||
self._original_stderr = sys.stderr
|
||||
sys.stdout = open(os.devnull, 'w')
|
||||
sys.stderr = open(os.devnull, 'w')
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
sys.stdout.close()
|
||||
sys.stderr.close()
|
||||
sys.stdout = self._original_stdout
|
||||
sys.stderr = self._original_stderr
|
1
job-template/job/ray_sklearn/sklearn_estimator/init.sh
Normal file
1
job-template/job/ray_sklearn/sklearn_estimator/init.sh
Normal file
@ -0,0 +1 @@
|
||||
echo "init already done in Dockerfile"
|
151
job-template/job/ray_sklearn/sklearn_estimator/launcher.py
Normal file
151
job-template/job/ray_sklearn/sklearn_estimator/launcher.py
Normal file
@ -0,0 +1,151 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import os,sys
|
||||
base_dir = os.path.split(os.path.realpath(__file__))[0]
|
||||
sys.path.append(base_dir)
|
||||
sys.path.append(os.path.realpath(__file__))
|
||||
|
||||
from common import logging, nonBlockRead, HiddenPrints
|
||||
|
||||
import argparse
|
||||
import datetime
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
#import pysnooper
|
||||
import re
|
||||
import subprocess
|
||||
#import psutil
|
||||
import copy
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from sklearn import naive_bayes, neighbors, linear_model, ensemble, tree, svm
|
||||
import joblib
|
||||
|
||||
SUPPORT_MODELS = {
|
||||
'GaussianNB': naive_bayes.MultinomialNB,
|
||||
'MultinomialNB': naive_bayes.MultinomialNB,
|
||||
'BernoulliNB': naive_bayes.BernoulliNB,
|
||||
'Naive Bayes': naive_bayes.MultinomialNB,
|
||||
'nb': naive_bayes.MultinomialNB,
|
||||
|
||||
'KNeighborsClassifier': neighbors.KNeighborsClassifier,
|
||||
'KNN': neighbors.KNeighborsClassifier,
|
||||
'knn': neighbors.KNeighborsClassifier,
|
||||
|
||||
'LogisticRegression' : linear_model.LogisticRegression,
|
||||
'LR' : linear_model.LogisticRegression,
|
||||
'lr' : linear_model.LogisticRegression,
|
||||
|
||||
'RandomForestClassifier' : ensemble.RandomForestClassifier,
|
||||
'Random Forest' : ensemble.RandomForestClassifier,
|
||||
|
||||
'DecisionTreeClassifier' : tree.DecisionTreeClassifier,
|
||||
'Decision Tree' : tree.DecisionTreeClassifier,
|
||||
|
||||
'GradientBoostingClassifier' : ensemble.GradientBoostingClassifier,
|
||||
'gbdt' : ensemble.GradientBoostingClassifier,
|
||||
|
||||
'SVC' : svm.SVC,
|
||||
'SVM' : svm.SVC,
|
||||
'svc' : svm.SVC,
|
||||
'svm' : svm.SVC,
|
||||
}
|
||||
|
||||
def model_name_parse(model_name):
|
||||
return model_name.replace(' ','').lower()
|
||||
|
||||
if __name__ == "__main__":
|
||||
arg_parser = argparse.ArgumentParser("sklearn estimator launcher")
|
||||
arg_parser.add_argument('--train_csv_file_path', type=str, help="训练集csv,|分割,首行header", default='')
|
||||
arg_parser.add_argument('--predict_csv_file_path', type=str, help="预测数据集csv,格式和训练集一致,默认为空,需要predict时填", default='')
|
||||
arg_parser.add_argument('--label_name', type=str, help="label的列名,必填", default='')
|
||||
arg_parser.add_argument('--model_name', type=str, help="训练用到的模型名称,如lr,必填", default='')
|
||||
arg_parser.add_argument('--model_args_dict', type=str, help="模型参数,json格式,默认为空", default='')
|
||||
arg_parser.add_argument('--model_file_path', type=str, help="模型文件保存文件名,必填", default='')
|
||||
arg_parser.add_argument('--predict_result_path', type=str, help="预测结果保存文件名,默认为空,需要predict时填", default='')
|
||||
|
||||
arg_parser.add_argument('--worker_num', type=str, help="ray worker数量", default=1)
|
||||
|
||||
args = arg_parser.parse_args()
|
||||
logging.info("{} args: {}".format(__file__, args))
|
||||
|
||||
support = {model_name_parse(model_name) : SUPPORT_MODELS[model_name] for model_name in SUPPORT_MODELS}
|
||||
if model_name_parse(args.model_name) not in support:
|
||||
print("support models : " + str(SUPPORT_MODELS.keys))
|
||||
raise RuntimeError("your model {} not support".format(args.model_name))
|
||||
model = support[model_name_parse(args.model_name)]
|
||||
|
||||
model_args_dict = {}
|
||||
if args.model_args_dict:
|
||||
model_args_dict = json.loads(args.model_args_dict)
|
||||
|
||||
if not (int(args.worker_num) >=1 and int(args.worker_num)<=10):
|
||||
raise RuntimeError("worker_num between 1 and 10")
|
||||
worker_num = int(args.worker_num)
|
||||
|
||||
if not args.train_csv_file_path and not args.predict_csv_file_path:
|
||||
raise("train_csv_file_path and predict_csv_file_path can not both ba empty")
|
||||
|
||||
if args.train_csv_file_path:
|
||||
if not os.path.exists(args.train_csv_file_path):
|
||||
raise RuntimeError("train_csv_file_path file not exist")
|
||||
train_data = pd.read_csv(args.train_csv_file_path, sep='|', header=0)
|
||||
print('train_data.shape : ' + str(train_data.shape))
|
||||
if train_data.shape[0] <= 0 or train_data.shape[0] <= 0:
|
||||
raise RuntimeError("train data load error")
|
||||
|
||||
if args.predict_csv_file_path:
|
||||
if not args.predict_result_path:
|
||||
raise RuntimeError("predict_result_path can not be empty")
|
||||
predict_result_path = args.predict_result_path
|
||||
|
||||
if not os.path.exists(args.predict_csv_file_path):
|
||||
raise RuntimeError("predict_csv_file_path file not exist")
|
||||
predict_data = pd.read_csv(args.predict_csv_file_path, sep='|', header=0)
|
||||
print('predict_data.shape : ' + str(predict_data.shape))
|
||||
if predict_data.shape[0] <= 0 or predict_data.shape[0] <= 0:
|
||||
raise RuntimeError("predict data load error")
|
||||
|
||||
# if not os.path.exists(args.model_file_path):
|
||||
# raise RuntimeError("must set a exist model_file_path")
|
||||
model_file_path = args.model_file_path
|
||||
|
||||
print('train_data.columns : ' + str(train_data.columns))
|
||||
if not args.label_name or not args.label_name in train_data.columns:
|
||||
raise RuntimeError("label_name illegal")
|
||||
label = train_data[args.label_name]
|
||||
train_data = train_data.drop(args.label_name, axis=1)
|
||||
|
||||
# 启动ray集群
|
||||
init_file = '/app/init.sh'
|
||||
from ray_launcher import ray_launcher
|
||||
head_service_ip = ray_launcher(worker_num, init_file, 'create')
|
||||
print('head_service_ip: ' + head_service_ip)
|
||||
if not head_service_ip:
|
||||
raise RuntimeError("ray cluster not found")
|
||||
os.environ['RAY_ADDRESS'] = head_service_ip
|
||||
|
||||
from ray.util.joblib import register_ray
|
||||
register_ray()
|
||||
with joblib.parallel_backend('ray'):
|
||||
|
||||
st = time.time()
|
||||
|
||||
if args.train_csv_file_path:
|
||||
model = model(**model_args_dict)
|
||||
model.fit(train_data, label)
|
||||
joblib.dump(model, model_file_path)
|
||||
|
||||
if args.predict_csv_file_path:
|
||||
if not args.train_csv_file_path:
|
||||
model = joblib.load(model_file_path)
|
||||
res = model.predict(predict_data)
|
||||
with open(predict_result_path, 'w') as f:
|
||||
for line in res:
|
||||
f.write(str(line) + '\n')
|
||||
|
||||
print("succ, cost {}s".format(str(time.time() -st)))
|
||||
|
||||
ray_launcher(worker_num, init_file, 'delete')
|
||||
|
487
job-template/job/ray_sklearn/sklearn_estimator/ray_launcher.py
Normal file
487
job-template/job/ray_sklearn/sklearn_estimator/ray_launcher.py
Normal file
@ -0,0 +1,487 @@
|
||||
import ray
|
||||
import re
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
from job.pkgs.k8s.py_k8s import K8s
|
||||
k8s_client = K8s()
|
||||
|
||||
import argparse
|
||||
import datetime, time
|
||||
import pysnooper
|
||||
|
||||
# print(os.environ)
|
||||
base_dir = os.path.split(os.path.realpath(__file__))[0]
|
||||
KFJ_NAMESPACE = os.getenv('KFJ_NAMESPACE', '')
|
||||
KFJ_TASK_ID = os.getenv('KFJ_TASK_ID', '')
|
||||
KFJ_TASK_NAME = os.getenv('KFJ_TASK_NAME', '')
|
||||
task_node_selectors = re.split(',|;|\n|\t', os.getenv('KFJ_TASK_NODE_SELECTOR', 'cpu=true,train=true'))
|
||||
KFJ_TASK_NODE_SELECTOR = {}
|
||||
for task_node_selector in task_node_selectors:
|
||||
KFJ_TASK_NODE_SELECTOR[task_node_selector.split('=')[0]] = task_node_selector.split('=')[1]
|
||||
|
||||
KFJ_PIPELINE_ID = os.getenv('KFJ_PIPELINE_ID', '')
|
||||
KFJ_RUN_ID = os.getenv('KFJ_RUN_ID', '')
|
||||
KFJ_CREATOR = os.getenv('KFJ_CREATOR', '')
|
||||
KFJ_RUNNER = os.getenv('KFJ_RUNNER','')
|
||||
KFJ_PIPELINE_NAME = os.getenv('KFJ_PIPELINE_NAME', '')
|
||||
KFJ_TASK_IMAGES = os.getenv('KFJ_TASK_IMAGES', '')
|
||||
KFJ_TASK_VOLUME_MOUNT = os.getenv('KFJ_TASK_VOLUME_MOUNT', '')
|
||||
KFJ_TASK_RESOURCE_CPU = os.getenv('KFJ_TASK_RESOURCE_CPU', '')
|
||||
KFJ_TASK_RESOURCE_MEMORY = os.getenv('KFJ_TASK_RESOURCE_MEMORY', '')
|
||||
NUM_WORKER = 3
|
||||
|
||||
os.environ['RAY_HOST'] = 'ray-header-' + KFJ_PIPELINE_NAME + '-' + KFJ_TASK_ID
|
||||
HEADER_NAME = os.getenv('RAY_HOST', '')
|
||||
WORKER_NAME = HEADER_NAME.replace('header', 'worker')
|
||||
INIT_FILE=''
|
||||
|
||||
|
||||
k8s_volumes, k8s_volume_mounts = k8s_client.get_volume_mounts(KFJ_TASK_VOLUME_MOUNT,KFJ_CREATOR)
|
||||
|
||||
|
||||
print(k8s_volumes)
|
||||
print(k8s_volume_mounts)
|
||||
|
||||
GPU_TYPE= os.getenv('KFJ_GPU_TYPE', 'NVIDIA')
|
||||
GPU_RESOURCE= os.getenv('KFJ_TASK_RESOURCE_GPU', '0')
|
||||
print(GPU_TYPE,GPU_RESOURCE)
|
||||
|
||||
|
||||
def create_header_service(name):
|
||||
service_json = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {
|
||||
"namespace": KFJ_NAMESPACE,
|
||||
"name": name,
|
||||
"labels":{
|
||||
"run-id":os.getenv('KFJ_RUN_ID','unknown'),
|
||||
"run-rtx":os.getenv('KFJ_RUNNER','unknown'),
|
||||
"pipeline-rtx": os.getenv('KFJ_CREATOR', 'unknown'),
|
||||
"task-id":os.getenv('KFJ_TASK_ID','unknown'),
|
||||
"pipeline-id": os.getenv('KFJ_PIPELINE_ID', 'unknown')
|
||||
}
|
||||
},
|
||||
"spec": {
|
||||
"ports": [
|
||||
{
|
||||
"name": "client",
|
||||
"protocol": "TCP",
|
||||
"port": 10001,
|
||||
"targetPort": 10001
|
||||
},
|
||||
{
|
||||
"name": "dashboard",
|
||||
"protocol": "TCP",
|
||||
"port": 8265,
|
||||
"targetPort": 8265
|
||||
},
|
||||
{
|
||||
"name": "redis",
|
||||
"protocol": "TCP",
|
||||
"port": 6379,
|
||||
"targetPort": 6379
|
||||
}
|
||||
],
|
||||
"selector": {
|
||||
"component": name
|
||||
}
|
||||
}
|
||||
}
|
||||
return service_json
|
||||
|
||||
# @pysnooper.snoop()
|
||||
def create_header_deploy(name):
|
||||
header_deploy = {
|
||||
"apiVersion": "apps/v1",
|
||||
"kind": "Deployment",
|
||||
"metadata": {
|
||||
"namespace": KFJ_NAMESPACE,
|
||||
"name": name,
|
||||
"labels":{
|
||||
"run-id":os.getenv('KFJ_RUN_ID','unknown'),
|
||||
"run-rtx":os.getenv('KFJ_RUNNER','unknown'),
|
||||
"pipeline-rtx": os.getenv('KFJ_CREATOR', 'unknown'),
|
||||
"task-id":os.getenv('KFJ_TASK_ID','unknown'),
|
||||
"pipeline-id": os.getenv('KFJ_PIPELINE_ID', 'unknown')
|
||||
}
|
||||
},
|
||||
"spec": {
|
||||
"replicas": 1,
|
||||
"selector": {
|
||||
"matchLabels": {
|
||||
"component": name,
|
||||
"type": "ray"
|
||||
}
|
||||
},
|
||||
"template": {
|
||||
"metadata": {
|
||||
"labels": {
|
||||
"pipeline-id": KFJ_PIPELINE_ID,
|
||||
"pipeline-name": KFJ_PIPELINE_NAME,
|
||||
"task-name": KFJ_TASK_NAME,
|
||||
'rtx-user': KFJ_RUNNER,
|
||||
"component": name,
|
||||
"type": "ray",
|
||||
"run-id": os.getenv('KFJ_RUN_ID', 'unknown'),
|
||||
}
|
||||
},
|
||||
"spec": {
|
||||
"restartPolicy": "Always",
|
||||
"volumes": k8s_volumes,
|
||||
"imagePullSecrets": [
|
||||
{
|
||||
"name": "hubsecret"
|
||||
},
|
||||
{
|
||||
"name": "csig-hubsecret"
|
||||
}
|
||||
],
|
||||
"affinity": {
|
||||
"nodeAffinity": {
|
||||
"requiredDuringSchedulingIgnoredDuringExecution": {
|
||||
"nodeSelectorTerms": [
|
||||
{
|
||||
"matchExpressions": [
|
||||
{
|
||||
"key": node_selector_key,
|
||||
"operator": "In",
|
||||
"values": [
|
||||
KFJ_TASK_NODE_SELECTOR[node_selector_key]
|
||||
]
|
||||
} for node_selector_key in KFJ_TASK_NODE_SELECTOR
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"podAntiAffinity": {
|
||||
"preferredDuringSchedulingIgnoredDuringExecution": [
|
||||
{
|
||||
"weight": 5,
|
||||
"podAffinityTerm": {
|
||||
"topologyKey": "kubernetes.io/hostname",
|
||||
"labelSelector": {
|
||||
"matchLabels": {
|
||||
"component": name,
|
||||
"type":"ray"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"containers": [
|
||||
{
|
||||
"name": "ray-head",
|
||||
"image": KFJ_TASK_IMAGES,
|
||||
"imagePullPolicy": "Always",
|
||||
"command": [
|
||||
"/bin/bash",
|
||||
"-c",
|
||||
"%s ray start --head --port=6379 --redis-shard-ports=6380,6381 --num-cpus=$MY_CPU_REQUEST --object-manager-port=12345 --node-manager-port=12346 --block"%INIT_FILE
|
||||
],
|
||||
"ports": [
|
||||
{
|
||||
"containerPort": 6379
|
||||
},
|
||||
{
|
||||
"containerPort": 10001
|
||||
},
|
||||
{
|
||||
"containerPort": 8265
|
||||
}
|
||||
],
|
||||
"volumeMounts": k8s_volume_mounts,
|
||||
"env": [
|
||||
{
|
||||
"name": "MY_CPU_REQUEST",
|
||||
"valueFrom": {
|
||||
"resourceFieldRef": {
|
||||
"resource": "requests.cpu"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"resources": {
|
||||
"requests": {
|
||||
"cpu": KFJ_TASK_RESOURCE_CPU,
|
||||
"memory": KFJ_TASK_RESOURCE_MEMORY,
|
||||
},
|
||||
"limits": {
|
||||
"cpu": KFJ_TASK_RESOURCE_CPU,
|
||||
"memory": KFJ_TASK_RESOURCE_MEMORY
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if GPU_TYPE=='NVIDIA' and GPU_RESOURCE:
|
||||
header_deploy['spec']['template']['spec']['containers'][0]['resources']['requests']['nvidia.com/gpu'] = GPU_RESOURCE.split(',')[0]
|
||||
header_deploy['spec']['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = GPU_RESOURCE.split(',')[0]
|
||||
|
||||
if GPU_TYPE=='TENCENT' and GPU_RESOURCE:
|
||||
if len(GPU_RESOURCE.split(','))==2:
|
||||
gpu_core,gpu_mem = GPU_RESOURCE.split(',')[0],str(4*int(GPU_RESOURCE.split(',')[1]))
|
||||
if gpu_core and gpu_mem:
|
||||
header_deploy['spec']['template']['spec']['containers'][0]['resources']['requests'][
|
||||
'tencent.com/vcuda-core'] = gpu_core
|
||||
header_deploy['spec']['template']['spec']['containers'][0]['resources']['requests'][
|
||||
'tencent.com/vcuda-memory'] = gpu_mem
|
||||
header_deploy['spec']['template']['spec']['containers'][0]['resources']['limits'][
|
||||
'tencent.com/vcuda-core'] = gpu_core
|
||||
header_deploy['spec']['template']['spec']['containers'][0]['resources']['limits'][
|
||||
'tencent.com/vcuda-memory'] = gpu_mem
|
||||
|
||||
return header_deploy
|
||||
|
||||
|
||||
def create_worker_deploy(header_name,worker_name):
|
||||
worker_deploy = {
|
||||
"apiVersion": "apps/v1",
|
||||
"kind": "Deployment",
|
||||
"metadata": {
|
||||
"namespace": KFJ_NAMESPACE,
|
||||
"name": worker_name,
|
||||
"labels": {
|
||||
"run-id":os.getenv('KFJ_RUN_ID','unknown'),
|
||||
"run-rtx":os.getenv('KFJ_RUNNER','unknown'),
|
||||
"pipeline-rtx": os.getenv('KFJ_CREATOR', 'unknown'),
|
||||
"task-id":os.getenv('KFJ_TASK_ID','unknown'),
|
||||
"pipeline-id": os.getenv('KFJ_PIPELINE_ID', 'unknown')
|
||||
}
|
||||
},
|
||||
"spec": {
|
||||
"replicas": NUM_WORKER,
|
||||
"selector": {
|
||||
"matchLabels": {
|
||||
"component": worker_name,
|
||||
"type": "ray"
|
||||
}
|
||||
},
|
||||
"template": {
|
||||
"metadata": {
|
||||
"labels": {
|
||||
"pipeline-id": KFJ_PIPELINE_ID,
|
||||
"pipeline-name": KFJ_PIPELINE_NAME,
|
||||
"task-name": KFJ_TASK_NAME,
|
||||
'rtx-user': KFJ_RUNNER,
|
||||
"component": worker_name,
|
||||
"type": "ray",
|
||||
"run-id": os.getenv('KFJ_RUN_ID', 'unknown'),
|
||||
|
||||
}
|
||||
},
|
||||
|
||||
"spec": {
|
||||
"affinity": {
|
||||
"nodeAffinity": {
|
||||
"requiredDuringSchedulingIgnoredDuringExecution": {
|
||||
"nodeSelectorTerms": [
|
||||
{
|
||||
"matchExpressions": [
|
||||
{
|
||||
"key": node_selector_key,
|
||||
"operator": "In",
|
||||
"values": [
|
||||
KFJ_TASK_NODE_SELECTOR[node_selector_key]
|
||||
]
|
||||
} for node_selector_key in KFJ_TASK_NODE_SELECTOR
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"podAntiAffinity": {
|
||||
"preferredDuringSchedulingIgnoredDuringExecution": [
|
||||
{
|
||||
"weight": 5,
|
||||
"podAffinityTerm": {
|
||||
"topologyKey": "kubernetes.io/hostname",
|
||||
"labelSelector": {
|
||||
"matchLabels": {
|
||||
"component": worker_name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"imagePullSecrets": [
|
||||
{
|
||||
"name": "hubsecret"
|
||||
},
|
||||
{
|
||||
"name": "csig-hubsecret"
|
||||
}
|
||||
],
|
||||
"restartPolicy": "Always",
|
||||
"volumes": k8s_volumes,
|
||||
"containers": [
|
||||
{
|
||||
"name": "ray-worker",
|
||||
"image": KFJ_TASK_IMAGES,
|
||||
"imagePullPolicy": "Always",
|
||||
"command": [
|
||||
"/bin/bash",
|
||||
"-c",
|
||||
"%s ray start --num-cpus=$MY_CPU_REQUEST --address=$RAY_HEAD_SERVICE_HOST:$RAY_HEAD_SERVICE_PORT_REDIS --object-manager-port=12345 --node-manager-port=12346 --block"%INIT_FILE
|
||||
],
|
||||
"volumeMounts": k8s_volume_mounts,
|
||||
"env": [
|
||||
{
|
||||
"name": "MY_CPU_REQUEST",
|
||||
"valueFrom": {
|
||||
"resourceFieldRef": {
|
||||
"resource": "requests.cpu"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "RAY_HEAD_SERVICE_HOST",
|
||||
"value": header_name
|
||||
},
|
||||
{
|
||||
"name": "RAY_HEAD_SERVICE_PORT_REDIS",
|
||||
"value": "6379"
|
||||
}
|
||||
],
|
||||
"resources": {
|
||||
"requests": {
|
||||
"cpu": KFJ_TASK_RESOURCE_CPU,
|
||||
"memory": KFJ_TASK_RESOURCE_MEMORY
|
||||
},
|
||||
"limits": {
|
||||
"cpu": KFJ_TASK_RESOURCE_CPU,
|
||||
"memory": KFJ_TASK_RESOURCE_MEMORY
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if GPU_TYPE=='NVIDIA' and GPU_RESOURCE:
|
||||
worker_deploy['spec']['template']['spec']['containers'][0]['resources']['requests']['nvidia.com/gpu'] = GPU_RESOURCE.split(',')[0]
|
||||
worker_deploy['spec']['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = GPU_RESOURCE.split(',')[0]
|
||||
|
||||
if GPU_TYPE=='TENCENT' and GPU_RESOURCE:
|
||||
if len(GPU_RESOURCE.split(','))==2:
|
||||
gpu_core,gpu_mem = GPU_RESOURCE.split(',')[0],str(4*int(GPU_RESOURCE.split(',')[1]))
|
||||
if gpu_core and gpu_mem:
|
||||
worker_deploy['spec']['template']['spec']['containers'][0]['resources']['requests'][
|
||||
'tencent.com/vcuda-core'] = gpu_core
|
||||
worker_deploy['spec']['template']['spec']['containers'][0]['resources']['requests'][
|
||||
'tencent.com/vcuda-memory'] = gpu_mem
|
||||
worker_deploy['spec']['template']['spec']['containers'][0]['resources']['limits'][
|
||||
'tencent.com/vcuda-core'] = gpu_core
|
||||
worker_deploy['spec']['template']['spec']['containers'][0]['resources']['limits'][
|
||||
'tencent.com/vcuda-memory'] = gpu_mem
|
||||
|
||||
|
||||
return worker_deploy
|
||||
|
||||
|
||||
# @pysnooper.snoop()
|
||||
def wait_for_nodes():
|
||||
# Wait for all nodes to join the cluster.
|
||||
while True:
|
||||
resources = ray.cluster_resources()
|
||||
node_keys = [key for key in resources if "node" in key]
|
||||
num_nodes = sum(resources[node_key] for node_key in node_keys)
|
||||
if num_nodes < NUM_WORKER:
|
||||
print("{} nodes have joined so far, waiting for {} more.".format(num_nodes, NUM_WORKER - num_nodes))
|
||||
sys.stdout.flush()
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
# @pysnooper.snoop()
|
||||
def launcher_cluster(deal=None):
|
||||
# 清理一下之前存在的
|
||||
try:
|
||||
print('begin delete old header service')
|
||||
k8s_client.v1.delete_namespaced_service(HEADER_NAME, KFJ_NAMESPACE)
|
||||
except Exception as e1:
|
||||
pass
|
||||
print(e1)
|
||||
|
||||
try:
|
||||
print('begin delete old header deployment')
|
||||
k8s_client.AppsV1Api.delete_namespaced_deployment(HEADER_NAME, KFJ_NAMESPACE)
|
||||
except Exception as e1:
|
||||
pass
|
||||
print(e1)
|
||||
|
||||
try:
|
||||
print('begin delete old worker deployment')
|
||||
k8s_client.AppsV1Api.delete_namespaced_deployment(WORKER_NAME, KFJ_NAMESPACE)
|
||||
except Exception as e1:
|
||||
pass
|
||||
print(e1)
|
||||
time.sleep(3)
|
||||
|
||||
if deal=='create':
|
||||
header_service = create_header_service(HEADER_NAME)
|
||||
header_deploy = create_header_deploy(HEADER_NAME)
|
||||
worker_deploy = create_worker_deploy(HEADER_NAME,WORKER_NAME)
|
||||
try:
|
||||
print(KFJ_NAMESPACE)
|
||||
print(header_service)
|
||||
print('begin create ray header service,%s ' % datetime.datetime.now())
|
||||
k8s_client.v1.create_namespaced_service(KFJ_NAMESPACE, header_service, pretty='true')
|
||||
print('begin create ray header deployment,%s ' % datetime.datetime.now())
|
||||
print(header_deploy)
|
||||
k8s_client.AppsV1Api.create_namespaced_deployment(KFJ_NAMESPACE, header_deploy, pretty='true')
|
||||
print('begin create ray worker deployment,%s ' % datetime.datetime.now())
|
||||
print(worker_deploy)
|
||||
k8s_client.AppsV1Api.create_namespaced_deployment(KFJ_NAMESPACE, worker_deploy, pretty='true')
|
||||
# 等待创建完成
|
||||
time.sleep(20)
|
||||
header_host = "%s:10001" % HEADER_NAME
|
||||
print('begin connect ray cluster %s,%s ' % (header_host,datetime.datetime.now()))
|
||||
|
||||
ray.util.connect(header_host,connection_retries=20)
|
||||
wait_for_nodes()
|
||||
print('ray cluster all node ready,%s ' % datetime.datetime.now())
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
try:
|
||||
print('begin delete error header service')
|
||||
k8s_client.v1.delete_namespaced_service(HEADER_NAME, KFJ_NAMESPACE)
|
||||
except Exception as e1:
|
||||
pass
|
||||
# print(e1)
|
||||
try:
|
||||
print('begin delete error header deployment')
|
||||
k8s_client.AppsV1Api.delete_namespaced_deployment(HEADER_NAME, KFJ_NAMESPACE)
|
||||
except Exception as e1:
|
||||
pass
|
||||
# print(e1)
|
||||
try:
|
||||
print('begin delete error worker deployment')
|
||||
k8s_client.AppsV1Api.delete_namespaced_deployment(WORKER_NAME, KFJ_NAMESPACE)
|
||||
except Exception as e1:
|
||||
pass
|
||||
print(e1)
|
||||
# 如果出现错误,报错退出。不进行下一步代码
|
||||
raise e
|
||||
|
||||
def ray_launcher(num_workers, init_file, deal):
|
||||
NUM_WORKER = int(num_workers)
|
||||
INIT_FILE = "bash "+init_file.strip()+" && "
|
||||
launcher_cluster(deal=deal)
|
||||
return "%s:10001" % HEADER_NAME
|
10
job-template/job/ray_sklearn/sklearn_estimator/stern.sh
Normal file
10
job-template/job/ray_sklearn/sklearn_estimator/stern.sh
Normal file
@ -0,0 +1,10 @@
|
||||
#!/bin/sh
|
||||
name=$1
|
||||
namespace=$2
|
||||
env=$3
|
||||
while true;
|
||||
do
|
||||
# echo "stern $name --namespace $namespace --tail 0 --template '{{.PodName}} {{.Message}}'";
|
||||
# timeout 600s stern $name --namespace $namespace --tail 0 --template '{{.PodName}} {{.Message}}';
|
||||
timeout 600s stern $name --namespace $namespace --tail 0 --template '{{.Message}}';
|
||||
done
|
Loading…
Reference in New Issue
Block a user