This commit is contained in:
pengluan 2021-10-14 17:35:48 +08:00
parent 842a46a183
commit 84d1e1b258
12 changed files with 374 additions and 196 deletions

View File

@ -28,65 +28,6 @@ import json
# 首页显示内容
# 数据格式说明 dict:
# 'id': 唯一标识,
# 'type': 类型 一级支持 title | boxlist | list 二级支持 text | markdown,
# 'open': 是否当前页打开 1 从当前窗口打开 默认打开新的标签页,
# 'url': 点击时打开的链接 仅支持 http(s)://开头,
# 'cover': 封面图链接 支持base64图片编码,
# 'content': 内容 支持markdown(需设置类型为markdown),
# 'data': 嵌套下级内容数组
HOME_CONFIG = [
{
'id': 2,
'type': 'title',
'content': '平台主功能',
'data': [],
}, {
'id': 102,
'type': 'boxlist',
'content': 'Pipelines',
'data': [{
'id': 1021,
'type': 'base',
'open': 1,
'url': '/pipeline_modelview/list/',
'cover': '/static/assets/images/home/dag.jpg',
'content': '机器学习pipeline流水线',
}, {
'id': 1022,
'type': 'base',
'open': 1,
'url': '/notebook_modelview/add',
'cover': '/static/assets/images/home/vscode.png',
'content': '在线ide编辑器',
},
{
'id': 1023,
'type': 'base',
'open': 1,
'url': '/nni_modelview/add',
'cover': '/static/assets/images/home/private.png',
'content': '超参搜索',
}, {
'id': 1024,
'type': 'base',
'open': 1,
'url': '/service_modelview/list/',
'cover': '/static/assets/images/home/service.png',
'content': '模型服务化',
}],
}
]
# 推送给管理员消息的函数
def push_admin(message):
pass

View File

@ -212,7 +212,7 @@ def push_task_time(workflow,dbsession):
task_name = pods[pod_name]['displayName']
finishedAt = datetime.datetime.strptime(pods[pod_name]['finishedAt'].replace('T',' ').replace('Z',''),'%Y-%m-%d %H:%M:%S')
startAt = datetime.datetime.strptime(pods[pod_name]['startedAt'].replace('T', ' ').replace('Z', ''),'%Y-%m-%d %H:%M:%S')
run_time= round((finishedAt-startAt).seconds/60/60,2)
run_time= round((finishedAt-startAt).days*24+(finishedAt-startAt).seconds/60/60,2)
db_task_name = task_name[:task_name.index('(')] if '(' in task_name else task_name
task = dbsession.query(Task).filter(Task.pipeline_id == int(pipeline_id)).filter(Task.name == db_task_name).first()
if startAt in task_pod_time and task_pod_time[startAt]:
@ -244,7 +244,7 @@ def push_task_time(workflow,dbsession):
workflow.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
dbsession.commit()
push_admin(message)
push_message(conf.get('ADMIN_USER').split(','),message)
alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
alert_user = [user.strip() for user in alert_user if user.strip()]
@ -375,7 +375,7 @@ def deal_event(event,workflow_info,namespace):
deliver_message(workflow,dbsession)
except Exception as e1:
print('push fail:', e1)
push_admin(str(e1))
push_message(conf.get('ADMIN_USER').split(','),'push fail'+str(e1))
save_history(workflow,dbsession)
save_monitoring(workflow, dbsession)

View File

@ -1395,8 +1395,8 @@ def check_resource_memory(resource_memory,src_resource_memory=None):
if resource_int<=src_resource_int:
return resource
if resource_int>50000:
return '50G'
if resource_int>250000:
return '250G'
else:
return resource
@ -1442,9 +1442,9 @@ def check_resource_cpu(resource_cpu,src_resource_cpu=None):
if resource_int<=src_resource_int:
return resource
if resource_int>20:
return '20'
# raise MyappException('resource cpu max 20')
if resource_int>50:
return '50'
# raise MyappException('resource cpu max 50')
return resource
resource = resource_cpu.upper().replace('-', '~').replace('_', '~').strip()

View File

