add cache

This commit is contained in:
cdllp2 2022-08-28 20:24:10 +08:00
parent b3a65bb09b
commit 299aefb033
13 changed files with 183 additions and 158 deletions

View File

@ -113,10 +113,12 @@ class MyCodeArea(object):
from wtforms import widgets
class MyBS3TextAreaFieldWidget(widgets.TextArea):
def __init__(self, rows=3,readonly=0):
def __init__(self, rows=3,readonly=0,expand_filed=None): # 扩展成list类型字段
self.rows=rows
self.readonly = readonly
self.expand_filed=expand_filed
return super(MyBS3TextAreaFieldWidget, self).__init__()
def __call__(self, field, **kwargs):
kwargs["class"] = u"form-control"
kwargs["rows"] = self.rows
@ -128,9 +130,11 @@ class MyBS3TextAreaFieldWidget(widgets.TextArea):
class MyBS3TextFieldWidget(widgets.TextInput):
def __init__(self, value='',readonly=0):
def __init__(self, value='',readonly=0,is_date=False,is_date_range=False):
self.value=value
self.readonly = readonly
self.is_date=is_date
self.is_date_range=is_date_range
return super(MyBS3TextFieldWidget, self).__init__()
def __call__(self, field, **kwargs):

View File

@ -249,6 +249,7 @@ class MyappModelBase():
"priority":"优先级",
"owner": "责任人",
"industry":"行业",
"skip":"跳过",
"etl_pipeline": "任务流",
"etl_pipeline_id": "任务流id",
"etl_task": "任务",
@ -268,7 +269,8 @@ class MyappModelBase():
"update_time":"更新时间",
"changed_on": "修改时间",
"change_time":"更新时间",
"modified": "修改时间"
"modified": "修改时间",
"cronjob_start_time":"补录起点"
}
# 获取列的中文显示

View File

@ -338,8 +338,6 @@ class MyappSecurityManager(SecurityManager):
@staticmethod
def before_request():
g.user = current_user
# if len(request.path)>7 and request.path[:7]!='/static' and g.user and hasattr(g.user, 'username'):
# logging.info('------------%s(%s):%s'%(request.method,g.user.username,request.path))
def __init__(self, appbuilder):
super(MyappSecurityManager, self).__init__(appbuilder)
@ -534,7 +532,7 @@ class MyappSecurityManager(SecurityManager):
# 添加注册远程用户
# @pysnooper.snoop()
def auth_user_remote_org_user(self, username,org_name='',password=''):
def auth_user_remote_org_user(self, username,org_name='',password='',email='',first_name='',last_name=''):
if not username:
return None
# 查找用户
@ -546,17 +544,22 @@ class MyappSecurityManager(SecurityManager):
if user is None:
user = self.add_org_user(
username=username,
first_name=username,
last_name=username,
first_name=first_name if first_name else username,
last_name=last_name if last_name else username,
password=password,
org=org_name, # 添加组织架构
email=username + "@tencent.com",
email=username + "@tencent.com" if not email else email,
roles=[self.find_role(self.auth_user_registration_role),rtx_role] if self.find_role(self.auth_user_registration_role) else [rtx_role,] # org_role 添加gamma默认角色, 组织架构角色先不自动添加
)
elif not user.is_active: # 如果用户未激活不允许接入
print(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))
return None
if user:
user.org = org_name if org_name else user.org
user.email = email if email else user.email
user.first_name = first_name if first_name else user.first_name
user.last_name = last_name if last_name else user.last_name
gamma_role = self.find_role(self.auth_user_registration_role)
if gamma_role and gamma_role not in user.roles:
user.roles.append(gamma_role)

View File

@ -593,7 +593,8 @@ class K8s():
# @pysnooper.snoop()
def get_volume_mounts(self,volume_mount,username):
@staticmethod
def get_volume_mounts(volume_mount,username):
k8s_volumes = []
k8s_volume_mounts = []
if volume_mount and ":" in volume_mount:

View File

