from flask import render_template,redirect from flask_appbuilder.models.sqla.interface import SQLAInterface from flask_appbuilder import ModelView, ModelRestApi from flask_appbuilder import ModelView,AppBuilder,expose,BaseView,has_access from importlib import reload from flask_babel import gettext as __ from flask_babel import lazy_gettext as _ import uuid import re from kfp import compiler from sqlalchemy.exc import InvalidRequestError # 将model添加成视图,并控制在前端的显示 from myapp.models.model_job import Repository,Images,Job_Template,Task,Pipeline,Workflow,Tfjob,Xgbjob,RunHistory,Pytorchjob from myapp.models.model_team import Project,Project_User from myapp.views.view_team import Project_Join_Filter from flask_appbuilder.actions import action from flask import current_app, flash, jsonify, make_response, redirect, request, url_for from flask_appbuilder.models.sqla.filters import FilterEqualFunction, FilterStartsWith,FilterEqual,FilterNotEqual from wtforms.validators import EqualTo,Length from flask_babel import lazy_gettext,gettext from flask_appbuilder.security.decorators import has_access from flask_appbuilder.forms import GeneralModelConverter from myapp.utils import core from myapp import app, appbuilder,db,event_logger from wtforms.ext.sqlalchemy.fields import QuerySelectField from jinja2 import Template from jinja2 import contextfilter from jinja2 import Environment, BaseLoader, DebugUndefined, StrictUndefined import os,sys from wtforms.validators import DataRequired, Length, NumberRange, Optional,Regexp from myapp.forms import JsonValidator from myapp.views.view_task import Task_ModelView from sqlalchemy import and_, or_, select from myapp.exceptions import MyappException from wtforms import BooleanField, IntegerField,StringField, SelectField,FloatField,DateField,DateTimeField,SelectMultipleField,FormField,FieldList from myapp.project import push_message,push_admin from flask_appbuilder.fieldwidgets import BS3TextFieldWidget,BS3PasswordFieldWidget,DatePickerWidget,DateTimePickerWidget,Select2ManyWidget,Select2Widget,BS3TextAreaFieldWidget from myapp.forms import MyBS3TextAreaFieldWidget,MySelect2Widget,MyCodeArea,MyLineSeparatedListField,MyJSONField,MyBS3TextFieldWidget,MySelectMultipleField from myapp.utils.py import py_k8s from flask_wtf.file import FileField import shlex import re,copy from kubernetes.client.models import ( V1Container, V1EnvVar, V1EnvFromSource, V1SecurityContext, V1Probe, V1ResourceRequirements, V1VolumeDevice, V1VolumeMount, V1ContainerPort, V1Lifecycle, V1Volume,V1SecurityContext ) from .baseApi import ( MyappModelRestApi ) from flask import ( current_app, abort, flash, g, Markup, make_response, redirect, render_template, request, send_from_directory, Response, url_for, ) from myapp import security_manager from myapp.views.view_team import filter_join_org_project import kfp # 使用自定义的就要把pip安装的删除了 from werkzeug.datastructures import FileStorage from kubernetes import client as k8s_client from .base import ( api, BaseMyappView, check_ownership, data_payload_response, DeleteMixin, generate_download_headers, get_error_msg, get_user_roles, handle_api_exception, json_error_response, json_success, MyappFilter, MyappModelView, json_response ) from flask_appbuilder import CompactCRUDMixin, expose import pysnooper,datetime,time,json conf = app.config logging = app.logger class Pipeline_Filter(MyappFilter): # @pysnooper.snoop() def apply(self, query, func): user_roles = [role.name.lower() for role in list(self.get_user_roles())] if "admin" in user_roles: return query join_projects_id = security_manager.get_join_projects_id(db.session) # public_project_id = # logging.info(join_projects_id) return query.filter( or_( self.model.project_id.in_(join_projects_id), # self.model.project.name.in_(['public']) ) ) from sqlalchemy.exc import InvalidRequestError,OperationalError # 将定义pipeline的流程 # @pysnooper.snoop(watch_explode=()) def dag_to_pipeline(pipeline,dbsession,**kwargs): if not pipeline.id: return pipeline.dag_json = pipeline.fix_dag_json(dbsession) dbsession.commit() dag = json.loads(pipeline.dag_json) # 如果dag为空,就直接退出 if not dag: return None all_tasks = {} for task_name in dag: # 使用临时连接,避免连接中断的问题 # try: # db.session().ping() task = dbsession.query(Task).filter_by(name=task_name, pipeline_id=pipeline.id).first() if not task: raise MyappException('task %s not exist ' % task_name) all_tasks[task_name]=task all_ops = {} pipeline_conf = kfp.dsl.PipelineConf() # 设置机器选择器 # 如果项目中设置了机器选择,就使用项目中设置的 # node_selector = pipeline.project.get('node_selector','') # if not node_selector and pipeline.node_selector: # node_selector = pipeline.node_selector # # if node_selector: # for selector in re.split(',|;|\n|\t', str(pipeline.node_selector)): # pipeline_conf.set_default_pod_node_selector(selector.split('=')[0].strip(),selector.split('=')[1].strip()) # 渲染字符串模板变量 def template_str(src_str): rtemplate = Environment(loader=BaseLoader, undefined=DebugUndefined).from_string(src_str) des_str = rtemplate.render(creator=pipeline.created_by.username, datetime=datetime, runner=g.user.username if g and g.user and g.user.username else pipeline.created_by.username, uuid = uuid, pipeline_id=pipeline.id, pipeline_name=pipeline.name, cluster_name=pipeline.project.cluster['NAME'], **kwargs ) return des_str pipeline_global_env = template_str(pipeline.global_env.strip()) if pipeline.global_env else '' # 优先渲染,不然里面如果有date就可能存在不一致的问题 pipeline_global_env = [ env.strip() for env in pipeline_global_env.split('\n') if '=' in env.strip()] global_envs = json.loads(template_str(json.dumps(conf.get('GLOBAL_ENV', {}),indent=4,ensure_ascii=False))) for env in pipeline_global_env: key,value = env[:env.index('=')],env[env.index('=')+1:] global_envs[key]=value # @pysnooper.snoop() def get_ops(task_name): task = all_tasks[task_name] ops_args = [] task_args = json.loads(task.args) for task_attr_name in task_args: # 添加参数名 if type(task_args[task_attr_name])==bool: if task_args[task_attr_name]: ops_args.append('%s' % str(task_attr_name)) # 添加参数值 elif type(task_args[task_attr_name])==dict or type(task_args[task_attr_name])==list: ops_args.append('%s' % str(task_attr_name)) ops_args.append('%s' % json.dumps(task_args[task_attr_name],ensure_ascii=False)) elif not task_args[task_attr_name]: # 如果参数值为空,则都不添加 pass else: ops_args.append('%s' % str(task_attr_name)) ops_args.append('%s'%str(task_args[task_attr_name])) # 这里应该对不同类型的参数名称做不同的参数处理,比如bool型,只有参数,没有值 # pipeline_global_args = global_env.strip().split(' ') if global_env else [] # pipeline_global_args = [arg.strip() for arg in pipeline_global_args if arg.strip()] # for global_arg in pipeline_global_args: # ops_args.append(global_arg) # 创建ops的pod的创建参数 container_kwargs={} # 设置privileged if task.job_template.privileged: container_kwargs['security_context'] = V1SecurityContext(privileged=task.job_template.privileged) # 设置环境变量 container_envs = [] if task.job_template.env: envs = re.split('\r|\n',task.job_template.env) envs=[env.strip() for env in envs if env.strip()] for env in envs: env_key,env_value = env.split('=')[0],env.split('=')[1] container_envs.append(V1EnvVar(env_key,env_value)) # 设置全局环境变量 for global_env_key in global_envs: container_envs.append(V1EnvVar(global_env_key,global_envs[global_env_key])) # 设置task的默认环境变量 container_envs.append(V1EnvVar("KFJ_TASK_ID", str(task.id))) container_envs.append(V1EnvVar("KFJ_TASK_NAME", str(task.name))) container_envs.append(V1EnvVar("KFJ_TASK_NODE_SELECTOR", str(task.get_node_selector()))) container_envs.append(V1EnvVar("KFJ_TASK_VOLUME_MOUNT", str(task.volume_mount))) container_envs.append(V1EnvVar("KFJ_TASK_IMAGES", str(task.job_template.images))) container_envs.append(V1EnvVar("KFJ_TASK_RESOURCE_CPU", str(task.resource_cpu))) container_envs.append(V1EnvVar("KFJ_TASK_RESOURCE_MEMORY", str(task.resource_memory))) container_envs.append(V1EnvVar("KFJ_TASK_RESOURCE_GPU", str(task.resource_gpu.replace("+",'')))) container_envs.append(V1EnvVar("GPU_TYPE", os.environ.get("GPU_TYPE", "NVIDIA"))) container_kwargs['env']=container_envs # 创建工作目录 if task.job_template.workdir and task.job_template.workdir.strip(): container_kwargs['working_dir'] = task.job_template.workdir.strip() if task.working_dir and task.working_dir.strip(): container_kwargs['working_dir'] = task.working_dir.strip() # # 创建label,这样能让每个pod都找到运行人。 # container_labels={ # 'upload-rtx': g.user.username if g and g.user and g.user.username else pipeline.created_by.username, # 'run-rtx': g.user.username if g and g.user and g.user.username else pipeline.created_by.username # } # container_kwargs['labels']=container_labels task_command = '' if task.command: commands = re.split('\r|\n',task.command) commands = [command.strip() for command in commands if command.strip()] if task_command: task_command += " && " + " && ".join(commands) else: task_command += " && ".join(commands) job_template_entrypoint = task.job_template.entrypoint.strip() if task.job_template.entrypoint else '' command=None if job_template_entrypoint: command = job_template_entrypoint if task_command: command = task_command # entrypoint = task.job_template.images.entrypoint # overwrite_entrypoint = task.overwrite_entrypoint # if entrypoint and entrypoint.strip() and not overwrite_entrypoint: # if task_command: # task_command+=" && "+entrypoint.strip() # else: # task_command += entrypoint.strip() # if not overwrite_entrypoint and (not entrypoint or not entrypoint.strip()): # raise MyappException('job template %s 的镜像的入口命令未填写,联系%s添加镜像入口命令,或选择覆盖入口命令'%(task.job_template.name,task.job_template.created_by.username)) # task_commands = re.split(''' (?=(?:[^'"]|'[^']*'|"[^"]*")*$)''', task_command) # task_command.split(' ') # task_commands = re.split(' ',task_command) # task_commands = [task_command for task_command in task_commands if task_command ] # ops = kfp.dsl.ContainerOp( # name=task.name, # image=task.job_template.images.name, # arguments=ops_args, # command=task_commands if task_commands else None, # container_kwargs=container_kwargs # ) if task.job_template.name==conf.get('CUSTOMIZE_JOB'): container_kwargs['working_dir']=json.loads(task.args).get('workdir') ops = kfp.dsl.ContainerOp( name=task.name, image=json.loads(task.args).get('images'), command=['bash','-c',json.loads(task.args).get('command')], container_kwargs=container_kwargs, file_outputs = json.loads(task.outputs) if task.outputs and json.loads(task.outputs) else None ) else: # 数组方式 # if task_command: # task_command = task_command.split(' ') # task_command = [command for command in task_command if command] command = command.split(' ') if command else [] command = [com for com in command if com] ops = kfp.dsl.ContainerOp( name=task.name, image=task.job_template.images.name, arguments=ops_args, command=command if command else None, container_kwargs=container_kwargs, file_outputs=json.loads(task.outputs) if task.outputs and json.loads(task.outputs) else None ) # 合并方式 # ops = kfp.dsl.ContainerOp( # name=task.name, # image=task.job_template.images.name, # arguments=ops_args, # command=['sh', '-c', task_command] if task_command else None, # container_kwargs=container_kwargs, # file_outputs=json.loads(task.outputs) if task.outputs and json.loads(task.outputs) else None # ) # 添加用户自定义挂载 task.volume_mount=task.volume_mount.strip() if task.volume_mount else '' if task.volume_mount: volume_mounts = re.split(',|;',task.volume_mount) for volume_mount in volume_mounts: volume,mount = volume_mount.split(":")[0].strip(),volume_mount.split(":")[1].strip() if "(pvc)" in volume: pvc_name = volume.replace('(pvc)','').replace(' ','') temps = re.split('_|\.|/', pvc_name) temps = [temp for temp in temps if temp] volumn_name = ('-'.join(temps))[:60].lower().strip('-') ops=ops.add_volume(k8s_client.V1Volume(name=volumn_name, persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name)))\ .add_volume_mount(k8s_client.V1VolumeMount(mount_path=os.path.join(mount,task.pipeline.created_by.username), name=volumn_name,sub_path=task.pipeline.created_by.username)) if "(hostpath)" in volume: hostpath_name = volume.replace('(hostpath)', '').replace(' ', '') temps = re.split('_|\.|/', hostpath_name) temps = [temp for temp in temps if temp] volumn_name = ('-'.join(temps))[:60].lower().strip('-') ops = ops.add_volume(k8s_client.V1Volume(name=volumn_name, host_path=k8s_client.V1HostPathVolumeSource(path=hostpath_name))) \ .add_volume_mount(k8s_client.V1VolumeMount(mount_path=mount, name=volumn_name)) if "(configmap)" in volume: configmap_name = volume.replace('(configmap)', '').replace(' ', '') temps = re.split('_|\.|/', configmap_name) temps = [temp for temp in temps if temp] volumn_name = ('-'.join(temps))[:60].lower().strip('-') ops = ops.add_volume(k8s_client.V1Volume(name=volumn_name, config_map=k8s_client.V1ConfigMapVolumeSource(name=configmap_name))) \ .add_volume_mount(k8s_client.V1VolumeMount(mount_path=mount, name=volumn_name)) if "(memory)" in volume: memory_size = volume.replace('(memory)', '').replace(' ', '').lower().replace('g','') volumn_name = ('memory-%s'%memory_size)[:60].lower().strip('-') ops = ops.add_volume(k8s_client.V1Volume(name=volumn_name,empty_dir=k8s_client.V1EmptyDirVolumeSource(medium='Memory',size_limit='%sGi'%memory_size)))\ .add_volume_mount(k8s_client.V1VolumeMount(mount_path=mount, name=volumn_name)) # 添加上游依赖 if "upstream" in dag[task_name] and dag[task_name]['upstream']: upstream_tasks = dag[task_name]['upstream'] if type(upstream_tasks)==dict: upstream_tasks=list(upstream_tasks.keys()) if type(upstream_tasks)==str: upstream_tasks = upstream_tasks.split(',;') if type(upstream_tasks)!=list: raise MyappException('%s upstream is not valid'%task_name) for upstream_task in upstream_tasks: # 可能存在已删除的upstream_task,这个时候是不添加的 add_upstream=False for task1_name in all_ops: if task1_name==upstream_task: ops.after(all_ops[task1_name]) # 配置任务顺序 add_upstream=True break if not add_upstream: raise MyappException('%s upstream %s is not exist' % (task_name,upstream_task)) # 添加node selector for selector in re.split(',|;|\n|\t', task.get_node_selector()): selector=selector.replace(' ','') if '=' in selector: ops.add_node_selector_constraint(selector.strip().split('=')[0].strip(),selector.strip().split('=')[1].strip()) # # 根据用户身份设置机器选择器 # if g and g.user and g.user.org: # ops.add_node_selector_constraint('org-'+g.user.org,'true') # elif pipeline.created_by.org: # 对定时任务,没有g存储 # ops.add_node_selector_constraint('org-'+pipeline.created_by.org,'true') # 添加pod label ops.add_pod_label("pipeline-id",str(pipeline.id)) ops.add_pod_label("pipeline-name", str(pipeline.name)) ops.add_pod_label("task-name", str(task.name)) ops.add_pod_label("run-id", global_envs.get('KFJ_RUN_ID','')) ops.add_pod_label("task-id", str(task.id)) ops.add_pod_label("task-id", str(task.id)) ops.add_pod_label('upload-rtx', g.user.username if g and g.user and g.user.username else pipeline.created_by.username) ops.add_pod_label('run-rtx', g.user.username if g and g.user and g.user.username else pipeline.created_by.username) ops.add_pod_label('pipeline-rtx', pipeline.created_by.username) # ops.add_pod_label('job-template', task.job_template.name) # 添加亲密度控制 affinity = k8s_client.V1Affinity( pod_anti_affinity=k8s_client.V1PodAntiAffinity( preferred_during_scheduling_ignored_during_execution=[ k8s_client.V1WeightedPodAffinityTerm( weight=80, pod_affinity_term=k8s_client.V1PodAffinityTerm( label_selector=k8s_client.V1LabelSelector( match_labels={ # 'job-template':task.job_template.name, "pipeline-id":str(pipeline.id) } ), topology_key='kubernetes.io/hostname' ) ) ] ) ) ops.add_affinity(affinity) # 设置重试次数 if task.retry: ops.set_retry(int(task.retry)) # 设置超时 if task.timeout: ops.set_timeout(int(task.timeout)) resource_cpu = task.job_template.get_env('TASK_RESOURCE_CPU') if task.job_template.get_env('TASK_RESOURCE_CPU') else task.resource_cpu resource_gpu = task.job_template.get_env('TASK_RESOURCE_GPU') if task.job_template.get_env('TASK_RESOURCE_GPU') else task.resource_gpu resource_memory = task.job_template.get_env('TASK_RESOURCE_MEMORY') if task.job_template.get_env('TASK_RESOURCE_MEMORY') else task.resource_memory # 设置资源限制 if resource_memory: if not '~' in resource_memory: ops.set_memory_request(resource_memory) ops.set_memory_limit(resource_memory) else: # logging.info(task.resource_memory) ops.set_memory_request(resource_memory.split("~")[0]) ops.set_memory_limit(resource_memory.split("~")[1]) if resource_cpu: if not '~' in resource_cpu: ops.set_cpu_request(resource_cpu) ops.set_cpu_limit(resource_cpu) else: # logging.info(task.resource_cpu) ops.set_cpu_request(resource_cpu.split("~")[0]) ops.set_cpu_limit(resource_cpu.split("~")[1]) if resource_gpu: if resource_gpu and core.get_gpu(resource_gpu)[0]>0: ops.set_gpu_limit(core.get_gpu(resource_gpu)[0]) all_ops[task_name]=ops # 这里面是真正的pipeline name 上传时指定的是version的name @kfp.dsl.pipeline(name=pipeline.name,description=pipeline.describe) def my_pipeline(): for task_name in dag: get_ops(task_name) # pipeline运行的相关配置 hubsecret_list = [] for task_name in all_tasks: # 配置拉取秘钥。本来在contain里面,workflow在外面 task_temp = all_tasks[task_name] if task_temp.job_template.images.repository.hubsecret: hubsecret = task_temp.job_template.images.repository.hubsecret if hubsecret not in hubsecret_list: hubsecret_list.append(hubsecret) pipeline_conf.image_pull_secrets.append(k8s_client.V1LocalObjectReference(name=hubsecret)) # # 配置host 在kfp中并不生效 # hostAliases = conf.get('HOSTALIASES', '') # if task_temp.job_template.hostAliases: # hostAliases+="\n"+ task_temp.job_template.hostAliases # if hostAliases: # hostAliases_list = re.split('\r|\n', hostAliases) # hostAliases_list = [host.strip() for host in hostAliases_list if host.strip()] # for row in hostAliases_list: # hosts = row.strip().split(' ') # hosts = [host for host in hosts if host] # # if len(hosts) > 1: # pipeline_conf.set_host_aliases(ip=hosts[0],hostnames=hosts[1:]) # 配置默认拉取策略 if pipeline.image_pull_policy: pipeline_conf.image_pull_policy = pipeline.image_pull_policy # 设置并发 if pipeline.parallelism: pipeline_conf.parallelism = int(pipeline.parallelism) # 设置workflow标签 # if pipeline._extra_data['upload_pipeline']: pipeline_conf.labels['upload-rtx']=g.user.username if g and g.user and g.user.username else pipeline.created_by.username pipeline_conf.labels['run-rtx'] = g.user.username if g and g.user and g.user.username else pipeline.created_by.username pipeline_conf.labels['pipeline-rtx'] = pipeline.created_by.username pipeline_conf.labels['save-time'] = datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S') pipeline_conf.labels['pipeline-id'] = str(pipeline.id) pipeline_conf.labels['run-id'] = global_envs.get('KFJ_RUN_ID','') # 以此来绑定运行时id,不能用kfp的run—id。那个是传到kfp以后才产生的。 kfp.compiler.Compiler().compile(my_pipeline, pipeline.name+'.yaml',pipeline_conf=pipeline_conf) file = open(pipeline.name+'.yaml',mode='rb') pipeline_file = template_str(str(file.read(),encoding='utf-8')) file.close() return pipeline_file # logging.info(pipeline.pipeline_file) # @pysnooper.snoop(watch_explode=()) def upload_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id): if not pipeline_file: return None,None file = open(pipeline_name + '.yaml', mode='wb') file.write(bytes(pipeline_file,encoding='utf-8')) file.close() client = kfp.Client(kfp_host) # pipeline.project.cluster.get('KFP_HOST') pipeline_argo = None if pipeline_argo_id: try: pipeline_argo = client.get_pipeline(pipeline_argo_id) except Exception as e: logging.error(e) if pipeline_argo: pipeline_argo_version = client.upload_pipeline_version(pipeline_package_path=pipeline_name + '.yaml', pipeline_version_name=pipeline_name+"_version_at_"+datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),pipeline_id=pipeline_argo.id) time.sleep(1) # 因为创建是异步的,要等k8s反应,所以有时延 return pipeline_argo.id,pipeline_argo_version.id else: exist_pipeline_argo_id = None try: exist_pipeline_argo_id = client.get_pipeline_id(pipeline_name) except Exception as e: logging.error(e) if exist_pipeline_argo_id: pipeline_argo_version = client.upload_pipeline_version(pipeline_package_path=pipeline_name + '.yaml',pipeline_version_name=pipeline_name + "_version_at_" + datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),pipeline_id=exist_pipeline_argo_id) time.sleep(1) return exist_pipeline_argo_id,pipeline_argo_version.id else: pipeline_argo = client.upload_pipeline(pipeline_name + '.yaml', pipeline_name=pipeline_name) time.sleep(1) return pipeline_argo.id,pipeline_argo.default_version.id # @pysnooper.snoop(watch_explode=()) def run_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id,pipeline_argo_version_id): # logging.info(pipeline) # return # 如果没值就先upload if not pipeline_argo_id or not pipeline_argo_version_id: pipeline_argo_id,pipeline_argo_version_id = upload_pipeline(pipeline_file,pipeline_name,kfp_host,pipeline_argo_id) # 必须上传新版本 client = kfp.Client(kfp_host) # 先创建一个实验,在在这个实验中运行指定pipeline experiment=None try: experiment = client.get_experiment(experiment_name=pipeline_name) except Exception as e: logging.error(e) if not experiment: try: experiment = client.create_experiment(name=pipeline_name,description=pipeline_name) # 现在要求describe不能是中文了 except Exception as e: print(e) return None,None,None # 直接使用pipeline最新的版本运行 try: run = client.run_pipeline(experiment_id = experiment.id,pipeline_id=pipeline_argo_id,version_id=pipeline_argo_version_id,job_name=pipeline_name+"_version_at_"+datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')) return pipeline_argo_id, pipeline_argo_version_id, run.id except Exception as e: print(e) raise e class Pipeline_ModelView_Base(): label_title='任务流' datamodel = SQLAInterface(Pipeline) check_redirect_list_url = '/pipeline_modelview/list/?_flt_2_name=' base_permissions = ['can_show','can_edit','can_list','can_delete','can_add'] base_order = ("changed_on", "desc") # order_columns = ['id','changed_on'] order_columns = ['id'] list_columns = ['id','project','pipeline_url','creator','modified'] add_columns = ['project','name','describe','schedule_type','cron_time','depends_on_past','max_active_runs','expired_limit','parallelism','global_env','alert_status','alert_user','parameter'] show_columns = ['project','name','describe','schedule_type','cron_time','depends_on_past','max_active_runs','expired_limit','parallelism','global_env','dag_json_html','pipeline_file_html','pipeline_argo_id','version_id','run_id','created_by','changed_by','created_on','changed_on','expand_html','parameter_html'] edit_columns = add_columns base_filters = [["id", Pipeline_Filter, lambda: []]] # 设置权限过滤器 conv = GeneralModelConverter(datamodel) add_form_extra_fields = { "name": StringField( _(datamodel.obj.lab('name')), description="英文名(字母、数字、- 组成),最长50个字符", widget=BS3TextFieldWidget(), validators=[Regexp("^[a-z][a-z0-9\-]*[a-z0-9]$"),Length(1,54),DataRequired()] ), "project":QuerySelectField( _(datamodel.obj.lab('project')), query_factory=filter_join_org_project, allow_blank=True, widget=Select2Widget() ), "dag_json": StringField( _(datamodel.obj.lab('dag_json')), widget=MyBS3TextAreaFieldWidget(rows=10), # 传给widget函数的是外层的field对象,以及widget函数的参数 ), "namespace": StringField( _(datamodel.obj.lab('namespace')), description="部署task所在的命名空间(目前无需填写)", default='pipeline', widget=BS3TextFieldWidget() ), "node_selector": StringField( _(datamodel.obj.lab('node_selector')), description="部署task所在的机器(目前无需填写)", widget=BS3TextFieldWidget(), default=datamodel.obj.node_selector.default.arg ), "image_pull_policy": SelectField( _(datamodel.obj.lab('image_pull_policy')), description="镜像拉取策略(always为总是拉取远程镜像,IfNotPresent为若本地存在则使用本地镜像)", widget=Select2Widget(), choices=[['Always','Always'],['IfNotPresent','IfNotPresent']] ), "depends_on_past": BooleanField( _(datamodel.obj.lab('depends_on_past')), description="任务运行是否依赖上一次的示例状态", default=True ), "max_active_runs": IntegerField( _(datamodel.obj.lab('max_active_runs')), description="当前pipeline可同时运行的任务流实例数目", widget=BS3TextFieldWidget(), default=1, validators=[DataRequired()] ), "expired_limit": IntegerField( _(datamodel.obj.lab('expired_limit')), description="定时调度最新实例限制数目,0表示不限制", widget=BS3TextFieldWidget(), default=1, validators=[DataRequired()] ), "parallelism": IntegerField( _(datamodel.obj.lab('parallelism')), description="一个任务流实例中可同时运行的task数目", widget=BS3TextFieldWidget(), default=3, validators=[DataRequired()] ), "global_env": StringField( _(datamodel.obj.lab('global_env')), description="公共环境变量会以环境变量的形式传递给每个task,可以配置多个公共环境变量,每行一个,支持datetime/creator/runner/uuid/pipeline_id等变量 例如:USERNAME={{creator}}", widget=BS3TextAreaFieldWidget() ), "schedule_type":SelectField( _(datamodel.obj.lab('schedule_type')), description="调度类型,once仅运行一次,crontab周期运行,crontab配置保存一个小时候后才生效", widget=Select2Widget(), choices=[['once','once'],['crontab','crontab']] ), "cron_time": StringField( _(datamodel.obj.lab('cron_time')), description="周期任务的时间设定 * * * * * 表示为 minute hour day month week", widget=BS3TextFieldWidget() ), "alert_status":MySelectMultipleField( label=_(datamodel.obj.lab('alert_status')), widget=Select2ManyWidget(), choices=[[x, x] for x in ['Created', 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Waiting', 'Terminated']], description="选择通知状态" ), "alert_user": StringField( label=_(datamodel.obj.lab('alert_user')), widget=BS3TextFieldWidget(), description="选择通知用户,每个用户使用逗号分隔" ) } edit_form_extra_fields = add_form_extra_fields related_views = [Task_ModelView, ] # 检测是否具有编辑权限,只有creator和admin可以编辑 def check_edit_permission(self, item): user_roles = [role.name.lower() for role in list(get_user_roles())] if "admin" in user_roles: return True if g.user and g.user.username and hasattr(item,'created_by'): if g.user.username==item.created_by.username: return True flash('just creator can edit/delete ', 'warning') return False # 验证args参数 # @pysnooper.snoop(watch_explode=('item')) def pipeline_args_check(self, item): core.validate_str(item.name,'name') if not item.dag_json: item.dag_json='{}' core.validate_json(item.dag_json) # 校验task的关系,没有闭环,并且顺序要对。没有配置的,自动没有上游,独立 # @pysnooper.snoop() def order_by_upstream(dag_json): order_dag={} tasks_name = list(dag_json.keys()) # 如果没有配全的话,可能只有局部的task i=0 while tasks_name: i+=1 if i>100: # 不会有100个依赖关系 break for task_name in tasks_name: # 没有上游的情况 if not dag_json[task_name]: order_dag[task_name]={} tasks_name.remove(task_name) continue # 没有上游的情况 elif 'upstream' not in dag_json[task_name] or not dag_json[task_name]['upstream']: order_dag[task_name] = {} tasks_name.remove(task_name) continue # 如果有上游依赖的话,先看上游任务是否已经加到里面了。 upstream_all_ready=True for upstream_task_name in dag_json[task_name]['upstream']: if upstream_task_name not in order_dag: upstream_all_ready=False if upstream_all_ready: order_dag[task_name]=dag_json[task_name] tasks_name.remove(task_name) if list(dag_json.keys()).sort()!=list(order_dag.keys()).sort(): flash('dag pipeline 存在循环或未知上游',category='warning') raise MyappException('dag pipeline 存在循环或未知上游') return order_dag # 配置上缺少的默认上游 dag_json = json.loads(item.dag_json) tasks = item.get_tasks(db.session) if tasks and dag_json: for task in tasks: if task.name not in dag_json: dag_json[task.name]={ "upstream": [] } item.dag_json = json.dumps(order_by_upstream(copy.deepcopy(dag_json)),ensure_ascii=False,indent=4) # 生成workflow,如果有id, if item.id and item.get_tasks(): item.pipeline_file = dag_to_pipeline(item,db.session) else: item.pipeline_file = None # raise Exception('args is not valid') # 合并上下游关系 # @pysnooper.snoop(watch_explode=('pipeline')) def merge_upstream(self,pipeline): logging.info(pipeline) dag_json={} # 根据参数生成args字典。一层嵌套的形式 for arg in pipeline.__dict__: if len(arg)>5 and arg[:5] == 'task.': task_upstream = getattr(pipeline,arg) dag_json[arg[5:]]={ "upstream":task_upstream if task_upstream else [] } if dag_json: pipeline.dag_json = json.dumps(dag_json) # @pysnooper.snoop() def pre_add(self, item): item.name = item.name.replace('_', '-')[0:54].lower().strip('-') # item.alert_status = ','.join(item.alert_status) self.pipeline_args_check(item) item.create_datetime=datetime.datetime.now() item.change_datetime = datetime.datetime.now() item.parameter = json.dumps({"cronjob_start_time":datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, indent=4, ensure_ascii=False) # @pysnooper.snoop() def pre_update(self, item): if item.expand: core.validate_json(item.expand) item.expand = json.dumps(json.loads(item.expand),indent=4,ensure_ascii=False) else: item.expand='{}' item.name = item.name.replace('_', '-')[0:54].lower() # item.alert_status = ','.join(item.alert_status) self.merge_upstream(item) self.pipeline_args_check(item) item.change_datetime = datetime.datetime.now() if item.parameter: item.parameter = json.dumps(json.loads(item.parameter),indent=4,ensure_ascii=False) else: item.parameter = '{}' if (item.schedule_type=='crontab' and self.src_item_json.get("schedule_type")=='once') or (item.cron_time!=self.src_item_json.get("cron_time",'')): parameter = json.loads(item.parameter if item.parameter else '{}') parameter.update({"cronjob_start_time":datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) item.parameter = json.dumps(parameter,indent=4,ensure_ascii=False) # 限制提醒 if item.schedule_type=='crontab': if not item.project.node_selector: flash('无法保障公共集群的稳定性,定时任务请选择专门的日更集群项目组','warning') else: org = item.project.node_selector.replace('org=','') if not org or org=='public': flash('无法保障公共集群的稳定性,定时任务请选择专门的日更集群项目组','warning') def pre_update_get(self,item): item.dag_json = item.fix_dag_json() # item.expand = json.dumps(item.fix_expand(),indent=4,ensure_ascii=False) db.session.commit() # 删除前先把下面的task删除了 # @pysnooper.snoop() def pre_delete(self, pipeline): tasks = pipeline.get_tasks() for task in tasks: db.session.delete(task) db.session.commit() if "(废弃)" not in pipeline.describe: pipeline.describe+="(废弃)" pipeline.schedule_type='once' pipeline.expand="" pipeline.dag_json="{}" db.session.commit() @expose("/my/list/") def my(self): try: user_id=g.user.id if user_id: pipelines = db.session.query(Pipeline).filter_by(created_by_fk=user_id).all() back=[] for pipeline in pipelines: back.append(pipeline.to_json()) return json_response(message='success',status=0,result=back) except Exception as e: print(e) return json_response(message=str(e),status=-1,result={}) @expose("/demo/list/") def demo(self): try: pipelines = db.session.query(Pipeline).filter(Pipeline.parameter.contains('"demo": "true"')).all() back=[] for pipeline in pipelines: back.append(pipeline.to_json()) return json_response(message='success',status=0,result=back) except Exception as e: print(e) return json_response(message=str(e),status=-1,result={}) @event_logger.log_this @expose("/list/") @has_access def list(self): args = request.args.to_dict() if '_flt_0_created_by' in args and args['_flt_0_created_by']=='': print(request.url) print(request.path) flash('去除过滤条件->查看所有pipeline','success') return redirect(request.url.replace('_flt_0_created_by=','_flt_0_created_by=%s'%g.user.id)) widgets = self._list() res = self.render_template( self.list_template, title=self.list_title, widgets=widgets ) return res # @event_logger.log_this @action( "download", __("Download"), __("Download Yaml"), "fa-download", multiple=False, single=True ) def download(self, item): file_name = item.name+'-download.yaml' file_dir = os.path.join(conf.get('DOWNLOAD_FOLDER'),'pipeline') if not os.path.exists(file_dir): os.makedirs(file_dir) file = open(os.path.join(file_dir,file_name), mode='wb') pipeline_file = item.pipeline_file try: import yaml pipeline_yaml = yaml.safe_load(pipeline_file) pipeline_yaml['metadata']['name']=item.name+'-'+uuid.uuid4().hex[:4] pipeline_yaml['metadata']['namespace'] = 'pipeline' pipeline_file = yaml.safe_dump(pipeline_yaml) except Exception as e: print(e) file.write(bytes(pipeline_file, encoding='utf-8')) file.close() response = make_response(send_from_directory(file_dir, file_name, as_attachment=True, conditional=True)) response.headers[ "Content-Disposition" ] = f"attachment; filename={file_name}" logging.info("Ready to return response") return response # 删除手动发起的workflow,不删除定时任务发起的workflow def delete_bind_crd(self,crds): for crd in crds: try: run_id = json.loads(crd['labels']).get("run-id",'') if run_id: # 定时任务发起的不能清理 run_history = db.session.query(RunHistory).filter_by(run_id=run_id).first() if run_history: continue db_crd = db.session.query(Workflow).filter_by(name=crd['name']).first() pipeline = db_crd.pipeline if pipeline: k8s_client = py_k8s.K8s(pipeline.project.cluster['KUBECONFIG']) else: k8s_client = py_k8s.K8s() k8s_client.delete_workflow( all_crd_info = conf.get("CRD_INFO", {}), namespace=crd['namespace'], run_id = run_id ) # push_message(conf.get('ADMIN_USER', '').split(','),'%s手动运行新的pipeline %s,进而删除旧的pipeline run-id: %s' % (pipeline.created_by.username,pipeline.describe,run_id,)) if db_crd: db_crd.status='Deleted' db_crd.change_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') db.session.commit() except Exception as e: print(e) def check_pipeline_perms(user_fun): # @pysnooper.snoop() def wraps(*args, **kwargs): pipeline_id = int(kwargs.get('pipeline_id','0')) if not pipeline_id: response = make_response("pipeline_id not exist") response.status_code = 404 return response user_roles = [role.name.lower() for role in g.user.roles] if "admin" in user_roles: return user_fun(*args, **kwargs) join_projects_id = security_manager.get_join_projects_id(db.session) pipeline = db.session.query(Pipeline).filter_by(id=pipeline_id).first() if pipeline.project.id in join_projects_id: return user_fun(*args, **kwargs) response = make_response("no perms to run pipeline %s"%pipeline_id) response.status_code = 403 return response return wraps # # @event_logger.log_this @expose("/run_pipeline/", methods=["GET", "POST"]) @check_pipeline_perms def run_pipeline(self,pipeline_id): print(pipeline_id) pipeline = db.session.query(Pipeline).filter_by(id=pipeline_id).first() pipeline.delete_old_task() time.sleep(1) # if pipeline.changed_on+datetime.timedelta(seconds=5)>datetime.datetime.now(): # flash("发起运行实例,太过频繁,5s后重试",category='warning') # return redirect('/pipeline_modelview/list/?_flt_2_name=') back_crds = pipeline.get_workflow() # 把消息加入到源数据库 for crd in back_crds: try: workflow = db.session.query(Workflow).filter_by(name=crd['name']).first() if not workflow: username = '' labels = json.loads(crd['labels']) if 'run-rtx' in labels: username = labels['run-rtx'] elif 'upload-rtx' in labels: username = labels['upload-rtx'] workflow = Workflow(name=crd['name'], namespace=crd['namespace'], create_time=crd['create_time'], status=crd['status'], annotations=crd['annotations'], labels=crd['labels'], spec=crd['spec'], status_more=crd['status_more'], username=username ) db.session.add(workflow) db.session.commit() except Exception as e: print(e) # 这里直接删除所有的历史任务流,正在运行的也删除掉 # not_running_crds = back_crds # [crd for crd in back_crds if 'running' not in crd['status'].lower()] self.delete_bind_crd(back_crds) # running_crds = [1 for crd in back_crds if 'running' in crd['status'].lower()] # if len(running_crds)>0: # flash("发现当前运行实例 %s 个,目前集群仅支持每个任务流1个运行实例,若要重新发起实例,请先stop旧实例"%len(running_crds),category='warning') # # run_instance = '/workflow_modelview/list/?_flt_2_name=%s'%pipeline.name.replace("_","-")[:54] # run_instance = r'/workflow_modelview/list/?_flt_2_labels="pipeline-id"%3A+"'+'%s"' % pipeline_id # return redirect(run_instance) # self.delete_workflow(pipeline) pipeline.pipeline_file = dag_to_pipeline(pipeline, db.session) # 合成workflow # print('make pipeline file %s' % pipeline.pipeline_file) # return print('begin upload and run pipeline %s' % pipeline.name) pipeline.version_id = '' pipeline.run_id = '' pipeline_argo_id, version_id, run_id = run_pipeline( pipeline_file=pipeline.pipeline_file, pipeline_name=pipeline.name, kfp_host=pipeline.project.cluster.get('KFP_HOST'), pipeline_argo_id=pipeline.pipeline_argo_id, pipeline_argo_version_id=pipeline.version_id ) # 会根据版本号是否为空决定是否上传 print('success upload and run pipeline %s,pipeline_argo_id %s, version_id %s,run_id %s ' % (pipeline.name, pipeline_argo_id, version_id, run_id)) pipeline.pipeline_argo_id = pipeline_argo_id pipeline.version_id = version_id pipeline.run_id = run_id db.session.commit() # 更新 run_url = conf.get('PIPELINE_URL') + "runs/details/" + run_id logging.info(run_url) # run_url='http://www.baidu.com/http://www.baidu.com/' return redirect("/pipeline_modelview/web/log/%s"%pipeline_id) # return redirect(run_url) # # @event_logger.log_this @expose("/web/", methods=["GET"]) def web(self,pipeline_id): pipeline = db.session.query(Pipeline).filter_by(id=pipeline_id).first() pipeline.dag_json = pipeline.fix_dag_json() # pipeline.expand = json.dumps(pipeline.fix_expand(), indent=4, ensure_ascii=False) pipeline.expand = json.dumps(pipeline.fix_position(), indent=4, ensure_ascii=False) # db_tasks = pipeline.get_tasks(db.session) # if db_tasks: # try: # tasks={} # for task in db_tasks: # tasks[task.name]=task.to_json() # expand = core.fix_task_position(pipeline.to_json(),tasks) # pipeline.expand=json.dumps(expand,indent=4,ensure_ascii=False) # db.session.commit() # except Exception as e: # print(e) db.session.commit() print(pipeline_id) data = { "url": '/static/appbuilder/vison/index.html?pipeline_id=%s'%pipeline_id # 前后端集成完毕,这里需要修改掉 } # 返回模板 return self.render_template('link.html', data=data) # # @event_logger.log_this @expose("/web/log/", methods=["GET"]) def web_log(self,pipeline_id): pipeline = db.session.query(Pipeline).filter_by(id=pipeline_id).first() if pipeline.run_id: data = { "url": pipeline.project.cluster.get('PIPELINE_URL') + "runs/details/" + pipeline.run_id, "target": "div.page_f1flacxk:nth-of-type(0)", # "div.page_f1flacxk:nth-of-type(0)", "delay":500, "loading": True } # 返回模板 if pipeline.project.cluster['NAME']==conf.get('ENVIRONMENT'): return self.render_template('link.html', data=data) else: return self.render_template('external_link.html', data=data) else: flash('no running instance','warning') return redirect('/pipeline_modelview/web/%s'%pipeline.id) # # @event_logger.log_this @expose("/web/monitoring/", methods=["GET"]) def web_monitoring(self,pipeline_id): pipeline = db.session.query(Pipeline).filter_by(id=int(pipeline_id)).first() if pipeline.run_id: url = pipeline.project.cluster.get('GRAFANA_HOST','').strip('/')+conf.get('GRAFANA_TASK_PATH')+ pipeline.name return redirect(url) # data = { # "url": pipeline.project.cluster.get('GRAFANA_HOST','').strip('/')+conf.get('GRAFANA_TASK_PATH') + pipeline.name, # # "target": "div.page_f1flacxk:nth-of-type(0)", # "div.page_f1flacxk:nth-of-type(0)", # "delay":1000, # "loading": True # } # # 返回模板 # if pipeline.project.cluster['NAME']==conf.get('ENVIRONMENT'): # return self.render_template('link.html', data=data) # else: # return self.render_template('external_link.html', data=data) else: flash('no running instance','warning') return redirect('/pipeline_modelview/web/%s'%pipeline.id) # # @event_logger.log_this @expose("/web/pod/", methods=["GET"]) def web_pod(self,pipeline_id): pipeline = db.session.query(Pipeline).filter_by(id=pipeline_id).first() data = { "url": pipeline.project.cluster.get('K8S_DASHBOARD_CLUSTER', '') + '#/search?namespace=%s&q=%s' % (conf.get('PIPELINE_NAMESPACE'), pipeline.name.replace('_', '-')), "target":"div.kd-chrome-container.kd-bg-background", "delay":500, "loading": True } # 返回模板 if pipeline.project.cluster['NAME']==conf.get('ENVIRONMENT'): return self.render_template('link.html', data=data) else: return self.render_template('external_link.html', data=data) # @pysnooper.snoop(watch_explode=('expand')) def copy_db(self,pipeline): new_pipeline = pipeline.clone() expand = json.loads(pipeline.expand) if pipeline.expand else {} new_pipeline.name = new_pipeline.name.replace('_', '-') + "-copy-" + uuid.uuid4().hex[:4] new_pipeline.created_on = datetime.datetime.now() new_pipeline.changed_on = datetime.datetime.now() db.session.add(new_pipeline) db.session.commit() def change_node(src_task_id, des_task_id): for node in expand: if 'source' not in node: # 位置信息换成新task的id if int(node['id']) == int(src_task_id): node['id'] = str(des_task_id) else: if int(node['source']) == int(src_task_id): node['source'] = str(des_task_id) if int(node['target']) == int(src_task_id): node['target'] = str(des_task_id) # 复制绑定的task,并绑定新的pipeline for task in pipeline.get_tasks(): new_task = task.clone() new_task.pipeline_id = new_pipeline.id new_task.create_datetime = datetime.datetime.now() new_task.change_datetime = datetime.datetime.now() db.session.add(new_task) db.session.commit() change_node(task.id, new_task.id) new_pipeline.expand = json.dumps(expand) db.session.commit() return new_pipeline # # @event_logger.log_this @expose("/copy_pipeline/", methods=["GET", "POST"]) def copy_pipeline(self,pipeline_id): print(pipeline_id) message='' try: pipeline = db.session.query(Pipeline).filter_by(id=pipeline_id).first() new_pipeline = self.copy_db(pipeline) return jsonify(new_pipeline.to_json()) # return redirect('/pipeline_modelview/web/%s'%new_pipeline.id) except InvalidRequestError: db.session.rollback() except Exception as e: logging.error(e) message=str(e) response = make_response("copy pipeline %s error: %s" % (pipeline_id,message)) response.status_code = 500 return response @action( "copy", __("Copy Pipeline"), confirmation=__('Copy Pipeline'), icon="fa-copy",multiple=True, single=False ) def copy(self, pipelines): if not isinstance(pipelines, list): pipelines = [pipelines] try: for pipeline in pipelines: self.copy_db(pipeline) except InvalidRequestError: db.session.rollback() except Exception as e: logging.error(e) raise e return redirect(request.referrer) class Pipeline_ModelView(Pipeline_ModelView_Base,MyappModelView,DeleteMixin): datamodel = SQLAInterface(Pipeline) # base_order = ("changed_on", "desc") # order_columns = ['changed_on'] appbuilder.add_view(Pipeline_ModelView,"任务流",href="/pipeline_modelview/list/?_flt_0_created_by=",icon = 'fa-sitemap',category = '训练') # 添加api class Pipeline_ModelView_Api(Pipeline_ModelView_Base,MyappModelRestApi): datamodel = SQLAInterface(Pipeline) route_base = '/pipeline_modelview/api' show_columns = ['project','name','describe','namespace','schedule_type','cron_time','node_selector','image_pull_policy','depends_on_past','max_active_runs','parallelism','global_env','dag_json','pipeline_file','pipeline_argo_id','version_id','run_id','created_by','changed_by','created_on','changed_on','expand'] # show_columns = ['name','describe','project','dag_json','namespace','global_env','schedule_type','cron_time','pipeline_file','pipeline_argo_id','version_id','run_id','node_selector','image_pull_policy','parallelism','alert_status'] list_columns = show_columns add_columns = ['project','name','describe','namespace','schedule_type','cron_time','node_selector','image_pull_policy','depends_on_past','max_active_runs','parallelism','dag_json','global_env','expand'] edit_columns = add_columns appbuilder.add_api(Pipeline_ModelView_Api)