@ -121,33 +121,21 @@ class K8s():
# 指定命名空间指定服务名指定pod名称指定状态删除重启pod。status为运行状态,True 或者False
def delete_pods(self,namespace=None,service_name=None,pod_name=None,status=None,labels=None):
allresponse = []
if namespace and pod_name:
api_response = self.v1.delete_namespaced_pod(name = pod_name, namespace=namespace,grace_period_seconds=0)
allresponse.append(api_response)
return allresponse
if not namespace:
return []
all_pods=self.get_pods(namespace=namespace,pod_name=pod_name,service_name=service_name,labels=labels)
if status:
all_pods = [pod for pod in all_pods if pod['status']==status]
try:
if labels:
all_pod = self.get_pods(namespace=namespace,labels=labels)
for pod in all_pod:
api_response = self.v1.delete_namespaced_pod(pod['name'], namespace,grace_period_seconds=0)
allresponse.append(api_response)
if status:
all_pod = self.get_pods(namespace=namespace,service_name=service_name,pod_name=pod_name)
if all_pod:
for pod in all_pod:
if status==None: # 如果没有指定运行状态,则直接删除
api_response = self.v1.delete_namespaced_pod(pod['name'],namespace,grace_period_seconds=0)
allresponse.append(api_response)
elif pod['status']==status:
# body = kubernetes.client.V1DeleteOptions(grace_period_seconds=0,orphan_dependents=False) # 不正常的要设置强制删除
api_response = self.v1.delete_namespaced_pod(pod['name'], namespace,grace_period_seconds=0)
allresponse.append(api_response)
print('delete pod %s' % all_pod)
return allresponse
for pod in all_pods:
self.v1.delete_namespaced_pod(pod['name'], namespace,grace_period_seconds=0)
print('delete pod %s' % pod['name'])
except Exception as e:
print(e)
return []
return all_pods
# 获取指定label的nodeip列表
# @pysnooper.snoop()
@ -215,7 +203,7 @@ class K8s():
# 根据各种crd自定义的status结构判断最终评定的status
# @pysnooper.snoop()
def get_crd_status(self,crd_object,plural):
def get_crd_status(self,crd_object,group,plural):
status = ''
# workflows 使用最后一个node的状态为真是状态
if plural == 'workflows':
@ -233,6 +221,10 @@ class K8s():
for condition in crd_object['status']['conditions']:
if condition['type']=='Ready' and condition['status']=='True':
status='ready'
elif plural == 'jobs' and group=='batch.volcano.sh':
status = 'unready'
if 'status' in crd_object and 'state' in crd_object['status'] and 'phase' in crd_object['status']['state']:
return crd_object['status']['state']['phase']
else:
if 'status' in crd_object and 'phase' in crd_object['status']:
status = crd_object['status']['phase']
@ -264,7 +256,7 @@ class K8s():
return {}
# print(crd_object['status']['conditions'][-1]['type'])
status = self.get_crd_status(crd_object,plural)
status = self.get_crd_status(crd_object,group,plural)
creat_time = crd_object['metadata']['creationTimestamp'].replace('T', ' ').replace('Z', '')
creat_time = (datetime.datetime.strptime(creat_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
@ -301,7 +293,7 @@ class K8s():
back_objects=[]
for crd_object in crd_objects:
# print(crd_object['status']['conditions'][-1]['type'])
status = self.get_crd_status(crd_object,plural)
status = self.get_crd_status(crd_object,group,plural)
creat_time = crd_object['metadata']['creationTimestamp'].replace('T', ' ').replace('Z', '')
creat_time = (datetime.datetime.strptime(creat_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
@ -313,6 +305,13 @@ class K8s():
finish_time = crd_object['status']['completionTime'].replace('T', ' ').replace('Z', '')
finish_time = (datetime.datetime.strptime(finish_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
# vcjob的结束时间
elif 'status' in crd_object and 'state' in crd_object['status'] and 'lastTransitionTime' in crd_object['status']['state']:
if crd_object['status']['state'].get('phase','')=='Completed':
finish_time = crd_object['status']['state']['lastTransitionTime'].replace('T', ' ').replace('Z', '')
finish_time = (datetime.datetime.strptime(finish_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
back_object={
"name":crd_object['metadata']['name'],
"namespace":crd_object['metadata']['namespace'] if 'namespace' in crd_object['metadata'] else '',
@ -427,7 +426,6 @@ class K8s():
except Exception as e:
print(e)
# 删除pytorchjob
try:
crd_info = all_crd_info['pytorchjob']
@ -448,16 +446,19 @@ class K8s():
except Exception as e:
print(e)
# 删除framework
# 删除vcjob
try:
crd_info = all_crd_info['framework']
crd_names = self.delete_crd(group=crd_info['group'], version=crd_info['version'],
plural=crd_info['plural'], namespace=namespace,
labels={"run-id": str(run_id)})
crd_info = all_crd_info['vcjob']
crd_names = self.delete_crd(
group=crd_info['group'], version=crd_info['version'], plural=crd_info['plural'],
namespace=namespace, labels={'run-id': run_id}
)
except Exception as e:
print(e)
# 删除deployment
try:
self.delete_deployment(namespace=namespace, labels={'run-id': run_id})
@ -485,16 +486,6 @@ class K8s():
except Exception as e:
print(e)
# 删除sts
try:
stss = self.AppsV1Api.list_namespaced_stateful_set(namespace=namespace,label_selector="run-id=%s" % str(run_id)).items
if stss:
for sts in stss:
self.AppsV1Api.delete_namespaced_stateful_set(namespace=namespace,name=sts.metadata.name,grace_period_seconds=0)
except Exception as e:
print(e)
# 删除service
try:
services = self.v1.list_namespaced_service(namespace=namespace,label_selector="run-id=%s" % str(run_id)).items
@ -650,7 +641,7 @@ class K8s():
return k8s_volumes,k8s_volume_mounts
# @pysnooper.snoop(watch_explode=('envs'))
# @pysnooper.snoop(watch_explode=())
def make_container(self,name,command,args,volume_mount,working_dir,resource_memory,resource_cpu,resource_gpu,image_pull_policy,image,env,privileged=False,username='',ports=None):
if not '~' in resource_memory:
@ -706,8 +697,8 @@ class K8s():
"memory": requests_memory
}
resources_limits = {
"cpu": requests_cpu,
"memory": requests_memory
"cpu": limits_cpu,
"memory": limits_memory
}
if gpu_type == 'NVIDIA':
@ -1033,12 +1024,17 @@ class K8s():
# 创建pod
# @pysnooper.snoop()
def create_service(self,namespace,name,username,ports,selector=None):
def create_service(self,namespace,name,username,ports,selector=None,service_type='ClusterIP',externalIPs=None):
svc_metadata = v1_object_meta.V1ObjectMeta(name=name, namespace=namespace, labels={"app":name,'user':username})
ports = [client.V1ServicePort(name='http%s'%index, port=int(port), protocol='TCP', target_port=int(port)) for index,port in enumerate(ports)]
svc_spec = client.V1ServiceSpec(ports=ports, selector={"app": name, 'user': username}, type='ClusterIP')
service_ports=[]
for index,port in enumerate(ports):
if type(port)==list and len(port)>1:
service_ports.append(client.V1ServicePort(name='http%s'%index, port=int(port[0]), protocol='TCP', target_port=int(port[1])))
else:
service_ports.append(client.V1ServicePort(name='http%s' % index, port=int(port), protocol='TCP', target_port=int(port)))
svc_spec = client.V1ServiceSpec(ports=service_ports, selector={"app": name, 'user': username}, type=service_type,external_i_ps=externalIPs)
if selector:
svc_spec = client.V1ServiceSpec(ports=ports, selector=selector, type='ClusterIP')
svc_spec = client.V1ServiceSpec(ports=service_ports, selector=selector, type=service_type,external_i_ps=externalIPs)
service = client.V1Service(api_version='v1', kind='Service', metadata=svc_metadata, spec=svc_spec)
# print(service.to_dict())
try:

View File

@ -67,6 +67,109 @@ FRONTEND_CONF_KEYS = (
from flask_appbuilder.const import (
FLAMSG_ERR_SEC_ACCESS_DENIED,
LOGMSG_ERR_SEC_ACCESS_DENIED,
PERMISSION_PREFIX
)
from flask_appbuilder._compat import as_unicode
log = logging.getLogger(__name__)
def has_access(f):
"""
Use this decorator to enable granular security permissions to your methods.
Permissions will be associated to a role, and roles are associated to users.
By default the permission's name is the methods name.
"""
if hasattr(f, '_permission_name'):
permission_str = f._permission_name
else:
permission_str = f.__name__
def wraps(self, *args, **kwargs):
permission_str = "{}{}".format(PERMISSION_PREFIX, f._permission_name)
if self.method_permission_name:
_permission_name = self.method_permission_name.get(f.__name__)
if _permission_name:
permission_str = "{}{}".format(PERMISSION_PREFIX, _permission_name)
if (permission_str in self.base_permissions and
self.appbuilder.sm.has_access(
permission_str,
self.class_permission_name
)):
return f(self, *args, **kwargs)
else:
log.warning(
LOGMSG_ERR_SEC_ACCESS_DENIED.format(
permission_str,
self.__class__.__name__
)
)
flash(as_unicode(FLAMSG_ERR_SEC_ACCESS_DENIED), "danger")
return redirect(
url_for(
self.appbuilder.sm.auth_view.__class__.__name__ + ".login",
next=request.url
)
)
f._permission_name = permission_str
return functools.update_wrapper(wraps, f)
def has_access_api(f):
"""
Use this decorator to enable granular security permissions to your API methods.
Permissions will be associated to a role, and roles are associated to users.
By default the permission's name is the methods name.
this will return a message and HTTP 401 is case of unauthorized access.
"""
if hasattr(f, '_permission_name'):
permission_str = f._permission_name
else:
permission_str = f.__name__
def wraps(self, *args, **kwargs):
permission_str = "{}{}".format(PERMISSION_PREFIX, f._permission_name)
if self.method_permission_name:
_permission_name = self.method_permission_name.get(f.__name__)
if _permission_name:
permission_str = "{}{}".format(PERMISSION_PREFIX, _permission_name)
if (permission_str in self.base_permissions and
self.appbuilder.sm.has_access(
permission_str,
self.class_permission_name
)):
return f(self, *args, **kwargs)
else:
log.warning(
LOGMSG_ERR_SEC_ACCESS_DENIED.format(
permission_str,
self.__class__.__name__
)
)
response = make_response(
jsonify(
{
'message': str(FLAMSG_ERR_SEC_ACCESS_DENIED),
'severity': 'danger'
}
),
401
)
response.headers['Content-Type'] = "application/json"
return response
f._permission_name = permission_str
return functools.update_wrapper(wraps, f)
def get_error_msg():
if conf.get("SHOW_STACKTRACE"):
error_msg = traceback.format_exc()
@ -226,7 +329,7 @@ class MyappModelView(ModelView):
post_list = None
pre_show = None
post_show = None
check_edit_permission=None
label_title = ''
conv = GeneralModelConverter(datamodel)
@ -413,14 +516,6 @@ class MyappModelView(ModelView):
self.add_template, title=self.add_title, widgets=widget
)
# 检测是否具有编辑权限只有creator和admin可以编辑
def check_edit_permission(self, item):
user_roles = [role.name.lower() for role in list(get_user_roles())]
if "admin" in user_roles:
return
if g.user and g.user.username and hasattr(item,'created_by'):
if g.user.username!=item.created_by.username:
raise MyappException('just creator can edit/delete ')
# @pysnooper.snoop(watch_explode=('item'))
@ -512,13 +607,22 @@ class MyappModelView(ModelView):
if self.src_item_object:
self.src_item_json = self.src_item_object.to_json()
if self.check_redirect_list_url:
try:
self.check_edit_permission(self.src_item_object)
except Exception as e:
print(e)
flash(str(e), 'warning')
return redirect(self.check_redirect_list_url)
# if self.check_redirect_list_url:
try:
if self.check_edit_permission:
has_permission = self.check_edit_permission(self.src_item_object)
if not has_permission:
self.update_redirect()
url = self.get_redirect()
return redirect(url)
except Exception as e:
print(e)
flash(str(e), 'warning')
self.update_redirect()
return redirect(self.get_redirect())
# return redirect(self.check_redirect_list_url)
widgets = self._edit(pk)
@ -543,7 +647,10 @@ class MyappModelView(ModelView):
self.src_item_json = self.src_item_object.to_json()
if self.check_redirect_list_url:
try:
self.check_edit_permission(self.src_item_object)
if self.check_edit_permission:
if not self.check_edit_permission(self.src_item_object):
flash(str('no permission delete'), 'warning')
return redirect(self.check_redirect_list_url)
except Exception as e:
print(e)
flash(str(e), 'warning')

View File

@ -202,6 +202,17 @@ def merge_response_func(func, key):
return wrap
def json_response(message,status,result):
return jsonify(
{
"message":message,
"status":status,
"result":result
}
)
import pysnooper
# @pysnooper.snoop(depth=5)
# 暴露url+视图函数。视图函数会被覆盖暴露url也会被覆盖
@ -212,6 +223,7 @@ class MyappModelRestApi(ModelRestApi):
page_size = 100
src_item_object = None # 原始model对象
src_item_json={} # 原始model对象的json
check_edit_permission = None
datamodel=None
post_list=None
pre_json_load=None
@ -524,6 +536,18 @@ class MyappModelRestApi(ModelRestApi):
item = self.datamodel.get(pk, self._base_filters)
self.src_item_json = item.to_json()
# if self.check_redirect_list_url:
try:
if self.check_edit_permission:
has_permission = self.check_edit_permission(item)
if not has_permission:
return json_response(message='no permission to edit',status=1,result={})
except Exception as e:
print(e)
return json_response(message='check edit permission'+str(e),status=1,result={})
if not request.is_json:
return self.response_error(400, message="Request is not JSON")
if not item:
@ -541,6 +565,8 @@ class MyappModelRestApi(ModelRestApi):
if isinstance(item.data, dict):
return self.response_error(422,message=item.errors)
self.pre_update(item.data)
try:
self.datamodel.edit(item.data, raise_exception=True)
self.post_update(item.data)

View File

@ -51,10 +51,8 @@ class Myapp(BaseMyappView):
@expose('/home')
def home(self):
from myapp.project import HOME_CONFIG
data = HOME_CONFIG
# 返回模板
return self.render_template('home.html', data=data)
return self.render_template('home.html')
@expose("/web/log/<cluster_name>/<namespace>/<pod_name>", methods=["GET",])

View File

@ -253,6 +253,7 @@ class Notebook_ModelView_Base():
item.changed_by_fk = int(self.src_item_json.get('changed_by_fk'))
if self.src_item_json:
item.created_by_fk = int(self.src_item_json.get('created_by_fk'))
db.session.commit()
def post_list(self,items):
@ -287,6 +288,8 @@ class Notebook_ModelView_Base():
# @pysnooper.snoop(watch_explode=('notebook'))
def reset_notebook(self, notebook):
notebook.changed_on=datetime.datetime.now()
db.session.commit()
self.reset_theia(notebook)
@ -310,7 +313,7 @@ class Notebook_ModelView_Base():
"--no-browser --allow-root --port=%s "
"--NotebookApp.token='' --NotebookApp.password='' "
"--NotebookApp.allow_origin='*' "
"--NotebookApp.base_url=%s" % ('/mnt/%s' % notebook.created_by.username if "(pvc)" in notebook.volume_mount else "/mnt/",port,rewrite_url)]
"--NotebookApp.base_url=%s" % (notebook.mount,port,rewrite_url)]
volume_mount +=',2G(memory):/dev/shm'
elif notebook.ide_type=='theia':
@ -337,8 +340,8 @@ class Notebook_ModelView_Base():
volume_mount=volume_mount,
working_dir=workingDir,
node_selector=notebook.get_node_selector(),
resource_memory=notebook.resource_memory,
resource_cpu=notebook.resource_cpu,
resource_memory="0G~"+notebook.resource_memory,
resource_cpu="0~"+notebook.resource_cpu,
resource_gpu=notebook.resource_gpu,
image_pull_policy=notebook.image_pull_policy,
image_pull_secrets=image_secrets,
@ -414,6 +417,7 @@ class Notebook_ModelView_Base():
@expose('/reset/<notebook_id>',methods=['GET','POST'])
def reset(self,notebook_id):
notebook = db.session.query(Notebook).filter_by(id=notebook_id).first()
try:
notebook_crd = self.reset_notebook(notebook)
flash('已重置Running状态后可进入。注意notebook会定时清理如要运行长期任务请在pipeline中创建任务流进行。','warning')

View File

@ -32,7 +32,7 @@ from myapp.views.view_task import Task_ModelView
from sqlalchemy import and_, or_, select
from myapp.exceptions import MyappException
from wtforms import BooleanField, IntegerField,StringField, SelectField,FloatField,DateField,DateTimeField,SelectMultipleField,FormField,FieldList
from myapp.project import push_message,push_admin
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget,BS3PasswordFieldWidget,DatePickerWidget,DateTimePickerWidget,Select2ManyWidget,Select2Widget,BS3TextAreaFieldWidget
from myapp.forms import MyBS3TextAreaFieldWidget,MySelect2Widget,MyCodeArea,MyLineSeparatedListField,MyJSONField,MyBS3TextFieldWidget,MySelectMultipleField
from myapp.utils.py import py_k8s
@ -253,7 +253,7 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
else:
task_command += " && ".join(commands)
job_template_entrypoint = task.job_template.entrypoint.strip().replace(" ",'') if task.job_template.entrypoint else ''
job_template_entrypoint = task.job_template.entrypoint.strip() if task.job_template.entrypoint else ''
command=None
@ -261,7 +261,7 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
command = job_template_entrypoint
if task_command:
command = task_command.replace(" ",'')
command = task_command
# entrypoint = task.job_template.images.entrypoint
@ -302,11 +302,14 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
# if task_command:
# task_command = task_command.split(' ')
# task_command = [command for command in task_command if command]
command = command.split(' ') if command else []
command = [com for com in command if com]
ops = kfp.dsl.ContainerOp(
name=task.name,
image=task.job_template.images.name,
arguments=ops_args,
command=command.split(' ') if command else None,
command=command if command else None,
container_kwargs=container_kwargs,
file_outputs=json.loads(task.outputs) if task.outputs and json.loads(task.outputs) else None
)
@ -413,7 +416,7 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
pod_anti_affinity=k8s_client.V1PodAntiAffinity(
preferred_during_scheduling_ignored_during_execution=[
k8s_client.V1WeightedPodAffinityTerm(
weight=5,
weight=20,
pod_affinity_term=k8s_client.V1PodAffinityTerm(
label_selector=k8s_client.V1LabelSelector(
match_labels={
@ -491,27 +494,25 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
# 配置拉取秘钥。本来在contain里面workflow在外面
task_temp = all_tasks[task_name]
if task_temp.job_template.images.repository.hubsecret:
hubsecret = task_temp.job_template.images.repository.hubsecret
hubsecret = task_temp.job_template.images.repository.hubsecret
if hubsecret not in hubsecret_list:
hubsecret_list.append(hubsecret)
pipeline_conf.image_pull_secrets.append(k8s_client.V1LocalObjectReference(name=hubsecret))
# 配置host
hostAliases = conf.get('HOSTALIASES', '')
if task_temp.job_template.hostAliases:
hostAliases+="\n"+ task_temp.job_template.hostAliases
if hostAliases:
hostAliases_list = re.split('\r|\n', hostAliases)
hostAliases_list = [host.strip() for host in hostAliases_list if host.strip()]
for row in hostAliases_list:
hosts = row.strip().split(' ')
hosts = [host for host in hosts if host]
# print(hosts)
# print('------------')
if len(hosts) > 1:
pipeline_conf.set_host_aliases(ip=hosts[0],hostnames=hosts[1:])
# # 配置host 在kfp中并不生效
# hostAliases = conf.get('HOSTALIASES', '')
# if task_temp.job_template.hostAliases:
# hostAliases+="\n"+ task_temp.job_template.hostAliases
# if hostAliases:
# hostAliases_list = re.split('\r|\n', hostAliases)
# hostAliases_list = [host.strip() for host in hostAliases_list if host.strip()]
# for row in hostAliases_list:
# hosts = row.strip().split(' ')
# hosts = [host for host in hosts if host]
#
# if len(hosts) > 1:
# pipeline_conf.set_host_aliases(ip=hosts[0],hostnames=hosts[1:])
# 配置默认拉取策略
@ -532,8 +533,6 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
pipeline_conf.labels['pipeline-id'] = str(pipeline.id)
pipeline_conf.labels['run-id'] = global_envs.get('KFJ_RUN_ID','') # 以此来绑定运行时id不能用kfp的run—id。那个是传到kfp以后才产生的。
kfp.compiler.Compiler().compile(my_pipeline, pipeline.name+'.yaml',pipeline_conf=pipeline_conf)
file = open(pipeline.name+'.yaml',mode='rb')
pipeline_file = template_str(str(file.read(),encoding='utf-8'))
@ -581,7 +580,7 @@ def upload_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id):
@pysnooper.snoop(watch_explode=())
# @pysnooper.snoop(watch_explode=())
def run_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id,pipeline_argo_version_id):
# logging.info(pipeline)
# return
@ -589,7 +588,6 @@ def run_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id,pipeline_
if not pipeline_argo_id or not pipeline_argo_version_id:
pipeline_argo_id,pipeline_argo_version_id = upload_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id) # 必须上传新版本
client = kfp.Client(kfp_host)
# 先创建一个实验在在这个实验中运行指定pipeline
experiment=None
@ -626,7 +624,7 @@ class Pipeline_ModelView_Base():
order_columns = ['id']
list_columns = ['id','project','pipeline_url','creator','modified']
add_columns = ['project','name','describe','namespace','schedule_type','cron_time','depends_on_past','max_active_runs','parallelism','global_env','alert_status','alert_user']
add_columns = ['project','name','describe','namespace','schedule_type','cron_time','depends_on_past','max_active_runs','parallelism','global_env','alert_status','alert_user','parameter']
show_columns = ['project','name','describe','namespace','schedule_type','cron_time','depends_on_past','max_active_runs','parallelism','global_env','dag_json_html','pipeline_file_html','pipeline_argo_id','version_id','run_id','created_by','changed_by','created_on','changed_on','expand_html','parameter_html']
edit_columns = add_columns
@ -684,6 +682,13 @@ class Pipeline_ModelView_Base():
default=1,
validators=[DataRequired()]
),
"expired_limit": IntegerField(
_(datamodel.obj.lab('expired_limit')),
description="定时调度过期记录保留个数",
widget=BS3TextFieldWidget(),
default=1,
validators=[DataRequired()]
),
"parallelism": IntegerField(
_(datamodel.obj.lab('parallelism')),
description="一个任务流实例中可同时运行的task数目",
@ -728,6 +733,19 @@ class Pipeline_ModelView_Base():
related_views = [Task_ModelView, ]
# 检测是否具有编辑权限只有creator和admin可以编辑
def check_edit_permission(self, item):
user_roles = [role.name.lower() for role in list(get_user_roles())]
if "admin" in user_roles:
return True
if g.user and g.user.username and hasattr(item,'created_by'):
if g.user.username==item.created_by.username:
return True
flash('just creator can edit/delete ', 'warning')
return False
# 验证args参数
# @pysnooper.snoop(watch_explode=('item'))
def pipeline_args_check(self, item):
@ -814,6 +832,7 @@ class Pipeline_ModelView_Base():
item.parameter = json.dumps({"cronjob_start_time":datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, indent=4, ensure_ascii=False)
# @pysnooper.snoop()
def pre_update(self, item):
@ -827,15 +846,29 @@ class Pipeline_ModelView_Base():
self.merge_upstream(item)
self.pipeline_args_check(item)
item.change_datetime = datetime.datetime.now()
if item.parameter:
item.parameter = json.dumps(json.loads(item.parameter),indent=4,ensure_ascii=False)
else:
item.parameter = '{}'
if (item.schedule_type=='crontab' and self.src_item_json.get("schedule_type")=='once') or (item.cron_time!=self.src_item_json.get("cron_time",'')):
parameter = json.loads(item.parameter if item.parameter else '{}')
parameter.update({"cronjob_start_time":datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
item.parameter = json.dumps(parameter,indent=4,ensure_ascii=False)
# 限制提醒
if item.schedule_type=='crontab':
if not item.project.node_selector:
flash('无法保障公共集群的稳定性,定时任务请选择专门的日更集群项目组','warning')
else:
org = item.project.node_selector.replace('org=','')
if not org or org=='public':
flash('无法保障公共集群的稳定性,定时任务请选择专门的日更集群项目组','warning')
def pre_update_get(self,item):
item.dag_json = item.fix_dag_json()
item.expand = json.dumps(item.fix_expand(),indent=4,ensure_ascii=False)
# item.expand = json.dumps(item.fix_expand(),indent=4,ensure_ascii=False)
db.session.commit()
# 删除前先把下面的task删除了
@ -862,6 +895,20 @@ class Pipeline_ModelView_Base():
return json_response(message=str(e),status=-1,result={})
@expose("/demo/list/")
@pysnooper.snoop()
def demo(self):
try:
pipelines = db.session.query(Pipeline).filter(Pipeline.parameter.contains('"demo": "true"')).all()
back=[]
for pipeline in pipelines:
back.append(pipeline.to_json())
return json_response(message='success',status=0,result=back)
except Exception as e:
print(e)
return json_response(message=str(e),status=-1,result={})
@event_logger.log_this
@expose("/list/")
@has_access
@ -937,6 +984,7 @@ class Pipeline_ModelView_Base():
namespace=crd['namespace'],
run_id = run_id
)
push_message(conf.get('ADMIN_USER', '').split(','),'手动运行新的pipeline %s进而删除旧的pipeline run-id: %s' % (pipeline.describe,run_id,))
if db_crd:
db_crd.status='Deleted'
db_crd.change_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@ -1052,7 +1100,7 @@ class Pipeline_ModelView_Base():
pipeline = db.session.query(Pipeline).filter_by(id=pipeline_id).first()
pipeline.dag_json = pipeline.fix_dag_json()
pipeline.expand = json.dumps(pipeline.fix_expand(), indent=4, ensure_ascii=False)
# pipeline.expand = json.dumps(pipeline.fix_expand(), indent=4, ensure_ascii=False)
# db_tasks = pipeline.get_tasks(db.session)
# if db_tasks:
@ -1096,6 +1144,26 @@ class Pipeline_ModelView_Base():
return redirect('/pipeline_modelview/web/%s'%pipeline.id)
# # @event_logger.log_this
@expose("/web/monitoring/<pipeline_id>", methods=["GET"])
def web_monitoring(self,pipeline_id):
pipeline = db.session.query(Pipeline).filter_by(id=int(pipeline_id)).first()
if pipeline.run_id:
data = {
"url": pipeline.project.cluster.get('GRAFANA_TASK','') + pipeline.name,
# "target": "div.page_f1flacxk:nth-of-type(0)", # "div.page_f1flacxk:nth-of-type(0)",
"delay":1000,
"loading": True
}
# 返回模板
if pipeline.project.cluster['NAME']==conf.get('ENVIRONMENT'):
return self.render_template('link.html', data=data)
else:
return self.render_template('external_link.html', data=data)
else:
flash('no running instance','warning')
return redirect('/pipeline_modelview/web/%s'%pipeline.id)
# # @event_logger.log_this
@expose("/web/pod/<pipeline_id>", methods=["GET"])
def web_pod(self,pipeline_id):
@ -1113,7 +1181,7 @@ class Pipeline_ModelView_Base():
return self.render_template('external_link.html', data=data)
@pysnooper.snoop(watch_explode=('expand'))
# @pysnooper.snoop(watch_explode=('expand'))
def copy_db(self,pipeline):
new_pipeline = pipeline.clone()
expand = json.loads(pipeline.expand) if pipeline.expand else {}

View File

@ -11,6 +11,7 @@ from flask_appbuilder.actions import action
from myapp import app, appbuilder,db,event_logger
import logging
import re
import copy
import uuid
import requests
from myapp.exceptions import MyappException
@ -168,7 +169,7 @@ class Service_ModelView(MyappModelView):
namespace = conf.get('SERVICE_NAMESPACE')
# 设定notebook的机器选择器
# 设定service的机器选择器
if core.get_gpu(service.resource_gpu)[0]:
service.node_selector = service.node_selector.replace('cpu=true','gpu=true')
db.session.commit()
@ -201,12 +202,15 @@ class Service_ModelView(MyappModelView):
ports=[int(port) for port in service.ports.split(',')]
)
k8s.create_service(namespace=namespace,
name=service.name,
username=service.created_by.username,
ports=[int(port) for port in service.ports.split(',')]
)
ports = [int(port) for port in service.ports.split(',')]
k8s.create_service(
namespace=namespace,
name=service.name,
username=service.created_by.username,
ports=ports
)
# 如果域名配置的gateway就用这个
host = service.name+"."+conf.get('SERVICE_DOMAIN')
if service.host:
@ -217,6 +221,21 @@ class Service_ModelView(MyappModelView):
ports=service.ports.split(',')
)
# 以ip形式访问的话使用的代理ip。不然不好处理机器服务化机器扩容和缩容时ip变化
SERVICE_EXTERNAL_IP = conf.get('SERVICE_EXTERNAL_IP',None)
if SERVICE_EXTERNAL_IP:
service_ports = [[30000+10*service.id+index,port] for index,port in enumerate(ports)]
k8s.create_service(
namespace=namespace,
name=(service.name+"-external")[:60],
username=service.created_by.username,
ports=service_ports,
selector={"app": service.name, 'user': service.created_by.username},
externalIPs=conf.get('SERVICE_EXTERNAL_IP',None)
)
# # 创建虚拟服务做代理
# crd_info = conf.get('CRD_INFO', {}).get('virtualservice', {})
# crd_name = "service-%s"%service.name
@ -275,6 +294,7 @@ class Service_ModelView(MyappModelView):
return redirect('/service_modelview/list/')
@expose('/link/<service_id>')
def link(self, service_id):
service = db.session.query(Service).filter_by(id=service_id).first()

View File

@ -211,6 +211,19 @@ class Task_ModelView_Base():
del form._fields['job_describe'] # 不处理这个字段
# 检测是否具有编辑权限只有creator和admin可以编辑
def check_edit_permission(self, item):
user_roles = [role.name.lower() for role in list(get_user_roles())]
if "admin" in user_roles:
return True
if g.user and g.user.username and item.pipeline and hasattr(item.pipeline,'created_by'):
if g.user.username==item.pipeline.created_by.username:
return True
flash('just creator can edit/delete ', 'warning')
return False
# 验证args参数
# @pysnooper.snoop(watch_explode=('item'))
def task_args_check(self,item):
@ -370,6 +383,7 @@ class Task_ModelView_Base():
# 因为删除就找不到pipeline了
def pre_delete(self, item):
self.check_redirect_list_url = '/pipeline_modelview/edit/' + str(item.pipeline.id)
self.pipeline = item.pipeline
@ -408,23 +422,24 @@ class Task_ModelView_Base():
"list": MyLineSeparatedListField
}
@event_logger.log_this
@expose("/delete/<pk>")
@has_access
def delete(self, pk):
pk = self._deserialize_pk_if_composite(pk)
self.src_item_object = self.datamodel.get(pk, self._base_filters)
if self.check_redirect_list_url:
self.check_redirect_list_url = '/pipeline_modelview/edit/' + str(self.src_item_object.pipeline.id)
try:
self.check_edit_permission(self.src_item_object)
except Exception as e:
print(e)
flash(str(e), 'warning')
return redirect(self.check_redirect_list_url)
self._delete(pk)
return self.post_delete_redirect()
# @event_logger.log_this
# @expose("/delete/<pk>")
# @has_access
# def delete(self, pk):
# pk = self._deserialize_pk_if_composite(pk)
# self.src_item_object = self.datamodel.get(pk, self._base_filters)
# if self.check_redirect_list_url:
# self.check_redirect_list_url = '/pipeline_modelview/edit/' + str(self.src_item_object.pipeline.id)
# try:
# self.check_edit_permission(self.src_item_object)
# except Exception as e:
# print(e)
# flash(str(e), 'warning')
# return redirect(self.check_redirect_list_url)
#
# self._delete(pk)
# return self.post_delete_redirect()
def run_pod(self,task,k8s_client,run_id,namespace,pod_name,image,working_dir,command,args):
@ -503,9 +518,10 @@ class Task_ModelView_Base():
@expose("/debug/<task_id>", methods=["GET", "POST"])
def debug(self,task_id):
task = db.session.query(Task).filter_by(id=task_id).first()
if not g.user.is_admin() and task.job_template.created_by.username!=g.user.username:
flash('仅管理员或当前任务模板创建者可启动debug模式', 'warning')
return redirect('/pipeline_modelview/web/%s' % str(task.pipeline.id))
if task.job_template.name != conf.get('CUSTOMIZE_JOB'):
if not g.user.is_admin() and task.job_template.created_by.username!=g.user.username:
flash('仅管理员或当前任务模板创建者可启动debug模式', 'warning')
return redirect('/pipeline_modelview/web/%s' % str(task.pipeline.id))
from myapp.utils.py.py_k8s import K8s

View File

@ -7,7 +7,7 @@ from flask_babel import gettext as __
from myapp.models.model_job import Repository,Images,Job_Template,Task,Pipeline,Workflow,Tfjob,Xgbjob,RunHistory,Pytorchjob
from myapp.models.model_team import Project,Project_User
from flask_appbuilder.actions import action
from myapp.project import push_message,push_admin
from myapp import app, appbuilder,db,event_logger
from sqlalchemy import and_, or_, select
@ -151,6 +151,8 @@ class Crd_ModelView_Base():
item.status='Deleted'
item.change_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
db.session.commit()
push_message(conf.get('ADMIN_USER', '').split(','), '手动触发stop %s %s' % (crd_info['plural'],item.name))
except Exception as e:
flash(str(e), "danger")
@ -247,7 +249,7 @@ class Workflow_ModelView(Crd_ModelView_Base,MyappModelView,DeleteMixin):
label_title = '运行实例'
datamodel = SQLAInterface(Workflow)
list_columns = ['name','project','pipeline_url', 'namespace_url', 'create_time','change_time', 'final_status','status', 'username', 'log','stop']
list_columns = ['name','project','pipeline_url', 'namespace_url','execution_date', 'create_time','change_time', 'final_status','status', 'username', 'log','stop']
crd_name = 'workflow'
appbuilder.add_view(Workflow_ModelView,"运行实例",href='/workflow_modelview/list/?_flt_2_name=&_flt_2_labels=',icon = 'fa-tasks',category = '训练')