@ -80,7 +80,7 @@ from flask_appbuilder.exceptions import FABException, InvalidOrderByColumnFABExc
from flask_appbuilder.security.decorators import permission_name, protect,has_access
from flask_appbuilder.api import BaseModelApi,BaseApi,ModelRestApi
from sqlalchemy.sql import sqltypes
from myapp import app, appbuilder,db,event_logger
from myapp import app, appbuilder,db,event_logger,cache
conf = app.config
log = logging.getLogger(__name__)
@ -501,10 +501,17 @@ class MyappModelRestApi(ModelRestApi):
col_info['choices'] = column_field_kwargs.get('choices', [])
if 'widget' in column_field_kwargs:
col_info['widget'] = column_field_kwargs['widget'].__class__.__name__.replace('Widget', '').replace('Field','').replace('My', '')
col_info['disable'] = column_field_kwargs['widget'].readonly if hasattr(column_field_kwargs['widget'],'readonly') else False
# if hasattr(column_field_kwargs['widget'],'can_input'):
# print(field.name,column_field_kwargs['widget'].can_input)
col_info['ui-type'] = 'input-select' if hasattr(column_field_kwargs['widget'], 'can_input') and column_field_kwargs['widget'].can_input else False
if hasattr(column_field_kwargs['widget'],'readonly') and column_field_kwargs['widget'].readonly:
col_info['disable'] = True
# 处理可选可填类型
if hasattr(column_field_kwargs['widget'], 'can_input') and column_field_kwargs['widget'].can_input:
col_info['ui-type'] = 'input-select'
# 处理时间类型
if hasattr(column_field_kwargs['widget'], 'is_date') and column_field_kwargs['widget'].is_date:
col_info['ui-type'] = 'datePicker'
# 处理时间类型
if hasattr(column_field_kwargs['widget'], 'is_date_range') and column_field_kwargs['widget'].is_date_range:
col_info['ui-type'] = 'rangePicker'
col_info = self.make_ui_info(col_info)
ret.append(col_info)
@ -654,7 +661,29 @@ class MyappModelRestApi(ModelRestApi):
search_filters[col]['type'] = column_field.field_class.__name__.replace('Field', '').replace('My','')
search_filters[col]['choices'] = column_field_kwargs.get('choices', [])
# 选-填 字段在搜索时为填写字段
search_filters[col]['ui-type'] = 'input' if hasattr(column_field_kwargs.get('widget',{}),'can_input') and column_field_kwargs['widget'].can_input else False
if hasattr(column_field_kwargs.get('widget', {}), 'can_input') and column_field_kwargs['widget'].can_input:
search_filters[col]['ui-type'] = 'input'
# 对于那种配置使用过往记录作为可选值的参数进行处理
if hasattr(column_field_kwargs.get('widget', {}), 'conten2choices') and column_field_kwargs['widget'].conten2choices:
# 先从缓存中拿结果
field_contents = None
try:
field_contents = cache.get(self.datamodel.obj.__tablename__ + "_" + col)
except Exception as e:
print(e)
# 缓存没有数据,再从数据库中读取
if not field_contents:
try:
field_contents = db.session.query(getattr(self.datamodel.obj, col)).group_by(getattr(self.datamodel.obj, col)).all()
field_contents = list(set([item[0] for item in field_contents]))
cache.set(self.datamodel.obj.__tablename__ + "_" + col, field_contents,timeout=60 * 60)
except Exception as e:
print(e)
if field_contents:
search_filters[col]['ui-type'] = 'select'
search_filters[col]['choices']= [[x, x] for x in list(set(field_contents))]
search_filters[col] = self.make_ui_info(search_filters[col])
# 多选字段在搜索时为单选字段
@ -1530,7 +1559,7 @@ class MyappModelRestApi(ModelRestApi):
if choices:
values=[]
for choice in choices:
if len(choice)==2:
if choice and len(choice)==2:
values.append({
"id":choice[0],
"value":choice[1]
@ -1540,11 +1569,13 @@ class MyappModelRestApi(ModelRestApi):
ret['ui-type']='select2' if 'SelectMultiple' in ret['type'] else 'select'
# 字符串
if ret.get('type','') in ['String',]:
if ret.get('widget','BS3Text')=='BS3Text':
ret['ui-type'] = 'input'
else:
ret['ui-type'] = 'textArea'
if ret.get('ui-type','') not in ['list','datePicker']: # list,datePicker 类型,保持原样
if ret.get('type','') in ['String',]:
if ret.get('widget','BS3Text')=='BS3Text':
ret['ui-type'] = 'input'
else:
ret['ui-type'] = 'textArea'
# 长文本输入
if 'text' in ret.get('type','').lower():
ret['ui-type'] = 'textArea'
@ -1666,20 +1697,50 @@ class MyappModelRestApi(ModelRestApi):
ret['choices'] = column_field_kwargs.get('choices', [])
if 'widget' in column_field_kwargs:
ret['widget']=column_field_kwargs['widget'].__class__.__name__.replace('Widget','').replace('Field','').replace('My','')
ret['disable']=column_field_kwargs['widget'].readonly if hasattr(column_field_kwargs['widget'],'readonly') else False
ret['retry_info'] = column_field_kwargs['widget'].retry_info if hasattr(column_field_kwargs['widget'],'retry_info') else False
# if hasattr(column_field_kwargs['widget'],'can_input'):
# print(field.name,column_field_kwargs['widget'].can_input)
ret['ui-type'] = 'input-select' if hasattr(column_field_kwargs['widget'],'can_input') and column_field_kwargs['widget'].can_input else False
# 对于那种配置使用过往记录作为可选值的参数进行处理
# 处理禁止编辑
if hasattr(column_field_kwargs['widget'], 'readonly') and column_field_kwargs['widget'].readonly:
ret['disable']=True
# 处理重新拉取info
if hasattr(column_field_kwargs['widget'], 'retry_info') and column_field_kwargs['widget'].retry_info:
ret['retry_info'] = True
# 处理选填类型
if hasattr(column_field_kwargs['widget'], 'can_input') and column_field_kwargs['widget'].can_input:
ret['ui-type'] = 'input-select'
# 处理时间类型
if hasattr(column_field_kwargs['widget'], 'is_date') and column_field_kwargs['widget'].is_date:
ret['ui-type'] = 'datePicker'
# 处理时间类型
if hasattr(column_field_kwargs['widget'], 'is_date_range') and column_field_kwargs['widget'].is_date_range:
ret['ui-type'] = 'rangePicker'
# 处理扩展字段一个字段存储一个list的值
if hasattr(column_field_kwargs['widget'], 'expand_filed') and column_field_kwargs['widget'].expand_filed:
print(field.name)
ret['ui-type'] = 'list'
ret["info"]=self.columnsfield2info(column_field_kwargs['widget'].expand_filed)
# 处理内容自动填充可取值,对于那种配置使用过往记录作为可选值的参数进行处理
if hasattr(column_field_kwargs['widget'], 'conten2choices') and column_field_kwargs['widget'].conten2choices:
# 先从缓存中拿结果
field_contents=None
try:
field_contents = db.session.query(getattr(self.datamodel.obj,field.name)).group_by(getattr(self.datamodel.obj,field.name)).all()
field_contents = [item[0] for item in field_contents]
if field_contents:
ret['choices']=[[x,x] for x in list(set(field_contents))]
field_contents = cache.get(self.datamodel.obj.__tablename__+"_"+field.name)
except Exception as e:
print(e)
# 缓存没有数据,再从数据库中读取
if not field_contents:
try:
field_contents = db.session.query(getattr(self.datamodel.obj,field.name)).group_by(getattr(self.datamodel.obj,field.name)).all()
field_contents = list(set([item[0] for item in field_contents]))
cache.set(self.datamodel.obj.__tablename__+"_"+field.name, field_contents, timeout=60 * 60)
except Exception as e:
print(e)
if field_contents:
ret['choices'] = [[x, x] for x in list(set(field_contents))]
# 补充数据库model中定义的是否必填

View File

@ -689,33 +689,11 @@ class Myapp(BaseMyappView):
return self.render_template('external_link.html', data=data)
# @expose('/schedule/node/<ip>')
# def schedule_node(self,ip):
# all_node_json = resource_used['data']
# for cluster_name in all_node_json:
# nodes = all_node_json[cluster_name]
# if ip in nodes:
# clusters = conf.get('CLUSTERS', {})
# cluster = clusters[cluster_name]
# k8s_client = K8s(cluster.get('KUBECONFIG',''))
# # 获取最新的节点信息
# nodes = k8s_client.get_node(ip=ip)
# if nodes:
# node = nodes[0]
# enable_train = node['labels'].get('train','true')
# k8s_client.label_node([ip],{"train":"false" if enable_train=='true' else "true"})
# break
#
# return redirect('/myapp/home')
# from myapp import tracer
# 机器学习首页资源弹窗
def mlops_traffic(self,url):
if 1 or not node_resource_used['check_time'] or node_resource_used['check_time'] < (
datetime.datetime.now() - datetime.timedelta(minutes=10)):
if 1 or not node_resource_used['check_time'] or node_resource_used['check_time'] < (datetime.datetime.now() - datetime.timedelta(minutes=10)):
clusters = conf.get('CLUSTERS', {})
for cluster_name in clusters:
cluster = clusters[cluster_name]
@ -960,7 +938,21 @@ class Myapp(BaseMyappView):
url = request.values.get("url", type=str, default=None)
print(url)
if '/myapp/home' in url:
return self.mlops_traffic(url)
try:
return self.mlops_traffic(url)
except Exception as e:
print(e)
data = {
'content': '未能成功获取负算力负载信息请检查kubeconfig文件配置',
'delay': 30000,
'hit': True,
'target': url,
'title': '检查失败',
'type': 'html',
}
flash('未能成功获取负算力负载信息', 'warning')
return jsonify(data)
if url=='/train/total_resource':
if g.user.username in conf.get('ADMIN_USER',''):

View File

@ -91,8 +91,8 @@ class InferenceService_ModelView_base():
# add_columns = ['service_type','project','name', 'label','images','resource_memory','resource_cpu','resource_gpu','min_replicas','max_replicas','ports','host','hpa','metrics','health']
add_columns = ['service_type', 'project', 'label', 'model_name', 'model_version', 'images', 'model_path', 'resource_memory', 'resource_cpu', 'resource_gpu', 'min_replicas', 'max_replicas', 'hpa','priority', 'canary', 'shadow', 'host','inference_config', 'working_dir', 'command','volume_mount', 'env', 'ports', 'metrics', 'health','expand','sidecar']
show_columns = ['service_type','project', 'name', 'label','model_name', 'model_version', 'images', 'model_path', 'model_input', 'model_output', 'images', 'volume_mount','sidecar','working_dir', 'command', 'env', 'resource_memory',
'resource_cpu', 'resource_gpu', 'min_replicas', 'max_replicas', 'ports', 'host','hpa','priority', 'canary', 'shadow', 'health','model_status','expand','metrics','deploy_history','inference_config','metrics']
show_columns = ['service_type','project', 'name', 'label','model_name', 'model_version', 'images', 'model_path', 'images', 'volume_mount','sidecar','working_dir', 'command', 'env', 'resource_memory',
'resource_cpu', 'resource_gpu', 'min_replicas', 'max_replicas', 'ports', 'inference_host_url','hpa','priority', 'canary', 'shadow', 'health','model_status','expand','metrics','deploy_history','host','inference_config']
edit_columns = add_columns
@ -101,20 +101,19 @@ class InferenceService_ModelView_base():
}
edit_form_query_rel_fields = add_form_query_rel_fields
list_columns = ['project','service_type','label','model_name_url','model_version','inference_host_url','ip','model_status','resource','creator','modified','operate_html']
list_columns = ['project','service_type','label','model_name_url','model_version','inference_host_url','ip','model_status','resource','replicas_html','creator','modified','operate_html']
cols_width={
"project":{"type": "ellip2", "width": 150},
"label": {"type": "ellip1", "width": 250},
"label": {"type": "ellip2", "width": 300},
"service_type": {"type": "ellip2", "width": 100},
"model_name_url":{"type": "ellip2", "width": 300},
"model_name_url":{"type": "ellip2", "width": 250},
"model_version": {"type": "ellip2", "width": 200},
"inference_host_url": {"type": "ellip2", "width": 500},
"ip": {"type": "ellip2", "width": 200},
"model_status": {"type": "ellip2", "width": 100},
"modified": {"type": "ellip2", "width": 150},
"operate_html": {"type": "ellip2", "width": 350},
"resource": {"type": "ellip2", "width": 300},
"resource": {"type": "ellip2", "width": 250},
}
search_columns = ['name','created_by','project','service_type','label','model_name','model_version','model_path','host','model_status','resource_gpu']
@ -128,9 +127,10 @@ class InferenceService_ModelView_base():
for item in INFERNENCE_IMAGES:
images += item
service_type_choices= ['serving','tfserving','torch-server','onnxruntime','triton-server']
sepc_label_columns = {
spec_label_columns = {
# "host": _("域名测试环境test.xx调试环境 debug.xx"),
"resource":"资源"
"resource":"资源",
"replicas_html":"副本数"
}
service_type_choices = [x.replace('_','-') for x in service_type_choices]
add_form_extra_fields={
@ -1063,7 +1063,7 @@ class InferenceService_ModelView(InferenceService_ModelView_base,MyappModelView)
datamodel = SQLAInterface(InferenceService)
appbuilder.add_view(InferenceService_ModelView,"推理服务",icon = 'fa-space-shuttle',category = '服务化')
appbuilder.add_view_no_menu(InferenceService_ModelView)
# 添加api
class InferenceService_ModelView_Api(InferenceService_ModelView_base,MyappModelRestApi):

View File

@ -67,7 +67,7 @@ from myapp import security_manager
from myapp.views.view_team import filter_join_org_project
import kfp
from werkzeug.datastructures import FileStorage
from kubernetes import client as k8s_client
from kubernetes import client
from .base import (
api,
BaseMyappView,
@ -332,41 +332,27 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
# 添加用户自定义挂载
task.volume_mount=task.volume_mount.strip() if task.volume_mount else ''
if task.volume_mount:
volume_mounts = re.split(',|;',task.volume_mount)
for volume_mount in volume_mounts:
volume,mount = volume_mount.split(":")[0].strip(),volume_mount.split(":")[1].strip()
if "(pvc)" in volume:
pvc_name = volume.replace('(pvc)','').replace(' ','')
temps = re.split('_|\.|/', pvc_name)
temps = [temp for temp in temps if temp]
volumn_name = ('-'.join(temps))[:60].lower().strip('-')
ops=ops.add_volume(k8s_client.V1Volume(name=volumn_name,
persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name)))\
.add_volume_mount(k8s_client.V1VolumeMount(mount_path=os.path.join(mount,task.pipeline.created_by.username), name=volumn_name,sub_path=task.pipeline.created_by.username))
if "(hostpath)" in volume:
hostpath_name = volume.replace('(hostpath)', '').replace(' ', '')
temps = re.split('_|\.|/', hostpath_name)
temps = [temp for temp in temps if temp]
volumn_name = ('-'.join(temps))[:60].lower().strip('-')
try:
k8s_volumes,k8s_volume_mounts = py_k8s.K8s.get_volume_mounts(task.volume_mount,pipeline.created_by.username)
for volume in k8s_volumes:
claim_name = volume.get('persistentVolumeClaim',{}).get('claimName',None)
hostpath = volume.get('hostPath',{}).get('path',None)
configmap_name = volume.get('configMap',{}).get('name',None)
memory_size=volume.get('emptyDir',{}).get('sizeLimit',None) if volume.get('emptyDir',{}).get('medium','')=='Memory' else None
ops = ops.add_volume(client.V1Volume(
name=volume['name'],
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=claim_name) if claim_name else None,
host_path=client.V1HostPathVolumeSource(path=hostpath) if hostpath else None,
config_map=client.V1ConfigMapVolumeSource(name=configmap_name) if configmap_name else None,
empty_dir=client.V1EmptyDirVolumeSource(medium='Memory', size_limit=memory_size) if memory_size else None
))
ops = ops.add_volume(k8s_client.V1Volume(name=volumn_name,
host_path=k8s_client.V1HostPathVolumeSource(path=hostpath_name))) \
.add_volume_mount(k8s_client.V1VolumeMount(mount_path=mount, name=volumn_name))
if "(configmap)" in volume:
configmap_name = volume.replace('(configmap)', '').replace(' ', '')
temps = re.split('_|\.|/', configmap_name)
temps = [temp for temp in temps if temp]
volumn_name = ('-'.join(temps))[:60].lower().strip('-')
ops = ops.add_volume(k8s_client.V1Volume(name=volumn_name,
config_map=k8s_client.V1ConfigMapVolumeSource(name=configmap_name))) \
.add_volume_mount(k8s_client.V1VolumeMount(mount_path=mount, name=volumn_name))
if "(memory)" in volume:
memory_size = volume.replace('(memory)', '').replace(' ', '').lower().replace('g','')
volumn_name = ('memory-%s'%memory_size)[:60].lower().strip('-')
ops = ops.add_volume(k8s_client.V1Volume(name=volumn_name,empty_dir=k8s_client.V1EmptyDirVolumeSource(medium='Memory',size_limit='%sGi'%memory_size)))\
.add_volume_mount(k8s_client.V1VolumeMount(mount_path=mount, name=volumn_name))
for mount in k8s_volume_mounts:
mountPath = mount.get('mountPath',None)
subPath = mount.get('subPath',None)
ops = ops.add_volume_mount(client.V1VolumeMount(mount_path=mountPath, name=mount.get('name',''),sub_path=subPath))
except Exception as e:
print(e)
@ -376,17 +362,14 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
if type(upstream_tasks)==dict:
upstream_tasks=list(upstream_tasks.keys())
if type(upstream_tasks)==str:
upstream_tasks = upstream_tasks.split(',;')
upstream_tasks = re.split(',|;',upstream_tasks)
# upstream_tasks = upstream_tasks.split(',;')
if type(upstream_tasks)!=list:
raise MyappException('%s upstream is not valid'%task_name)
for upstream_task in upstream_tasks: # 可能存在已删除的upstream_task这个时候是不添加的
add_upstream=False
for task1_name in all_ops:
if task1_name==upstream_task:
ops.after(all_ops[task1_name]) # 配置任务顺序
add_upstream=True
break
if not add_upstream:
for upstream_task in upstream_tasks: # 可能存在已删除的upstream_task
if upstream_task in all_ops:
ops.after(all_ops[upstream_task]) # 配置任务顺序
else:
raise MyappException('%s upstream %s is not exist' % (task_name,upstream_task))
# 添加node selector
@ -417,13 +400,13 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
# 添加亲密度控制
affinity = k8s_client.V1Affinity(
pod_anti_affinity=k8s_client.V1PodAntiAffinity(
affinity = client.V1Affinity(
pod_anti_affinity=client.V1PodAntiAffinity(
preferred_during_scheduling_ignored_during_execution=[
k8s_client.V1WeightedPodAffinityTerm(
client.V1WeightedPodAffinityTerm(
weight=80,
pod_affinity_term=k8s_client.V1PodAffinityTerm(
label_selector=k8s_client.V1LabelSelector(
pod_affinity_term=client.V1PodAffinityTerm(
label_selector=client.V1LabelSelector(
match_labels={
# 'job-template':task.job_template.name,
"pipeline-id":str(pipeline.id)
@ -493,7 +476,7 @@ def dag_to_pipeline(pipeline,dbsession,**kwargs):
hubsecret = task_temp.job_template.images.repository.hubsecret
if hubsecret not in hubsecret_list:
hubsecret_list.append(hubsecret)
pipeline_conf.image_pull_secrets.append(k8s_client.V1LocalObjectReference(name=hubsecret))
pipeline_conf.image_pull_secrets.append(client.V1LocalObjectReference(name=hubsecret))
# # 配置host 在kfp中并不生效
@ -546,31 +529,31 @@ def upload_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id):
file = open(pipeline_name + '.yaml', mode='wb')
file.write(bytes(pipeline_file,encoding='utf-8'))
file.close()
client = kfp.Client(kfp_host) # pipeline.project.cluster.get('KFP_HOST')
kfp_client = kfp.Client(kfp_host) # pipeline.project.cluster.get('KFP_HOST')
pipeline_argo = None
if pipeline_argo_id:
try:
pipeline_argo = client.get_pipeline(pipeline_argo_id)
pipeline_argo = kfp_client.get_pipeline(pipeline_argo_id)
except Exception as e:
logging.error(e)
if pipeline_argo:
pipeline_argo_version = client.upload_pipeline_version(pipeline_package_path=pipeline_name + '.yaml', pipeline_version_name=pipeline_name+"_version_at_"+datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),pipeline_id=pipeline_argo.id)
pipeline_argo_version = kfp_client.upload_pipeline_version(pipeline_package_path=pipeline_name + '.yaml', pipeline_version_name=pipeline_name+"_version_at_"+datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),pipeline_id=pipeline_argo.id)
time.sleep(1) # 因为创建是异步的要等k8s反应所以有时延
return pipeline_argo.id,pipeline_argo_version.id
else:
exist_pipeline_argo_id = None
try:
exist_pipeline_argo_id = client.get_pipeline_id(pipeline_name)
exist_pipeline_argo_id = kfp_client.get_pipeline_id(pipeline_name)
except Exception as e:
logging.error(e)
if exist_pipeline_argo_id:
pipeline_argo_version = client.upload_pipeline_version(pipeline_package_path=pipeline_name + '.yaml',pipeline_version_name=pipeline_name + "_version_at_" + datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),pipeline_id=exist_pipeline_argo_id)
pipeline_argo_version = kfp_client.upload_pipeline_version(pipeline_package_path=pipeline_name + '.yaml',pipeline_version_name=pipeline_name + "_version_at_" + datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),pipeline_id=exist_pipeline_argo_id)
time.sleep(1)
return exist_pipeline_argo_id,pipeline_argo_version.id
else:
pipeline_argo = client.upload_pipeline(pipeline_name + '.yaml', pipeline_name=pipeline_name)
pipeline_argo = kfp_client.upload_pipeline(pipeline_name + '.yaml', pipeline_name=pipeline_name)
time.sleep(1)
return pipeline_argo.id,pipeline_argo.default_version.id
@ -584,22 +567,22 @@ def run_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id,pipeline_
if not pipeline_argo_id or not pipeline_argo_version_id:
pipeline_argo_id,pipeline_argo_version_id = upload_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id) # 必须上传新版本
client = kfp.Client(kfp_host)
kfp_client = kfp.Client(kfp_host)
# 先创建一个实验在在这个实验中运行指定pipeline
experiment=None
try:
experiment = client.get_experiment(experiment_name=pipeline_name)
experiment = kfp_client.get_experiment(experiment_name=pipeline_name)
except Exception as e:
logging.error(e)
if not experiment:
try:
experiment = client.create_experiment(name=pipeline_name,description=pipeline_name) # 现在要求describe不能是中文了
experiment = kfp_client.create_experiment(name=pipeline_name,description=pipeline_name) # 现在要求describe不能是中文了
except Exception as e:
print(e)
return None,None,None
# 直接使用pipeline最新的版本运行
try:
run = client.run_pipeline(experiment_id = experiment.id,pipeline_id=pipeline_argo_id,version_id=pipeline_argo_version_id,job_name=pipeline_name+"_version_at_"+datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'))
run = kfp_client.run_pipeline(experiment_id = experiment.id,pipeline_id=pipeline_argo_id,version_id=pipeline_argo_version_id,job_name=pipeline_name+"_version_at_"+datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'))
return pipeline_argo_id, pipeline_argo_version_id, run.id
except Exception as e:
print(e)
@ -659,6 +642,7 @@ class Pipeline_ModelView_Base():
),
"dag_json": StringField(
_(datamodel.obj.lab('dag_json')),
default='{}',
widget=MyBS3TextAreaFieldWidget(rows=10), # 传给widget函数的是外层的field对象以及widget函数的参数
),
"namespace": StringField(
@ -677,6 +661,7 @@ class Pipeline_ModelView_Base():
_(datamodel.obj.lab('image_pull_policy')),
description="镜像拉取策略(always为总是拉取远程镜像IfNotPresent为若本地存在则使用本地镜像)",
widget=Select2Widget(),
default='Always',
choices=[['Always','Always'],['IfNotPresent','IfNotPresent']]
),
@ -757,7 +742,7 @@ class Pipeline_ModelView_Base():
return False
# 验证args参数
# 验证args参数,并自动排版dag_json
# @pysnooper.snoop(watch_explode=('item'))
def pipeline_args_check(self, item):
core.validate_str(item.name,'name')
@ -833,12 +818,13 @@ class Pipeline_ModelView_Base():
if dag_json:
pipeline.dag_json = json.dumps(dag_json)
# @pysnooper.snoop()
# @pysnooper.snoop(watch_explode=('item'))
def pre_add(self, item):
if not item.project:
if not item.project or item.project.type!='org':
project=db.session.query(Project).filter_by(name='public').filter_by(type='org').first()
if project:
item.project = project
item.name = item.name.replace('_', '-')[0:54].lower().strip('-')
# item.alert_status = ','.join(item.alert_status)
self.pipeline_args_check(item)
@ -929,26 +915,6 @@ class Pipeline_ModelView_Base():
return json_response(message=str(e),status=-1,result={})
@event_logger.log_this
@expose("/list/")
@has_access
def list(self):
args = request.args.to_dict()
if '_flt_0_created_by' in args and args['_flt_0_created_by']=='':
print(request.url)
print(request.path)
flash('去除过滤条件->查看所有pipeline','success')
return redirect(request.url.replace('_flt_0_created_by=','_flt_0_created_by=%s'%g.user.id))
widgets = self._list()
res = self.render_template(
self.list_template, title=self.list_title, widgets=widgets
)
return res
# 删除手动发起的workflow不删除定时任务发起的workflow
def delete_bind_crd(self,crds):
@ -1269,7 +1235,7 @@ class Pipeline_ModelView(Pipeline_ModelView_Base,MyappModelView,DeleteMixin):
# base_order = ("changed_on", "desc")
# order_columns = ['changed_on']
appbuilder.add_view(Pipeline_ModelView,"任务流",href="/pipeline_modelview/list/?_flt_0_created_by=",icon = 'fa-sitemap',category = '训练')
appbuilder.add_view_no_menu(Pipeline_ModelView)
# 添加api

View File

@ -83,7 +83,7 @@ class RunHistory_ModelView_Base():
class RunHistory_ModelView(RunHistory_ModelView_Base,MyappModelView,DeleteMixin):
datamodel = SQLAInterface(RunHistory)
appbuilder.add_view(RunHistory_ModelView,"定时调度记录",icon = 'fa-clock-o',category = '训练')
appbuilder.add_view_no_menu(RunHistory_ModelView)

View File

@ -841,7 +841,7 @@ class Service_Pipeline_ModelView(Service_Pipeline_ModelView_Base,MyappModelView,
# order_columns = ['changed_on']
appbuilder.add_view(Service_Pipeline_ModelView,"推理pipeline",href="/service_pipeline_modelview/list/",icon = 'fa-sitemap',category = '服务化')
appbuilder.add_view_no_menu(Service_Pipeline_ModelView)
# 添加api
class Service_Pipeline_ModelView_Api(Service_Pipeline_ModelView_Base,MyappModelRestApi):

View File

@ -289,7 +289,7 @@ class Service_ModelView_base():
class Service_ModelView(Service_ModelView_base,MyappModelView,DeleteMixin):
datamodel = SQLAInterface(Service)
appbuilder.add_view(Service_ModelView,"内部服务",icon = 'fa-internet-explorer',category = '服务化')
appbuilder.add_view_no_menu(Service_ModelView)
class Service_ModelView_Api(Service_ModelView_base,MyappModelRestApi):

View File

@ -235,7 +235,6 @@ class Project_ModelView_Base():
# datamodel = SQLAInterface(Project)
# label_title = '模板分类'
#
# appbuilder.add_view(Project_ModelView_job_template,"模板分类",icon = 'fa-tasks',category = '项目组',category_icon = 'fa-users')
class Project_ModelView_job_template_Api(Project_ModelView_Base,MyappModelRestApi):
@ -270,7 +269,6 @@ appbuilder.add_api(Project_ModelView_job_template_Api)
# datamodel = SQLAInterface(Project)
# label_title = '项目分组'
#
# appbuilder.add_view(Project_ModelView_org,"项目分组",icon = 'fa-sitemap',category = '项目组',category_icon = 'fa-users')
#
@ -308,7 +306,6 @@ appbuilder.add_api(Project_ModelView_org_Api)
# datamodel = SQLAInterface(Project)
# label_title = '模型分组'
#
# appbuilder.add_view(Project_ModelView_train_model,"模型分组",icon = 'fa-address-book-o',category = '项目组',category_icon = 'fa-users')
#

View File

@ -261,7 +261,7 @@ class Workflow_ModelView(Workflow_ModelView_Base,MyappModelView,DeleteMixin):
datamodel = SQLAInterface(Workflow)
appbuilder.add_view(Workflow_ModelView,"运行实例",href='/workflow_modelview/list/?_flt_2_name=&_flt_2_labels=',icon = 'fa-tasks',category = '训练')
appbuilder.add_view_no_menu(Workflow_ModelView)
# 添加api
class Workflow_ModelView_Api(Workflow_ModelView_Base,MyappModelRestApi):
@ -276,7 +276,6 @@ appbuilder.add_api(Workflow_ModelView_Api)
# appbuilder.add_separator("训练") # 在指定菜单栏下面的每个子菜单中间添加一个分割线的显示。
#
#
# # list正在运行的tfjob