from flask_appbuilder.models.sqla.interface import SQLAInterface from flask_babel import lazy_gettext as _ from myapp.models.model_service_pipeline import Service_Pipeline from myapp.models.model_job import Repository from flask import jsonify from flask_appbuilder.forms import GeneralModelConverter from myapp.utils import core from myapp import app, appbuilder,db from wtforms.ext.sqlalchemy.fields import QuerySelectField from wtforms.validators import DataRequired, Length, Regexp from sqlalchemy import or_ from myapp.exceptions import MyappException from wtforms import StringField, SelectField from flask_appbuilder.fieldwidgets import BS3TextFieldWidget,Select2ManyWidget,Select2Widget from myapp.forms import MyBS3TextAreaFieldWidget,MySelectMultipleField import copy from .baseApi import ( MyappModelRestApi ) from flask import ( flash, g, make_response, redirect, request, ) from myapp import security_manager from myapp.views.view_team import filter_join_org_project from .base import ( DeleteMixin, get_user_roles, MyappFilter, MyappModelView, json_response ) from flask_appbuilder import expose import pysnooper,datetime, json conf = app.config logging = app.logger class Service_Pipeline_Filter(MyappFilter): # @pysnooper.snoop() def apply(self, query, func): user_roles = [role.name.lower() for role in list(self.get_user_roles())] if "admin" in user_roles: return query join_projects_id = security_manager.get_join_projects_id(db.session) # public_project_id = # logging.info(join_projects_id) return query.filter( or_( self.model.project_id.in_(join_projects_id), # self.model.project.name.in_(['public']) ) ) class Service_Pipeline_ModelView_Base(): label_title='任务流' datamodel = SQLAInterface(Service_Pipeline) check_redirect_list_url = conf.get('MODEL_URLS',{}).get('service_pipeline','') base_permissions = ['can_show','can_edit','can_list','can_delete','can_add'] base_order = ("changed_on", "desc") # order_columns = ['id','changed_on'] order_columns = ['id'] list_columns = ['project','service_pipeline_url','creator','modified','operate_html'] add_columns = ['project','name','describe','namespace','images','env','resource_memory', 'resource_cpu','resource_gpu', 'replicas','dag_json','alert_status','alert_user','parameter'] show_columns = ['project','name','describe','namespace','run_id','created_by','changed_by','created_on','changed_on','expand_html','parameter_html'] edit_columns = add_columns base_filters = [["id", Service_Pipeline_Filter, lambda: []]] conv = GeneralModelConverter(datamodel) add_form_extra_fields = { "name": StringField( _(datamodel.obj.lab('name')), description="英文名(小写字母、数字、- 组成),最长50个字符", widget=BS3TextFieldWidget(), validators=[Regexp("^[a-z][a-z0-9\-]*[a-z0-9]$"),Length(1,54),DataRequired()] ), "project":QuerySelectField( _(datamodel.obj.lab('project')), query_factory=filter_join_org_project, allow_blank=True, widget=Select2Widget() ), "dag_json": StringField( _(datamodel.obj.lab('dag_json')), widget=MyBS3TextAreaFieldWidget(rows=10), # 传给widget函数的是外层的field对象,以及widget函数的参数 ), "namespace": StringField( _(datamodel.obj.lab('namespace')), description="部署task所在的命名空间(目前无需填写)", default='service', widget=BS3TextFieldWidget() ), "images":StringField( _(datamodel.obj.lab('images')), default='ccr.ccs.tencentyun.com/cube-studio/service-pipeline', description="推理服务镜像", widget=BS3TextFieldWidget(), validators=[DataRequired()] ), "node_selector": StringField( _(datamodel.obj.lab('node_selector')), description="部署task所在的机器(目前无需填写)", widget=BS3TextFieldWidget(), default=datamodel.obj.node_selector.default.arg ), "image_pull_policy": SelectField( _(datamodel.obj.lab('image_pull_policy')), description="镜像拉取策略(always为总是拉取远程镜像,IfNotPresent为若本地存在则使用本地镜像)", widget=Select2Widget(), choices=[['Always','Always'],['IfNotPresent','IfNotPresent']] ), "alert_status":MySelectMultipleField( label=_(datamodel.obj.lab('alert_status')), widget=Select2ManyWidget(), choices=[[x, x] for x in ['Created', 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Waiting', 'Terminated']], description="选择通知状态" ), "alert_user": StringField( label=_(datamodel.obj.lab('alert_user')), widget=BS3TextFieldWidget(), description="选择通知用户,每个用户使用逗号分隔" ), "label": StringField(_(datamodel.obj.lab('label')), description='中文名', widget=BS3TextFieldWidget(), validators=[DataRequired()]), "resource_memory": StringField(_(datamodel.obj.lab('resource_memory')), default=Service_Pipeline.resource_memory.default.arg, description='内存的资源使用限制,示例1G,10G, 最大100G,如需更多联系管路员', widget=BS3TextFieldWidget(), validators=[DataRequired()]), "resource_cpu": StringField(_(datamodel.obj.lab('resource_cpu')), default=Service_Pipeline.resource_cpu.default.arg, description='cpu的资源使用限制(单位核),示例 0.4,10,最大50核,如需更多联系管路员', widget=BS3TextFieldWidget(), validators=[DataRequired()]), "resource_gpu": StringField(_(datamodel.obj.lab('resource_gpu')), default='0', description='gpu的资源使用限制(单位卡),示例:1,2,训练任务每个容器独占整卡。申请具体的卡型号,可以类似 1(V100),目前支持T4/V100/A100/VGPU', widget=BS3TextFieldWidget()), "replicas": StringField(_(datamodel.obj.lab('replicas')), default=Service_Pipeline.replicas.default.arg, description='pod副本数,用来配置高可用', widget=BS3TextFieldWidget(), validators=[DataRequired()]), "env": StringField(_(datamodel.obj.lab('env')), default=Service_Pipeline.env.default.arg, description='使用模板的task自动添加的环境变量,支持模板变量。书写格式:每行一个环境变量env_key=env_value', widget=MyBS3TextAreaFieldWidget()), } edit_form_extra_fields = add_form_extra_fields # 检测是否具有编辑权限,只有creator和admin可以编辑 def check_edit_permission(self, item): user_roles = [role.name.lower() for role in list(get_user_roles())] if "admin" in user_roles: return True if g.user and g.user.username and hasattr(item,'created_by'): if g.user.username==item.created_by.username: return True flash('just creator can edit/delete ', 'warning') return False # 验证args参数 @pysnooper.snoop(watch_explode=('item')) def service_pipeline_args_check(self, item): core.validate_str(item.name,'name') if not item.dag_json: item.dag_json='{}' core.validate_json(item.dag_json) # 校验task的关系,没有闭环,并且顺序要对。没有配置的,自动没有上游,独立 # @pysnooper.snoop() def order_by_upstream(dag_json): order_dag={} tasks_name = list(dag_json.keys()) # 如果没有配全的话,可能只有局部的task i=0 while tasks_name: i+=1 if i>100: # 不会有100个依赖关系 break for task_name in tasks_name: # 没有上游的情况 if not dag_json[task_name]: order_dag[task_name]=dag_json[task_name] tasks_name.remove(task_name) continue # 没有上游的情况 elif 'upstream' not in dag_json[task_name] or not dag_json[task_name]['upstream']: order_dag[task_name] = dag_json[task_name] tasks_name.remove(task_name) continue # 如果有上游依赖的话,先看上游任务是否已经加到里面了。 upstream_all_ready=True for upstream_task_name in dag_json[task_name]['upstream']: if upstream_task_name not in order_dag: upstream_all_ready=False if upstream_all_ready: order_dag[task_name]=dag_json[task_name] tasks_name.remove(task_name) else: dag_json[task_name]['upstream']=[] order_dag[task_name] = dag_json[task_name] tasks_name.remove(task_name) if list(dag_json.keys()).sort()!=list(order_dag.keys()).sort(): flash('dag service pipeline 存在循环或未知上游',category='warning') raise MyappException('dag service pipeline 存在循环或未知上游') return order_dag # 配置上缺少的默认上游 dag_json = json.loads(item.dag_json) item.dag_json = json.dumps(order_by_upstream(copy.deepcopy(dag_json)),ensure_ascii=False,indent=4) # raise Exception('args is not valid') # @pysnooper.snoop() def pre_add(self, item): item.name = item.name.replace('_', '-')[0:54].lower().strip('-') # item.alert_status = ','.join(item.alert_status) # self.service_pipeline_args_check(item) item.create_datetime=datetime.datetime.now() item.change_datetime = datetime.datetime.now() item.parameter = json.dumps({}, indent=4, ensure_ascii=False) item.volume_mount = item.project.volume_mount + ",%s(configmap):/config/" % item.name # @pysnooper.snoop() def pre_update(self, item): if item.expand: core.validate_json(item.expand) item.expand = json.dumps(json.loads(item.expand),indent=4,ensure_ascii=False) else: item.expand='{}' item.name = item.name.replace('_', '-')[0:54].lower() item.alert_status = ','.join(item.alert_status) # self.service_pipeline_args_check(item) item.change_datetime = datetime.datetime.now() item.parameter = json.dumps(json.loads(item.parameter),indent=4,ensure_ascii=False) if item.parameter else '{}' item.dag_json = json.dumps(json.loads(item.dag_json), indent=4,ensure_ascii=False) if item.dag_json else '{}' item.volume_mount = item.project.volume_mount + ",%s(configmap):/config/" % item.name @expose("/my/list/") def my(self): try: user_id=g.user.id if user_id: service_pipelines = db.session.query(Service_Pipeline).filter_by(created_by_fk=user_id).all() back=[] for service_pipeline in service_pipelines: back.append(service_pipeline.to_json()) return json_response(message='success',status=0,result=back) except Exception as e: print(e) return json_response(message=str(e),status=-1,result={}) def check_service_pipeline_perms(user_fun): # @pysnooper.snoop() def wraps(*args, **kwargs): service_pipeline_id = int(kwargs.get('service_pipeline_id','0')) if not service_pipeline_id: response = make_response("service_pipeline_id not exist") response.status_code = 404 return response user_roles = [role.name.lower() for role in g.user.roles] if "admin" in user_roles: return user_fun(*args, **kwargs) join_projects_id = security_manager.get_join_projects_id(db.session) service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first() if service_pipeline.project.id in join_projects_id: return user_fun(*args, **kwargs) response = make_response("no perms to run pipeline %s"%service_pipeline_id) response.status_code = 403 return response return wraps # 构建同步服务 def build_http(self,service_pipeline): pass # 构建异步服务 @pysnooper.snoop() def build_mq_consumer(self,service_pipeline): namespace = conf.get('SERVICE_PIPELINE_NAMESPACE') name = service_pipeline.name command = service_pipeline.command image_secrets = conf.get('HUBSECRET', []) user_hubsecrets = db.session.query(Repository.hubsecret).filter(Repository.created_by_fk == g.user.id).all() if user_hubsecrets: for hubsecret in user_hubsecrets: if hubsecret[0] not in image_secrets: image_secrets.append(hubsecret[0]) from myapp.utils.py.py_k8s import K8s k8s_client = K8s(service_pipeline.project.cluster.get('KUBECONFIG','')) dag_json=service_pipeline.dag_json if service_pipeline.dag_json else '{}' # 生成服务使用的configmap config_data = { "dag.json":dag_json } k8s_client.create_configmap(namespace=namespace,name=name,data=config_data,labels={'app':name}) env = service_pipeline.env if conf.get('SERVICE_PIPELINE_JAEGER',''): env['JAEGER_HOST']=conf.get('SERVICE_PIPELINE_JAEGER','') env['SERVICE_NAME'] = name labels = {"app": name, "user": service_pipeline.created_by.username,"pod-type":"service-pipeline"} k8s_client.create_deployment( namespace=namespace, name=name, replicas=service_pipeline.replicas, labels=labels, # command=['sh','-c',command] if command else None, command=['bash', '-c', "python mq-pipeline/cube_kafka.py"], args=None, volume_mount=service_pipeline.volume_mount, working_dir=service_pipeline.working_dir, node_selector=service_pipeline.get_node_selector(), resource_memory=service_pipeline.resource_memory, resource_cpu=service_pipeline.resource_cpu, resource_gpu=service_pipeline.resource_gpu if service_pipeline.resource_gpu else '', image_pull_policy=conf.get('IMAGE_PULL_POLICY','Always'), image_pull_secrets=image_secrets, image=service_pipeline.images, hostAliases=conf.get('HOSTALIASES',''), env=env, privileged=False, accounts=None, username=service_pipeline.created_by.username, ports=None ) pass # 只能有一个入口。不能同时接口两个队列 # # @event_logger.log_this @expose("/run_service_pipeline/", methods=["GET", "POST"]) @check_service_pipeline_perms def run_service_pipeline(self,service_pipeline_id): service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first() dag_json = json.loads(service_pipeline.dag_json) root_nodes_name = service_pipeline.get_root_node_name() self.clear(service_pipeline_id) if root_nodes_name: root_node_name=root_nodes_name[0] root_node=dag_json[root_node_name] # 构建异步 if root_node['template-group']=='endpoint' and root_node['template']=='mq': self.build_mq_consumer(service_pipeline) # 构建同步 if root_node['template-group'] == 'endpoint' and root_node['template'] == 'gateway': self.build_http(service_pipeline) return redirect("/service_pipeline_modelview/web/log/%s"%service_pipeline_id) # return redirect(run_url) # # @event_logger.log_this @expose("/web/", methods=["GET"]) def web(self,service_pipeline_id): service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first() # service_pipeline.dag_json = service_pipeline.fix_dag_json() # service_pipeline.expand = json.dumps(service_pipeline.fix_position(), indent=4, ensure_ascii=False) db.session.commit() print(service_pipeline_id) data = { "url": '/static/appbuilder/vison/index.html?pipeline_id=%s'%service_pipeline_id # 前后端集成完毕,这里需要修改掉 } # 返回模板 return self.render_template('link.html', data=data) # # @event_logger.log_this @expose("/web/log/", methods=["GET"]) def web_log(self,service_pipeline_id): service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first() if service_pipeline.run_id: data = { "url": service_pipeline.project.cluster.get('PIPELINE_URL') + "runs/details/" + service_pipeline.run_id, "target": "div.page_f1flacxk:nth-of-type(0)", # "div.page_f1flacxk:nth-of-type(0)", "delay":500, "loading": True } # 返回模板 if service_pipeline.project.cluster['NAME']==conf.get('ENVIRONMENT'): return self.render_template('link.html', data=data) else: return self.render_template('external_link.html', data=data) else: flash('no running instance','warning') return redirect('/service_pipeline_modelview/web/%s'%service_pipeline.id) # 链路跟踪 @expose("/web/monitoring/", methods=["GET"]) def web_monitoring(self,service_pipeline_id): service_pipeline = db.session.query(Service_Pipeline).filter_by(id=int(service_pipeline_id)).first() if service_pipeline.run_id: url = service_pipeline.project.cluster.get('GRAFANA_HOST','').strip('/')+conf.get('GRAFANA_TASK_PATH')+ service_pipeline.name return redirect(url) else: flash('no running instance','warning') return redirect('/service_pipeline_modelview/web/%s'%service_pipeline.id) # # @event_logger.log_this @expose("/web/pod/", methods=["GET"]) def web_pod(self,service_pipeline_id): service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first() data = { "url": service_pipeline.project.cluster.get('K8S_DASHBOARD_CLUSTER', '') + '#/search?namespace=%s&q=%s' % (conf.get('SERVICE_PIPELINE_NAMESPACE'), service_pipeline.name.replace('_', '-')), "target":"div.kd-chrome-container.kd-bg-background", "delay":500, "loading": True } # 返回模板 if service_pipeline.project.cluster['NAME']==conf.get('ENVIRONMENT'): return self.render_template('link.html', data=data) else: return self.render_template('external_link.html', data=data) @expose('/clear/', methods=['POST', "GET"]) def clear(self, service_pipeline_id): service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first() from myapp.utils.py.py_k8s import K8s k8s_client = K8s(service_pipeline.project.cluster.get('KUBECONFIG','')) namespace = conf.get('SERVICE_PIPELINE_NAMESPACE') k8s_client.delete_deployment(namespace=namespace, name=service_pipeline.name) flash('服务清理完成', category='success') return redirect('/service_pipeline_modelview/list/') @expose("/config/",methods=("GET",'POST')) def pipeline_config(self,service_pipeline_id): print(service_pipeline_id) pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first() if not pipeline: return jsonify({ "status":1, "message":"服务流不存在", "result":{} }) if request.method.lower()=='post': data = request.get_json() request_config =data.get('config',{}) request_dag = data.get('dag_json', {}) if request_config: pipeline.config = json.dumps(request_config,indent=4,ensure_ascii=False) if request_dag: pipeline.dag_json = json.dumps(request_dag, indent=4, ensure_ascii=False) db.session.commit() config = { "id":pipeline.id, "name":pipeline.name, "label":pipeline.describe, "project":pipeline.project.describe, "pipeline_ui_config":{ "alert":{ "alert_user":{ "type": "str", "item_type": "str", "label": "报警用户", "require": 1, "choice": [], "range": "", "default": "", "placeholder": "报警用户名,逗号分隔", "describe": "报警用户,逗号分隔", "editable": 1, "condition": "", "sub_args": {} } } }, "pipeline_jump_button": [ { "name":"资源查看", "action_url":"", "icon_svg":'' } ], "pipeline_run_button": [ ], "task_jump_button": [], "dag_json":json.loads(pipeline.dag_json), "config": json.loads(pipeline.config), "message": "success", "status": 0 } return jsonify(config) @expose("/template/list/") def template_list(self): all_template={ "message": "success", "templte_common_ui_config":{ }, "template_group_order": ["入口", "逻辑节点", "功能节点"], "templte_list": { "入口":[ { "template_name": "kafka", "template_id": 1, "templte_ui_config": { "shell": { "topic": { "type": "str", "item_type": "str", "label": "topic", "require": 1, "choice": [], "range": "", "default": "predict", "placeholder": "", "describe": "kafka topic", "editable": 1, "condition": "", "sub_args": {} }, "consumer_num": { "type": "str", "item_type": "str", "label": "消费者数目", "require": 1, "choice": [], "range": "", "default": "4", "placeholder": "", "describe": "消费者数目", "editable": 1, "condition": "", "sub_args": {} }, "bootstrap_servers": { "type": "str", "item_type": "str", "label": "地址", "require": 1, "choice": [], "range": "", "default": "127.0.0.1:9092", "placeholder": "", "describe": "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092", "editable": 1, "condition": "", "sub_args": {} }, "group": { "type": "str", "item_type": "str", "label": "分组", "require": 1, "choice": [], "range": "", "default": "predict", "placeholder": "", "describe": "消费者分组", "editable": 1, "condition": "", "sub_args": {} } } }, "username": g.user.username, "changed_on": datetime.datetime.now(), "created_on": datetime.datetime.now(), "label": "kafka", "describe": "消费kafka数据", "help_url": "", "pass_through": { # 无论什么内容 通过task的字段透传回来 } } ], "逻辑节点":[ { "template_name": "switch", "template_id": 2, "templte_ui_config": { "shell": { "case": { "type": "text", "item_type": "str", "label": "表达式", "require": 1, "choice": [], "range": "", "default": "int(input['node2'])<3:node4,node5:'3'\ndefault:node6:'0'", "placeholder": "", "describe": "条件:下游节点:输出 其中input为节点输入", "editable": 1, "condition": "", "sub_args": {} } } }, "username": g.user.username, "changed_on": datetime.datetime.now(), "created_on": datetime.datetime.now(), "label": "switch-case逻辑节点", "describe": "控制数据的流量", "help_url": "", "pass_through": { # 无论什么内容 通过task的字段透传回来 } }, ], "功能节点": [ { "template_name": "http", "template_id": 3, "templte_ui_config": { "shell": { "method": { "type": "choice", "item_type": "str", "label": "请求方式", "require": 1, "choice": ["GET","POST"], "range": "", "default": "POST", "placeholder": "", "describe": "请求方式", "editable": 1, "condition": "", "sub_args": {} }, "url": { "type": "str", "item_type": "str", "label": "请求地址", "require": 1, "choice": [], "range": "", "default": "http://127.0.0.1:8080/api", "placeholder": "", "describe": "请求地址", "editable": 1, "condition": "", "sub_args": {} }, "header": { "type": "text", "item_type": "str", "label": "请求头", "require": 1, "choice": [], "range": "", "default": "{}", "placeholder": "", "describe": "请求头", "editable": 1, "condition": "", "sub_args": {} }, "timeout": { "type": "int", "item_type": "str", "label": "请求超时", "require": 1, "choice": [], "range": "", "default": "300", "placeholder": "", "describe": "请求超时", "editable": 1, "condition": "", "sub_args": {} }, "date": { "type": "text", "item_type": "str", "label": "请求内容", "require": 1, "choice": [], "range": "", "default": "{}", "placeholder": "", "describe": "请求内容", "editable": 1, "condition": "", "sub_args": {} } } }, "username": g.user.username, "changed_on": datetime.datetime.now(), "created_on": datetime.datetime.now(), "label": "http请求", "describe": "http请求", "help_url": "", "pass_through": { # 无论什么内容 通过task的字段透传回来 } }, { "template_name": "自定义方法", "template_id": 4, "templte_ui_config": { "shell": { "sdk_path": { "type": "str", "item_type": "str", "label": "函数文件地址", "require": 1, "choice": [], "range": "", "default": "", "placeholder": "", "describe": "函数文件地址,文件名和python类型要相同", "editable": 1, "condition": "", "sub_args": {} }, } }, "username": g.user.username, "changed_on": datetime.datetime.now(), "created_on": datetime.datetime.now(), "label": "http请求", "describe": "http请求", "help_url": "", "pass_through": { # 无论什么内容 通过task的字段透传回来 } } ], }, "status": 0 } index = 1 for group in all_template['templte_list']: for template in all_template['templte_list'][group]: template['template_id'] = index template['changed_on'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') template['created_on'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') template['username'] = g.user.username, index += 1 return jsonify(all_template) class Service_Pipeline_ModelView(Service_Pipeline_ModelView_Base,MyappModelView,DeleteMixin): datamodel = SQLAInterface(Service_Pipeline) # base_order = ("changed_on", "desc") # order_columns = ['changed_on'] appbuilder.add_view_no_menu(Service_Pipeline_ModelView) # 添加api class Service_Pipeline_ModelView_Api(Service_Pipeline_ModelView_Base,MyappModelRestApi): datamodel = SQLAInterface(Service_Pipeline) route_base = '/service_pipeline_modelview/api' show_columns = ['project','name','describe','namespace','node_selector','image_pull_policy','env','dag_json','run_id','created_by','changed_by','created_on','changed_on','expand'] list_columns = ['project','service_pipeline_url','creator','modified','operate_html'] add_columns = ['project','name','describe','namespace','node_selector','image_pull_policy','dag_json','env','expand'] edit_columns = add_columns appbuilder.add_api(Service_Pipeline_ModelView_Api)