cube-studio/myapp/views/view_service_pipeline.py

792 lines
37 KiB
Python
Raw Normal View History

2022-02-26 22:36:57 +08:00
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_babel import lazy_gettext as _
2022-08-11 10:47:08 +08:00
2022-02-26 22:36:57 +08:00
from myapp.models.model_service_pipeline import Service_Pipeline
from myapp.models.model_job import Repository
from flask import jsonify
2022-02-26 22:36:57 +08:00
from flask_appbuilder.forms import GeneralModelConverter
from myapp.utils import core
from myapp import app, appbuilder,db
2022-02-26 22:36:57 +08:00
from wtforms.ext.sqlalchemy.fields import QuerySelectField
from wtforms.validators import DataRequired, Length, Regexp
from sqlalchemy import or_
2022-02-26 22:36:57 +08:00
from myapp.exceptions import MyappException
from wtforms import StringField, SelectField
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget,Select2ManyWidget,Select2Widget
2022-10-10 11:44:53 +08:00
from myapp.forms import MyBS3TextAreaFieldWidget,MySelectMultipleField
import copy
2022-02-26 22:36:57 +08:00
from .baseApi import (
MyappModelRestApi
)
from flask import (
flash,
g,
make_response,
redirect,
request,
)
from myapp import security_manager
from myapp.views.view_team import filter_join_org_project
from .base import (
DeleteMixin,
get_user_roles,
MyappFilter,
MyappModelView,
json_response
)
from flask_appbuilder import expose
import pysnooper,datetime, json
2022-02-26 22:36:57 +08:00
conf = app.config
logging = app.logger
class Service_Pipeline_Filter(MyappFilter):
# @pysnooper.snoop()
def apply(self, query, func):
user_roles = [role.name.lower() for role in list(self.get_user_roles())]
if "admin" in user_roles:
return query
join_projects_id = security_manager.get_join_projects_id(db.session)
# public_project_id =
# logging.info(join_projects_id)
return query.filter(
or_(
self.model.project_id.in_(join_projects_id),
# self.model.project.name.in_(['public'])
)
)
class Service_Pipeline_ModelView_Base():
label_title='任务流'
datamodel = SQLAInterface(Service_Pipeline)
check_redirect_list_url = conf.get('MODEL_URLS',{}).get('service_pipeline','')
2022-02-26 22:36:57 +08:00
base_permissions = ['can_show','can_edit','can_list','can_delete','can_add']
base_order = ("changed_on", "desc")
# order_columns = ['id','changed_on']
order_columns = ['id']
list_columns = ['project','service_pipeline_url','creator','modified','operate_html']
add_columns = ['project','name','describe','namespace','images','env','resource_memory', 'resource_cpu','resource_gpu', 'replicas','dag_json','alert_status','alert_user','parameter']
show_columns = ['project','name','describe','namespace','run_id','created_by','changed_by','created_on','changed_on','expand_html','parameter_html']
edit_columns = add_columns
2022-08-11 10:47:08 +08:00
base_filters = [["id", Service_Pipeline_Filter, lambda: []]]
2022-02-26 22:36:57 +08:00
conv = GeneralModelConverter(datamodel)
add_form_extra_fields = {
"name": StringField(
_(datamodel.obj.lab('name')),
2022-08-16 11:09:52 +08:00
description="英文名(小写字母、数字、- 组成)最长50个字符",
2022-02-26 22:36:57 +08:00
widget=BS3TextFieldWidget(),
validators=[Regexp("^[a-z][a-z0-9\-]*[a-z0-9]$"),Length(1,54),DataRequired()]
),
"project":QuerySelectField(
_(datamodel.obj.lab('project')),
query_factory=filter_join_org_project,
allow_blank=True,
widget=Select2Widget()
),
"dag_json": StringField(
_(datamodel.obj.lab('dag_json')),
widget=MyBS3TextAreaFieldWidget(rows=10), # 传给widget函数的是外层的field对象以及widget函数的参数
),
"namespace": StringField(
_(datamodel.obj.lab('namespace')),
description="部署task所在的命名空间(目前无需填写)",
default='service',
widget=BS3TextFieldWidget()
),
"images":StringField(
_(datamodel.obj.lab('images')),
2022-06-11 21:17:25 +08:00
default='ccr.ccs.tencentyun.com/cube-studio/service-pipeline',
2022-02-26 22:36:57 +08:00
description="推理服务镜像",
widget=BS3TextFieldWidget(),
validators=[DataRequired()]
),
"node_selector": StringField(
_(datamodel.obj.lab('node_selector')),
description="部署task所在的机器(目前无需填写)",
widget=BS3TextFieldWidget(),
default=datamodel.obj.node_selector.default.arg
),
"image_pull_policy": SelectField(
_(datamodel.obj.lab('image_pull_policy')),
description="镜像拉取策略(always为总是拉取远程镜像IfNotPresent为若本地存在则使用本地镜像)",
widget=Select2Widget(),
choices=[['Always','Always'],['IfNotPresent','IfNotPresent']]
),
"alert_status":MySelectMultipleField(
label=_(datamodel.obj.lab('alert_status')),
widget=Select2ManyWidget(),
choices=[[x, x] for x in
['Created', 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Waiting', 'Terminated']],
description="选择通知状态"
),
"alert_user": StringField(
label=_(datamodel.obj.lab('alert_user')),
widget=BS3TextFieldWidget(),
description="选择通知用户,每个用户使用逗号分隔"
),
"label": StringField(_(datamodel.obj.lab('label')), description='中文名', widget=BS3TextFieldWidget(),
validators=[DataRequired()]),
"resource_memory": StringField(_(datamodel.obj.lab('resource_memory')),
default=Service_Pipeline.resource_memory.default.arg,
description='内存的资源使用限制示例1G10G 最大100G如需更多联系管路员', widget=BS3TextFieldWidget(),
validators=[DataRequired()]),
"resource_cpu": StringField(_(datamodel.obj.lab('resource_cpu')), default=Service_Pipeline.resource_cpu.default.arg,
description='cpu的资源使用限制(单位核),示例 0.410最大50核如需更多联系管路员',
widget=BS3TextFieldWidget(), validators=[DataRequired()]),
2022-07-26 20:47:49 +08:00
"resource_gpu": StringField(_(datamodel.obj.lab('resource_gpu')), default='0',
2022-02-26 22:36:57 +08:00
description='gpu的资源使用限制(单位卡),示例:12训练任务每个容器独占整卡。申请具体的卡型号可以类似 1(V100),目前支持T4/V100/A100/VGPU',
widget=BS3TextFieldWidget()),
"replicas": StringField(_(datamodel.obj.lab('replicas')), default=Service_Pipeline.replicas.default.arg,
description='pod副本数用来配置高可用', widget=BS3TextFieldWidget(), validators=[DataRequired()]),
"env": StringField(_(datamodel.obj.lab('env')), default=Service_Pipeline.env.default.arg,
description='使用模板的task自动添加的环境变量支持模板变量。书写格式:每行一个环境变量env_key=env_value',
widget=MyBS3TextAreaFieldWidget()),
}
edit_form_extra_fields = add_form_extra_fields
# 检测是否具有编辑权限只有creator和admin可以编辑
def check_edit_permission(self, item):
user_roles = [role.name.lower() for role in list(get_user_roles())]
if "admin" in user_roles:
return True
if g.user and g.user.username and hasattr(item,'created_by'):
if g.user.username==item.created_by.username:
return True
flash('just creator can edit/delete ', 'warning')
return False
# 验证args参数
@pysnooper.snoop(watch_explode=('item'))
def service_pipeline_args_check(self, item):
core.validate_str(item.name,'name')
if not item.dag_json:
item.dag_json='{}'
core.validate_json(item.dag_json)
# 校验task的关系没有闭环并且顺序要对。没有配置的自动没有上游独立
# @pysnooper.snoop()
def order_by_upstream(dag_json):
order_dag={}
tasks_name = list(dag_json.keys()) # 如果没有配全的话可能只有局部的task
i=0
while tasks_name:
i+=1
if i>100: # 不会有100个依赖关系
break
for task_name in tasks_name:
# 没有上游的情况
if not dag_json[task_name]:
order_dag[task_name]=dag_json[task_name]
tasks_name.remove(task_name)
continue
# 没有上游的情况
elif 'upstream' not in dag_json[task_name] or not dag_json[task_name]['upstream']:
order_dag[task_name] = dag_json[task_name]
tasks_name.remove(task_name)
continue
# 如果有上游依赖的话,先看上游任务是否已经加到里面了。
upstream_all_ready=True
for upstream_task_name in dag_json[task_name]['upstream']:
if upstream_task_name not in order_dag:
upstream_all_ready=False
if upstream_all_ready:
order_dag[task_name]=dag_json[task_name]
tasks_name.remove(task_name)
else:
dag_json[task_name]['upstream']=[]
order_dag[task_name] = dag_json[task_name]
tasks_name.remove(task_name)
if list(dag_json.keys()).sort()!=list(order_dag.keys()).sort():
flash('dag service pipeline 存在循环或未知上游',category='warning')
raise MyappException('dag service pipeline 存在循环或未知上游')
return order_dag
# 配置上缺少的默认上游
dag_json = json.loads(item.dag_json)
item.dag_json = json.dumps(order_by_upstream(copy.deepcopy(dag_json)),ensure_ascii=False,indent=4)
# raise Exception('args is not valid')
# @pysnooper.snoop()
def pre_add(self, item):
item.name = item.name.replace('_', '-')[0:54].lower().strip('-')
# item.alert_status = ','.join(item.alert_status)
# self.service_pipeline_args_check(item)
item.create_datetime=datetime.datetime.now()
item.change_datetime = datetime.datetime.now()
item.parameter = json.dumps({}, indent=4, ensure_ascii=False)
item.volume_mount = item.project.volume_mount + ",%s(configmap):/config/" % item.name
# @pysnooper.snoop()
def pre_update(self, item):
if item.expand:
core.validate_json(item.expand)
item.expand = json.dumps(json.loads(item.expand),indent=4,ensure_ascii=False)
else:
item.expand='{}'
item.name = item.name.replace('_', '-')[0:54].lower()
item.alert_status = ','.join(item.alert_status)
# self.service_pipeline_args_check(item)
item.change_datetime = datetime.datetime.now()
item.parameter = json.dumps(json.loads(item.parameter),indent=4,ensure_ascii=False) if item.parameter else '{}'
item.dag_json = json.dumps(json.loads(item.dag_json), indent=4,ensure_ascii=False) if item.dag_json else '{}'
item.volume_mount = item.project.volume_mount + ",%s(configmap):/config/" % item.name
@expose("/my/list/")
def my(self):
try:
user_id=g.user.id
if user_id:
service_pipelines = db.session.query(Service_Pipeline).filter_by(created_by_fk=user_id).all()
back=[]
for service_pipeline in service_pipelines:
back.append(service_pipeline.to_json())
return json_response(message='success',status=0,result=back)
except Exception as e:
print(e)
return json_response(message=str(e),status=-1,result={})
def check_service_pipeline_perms(user_fun):
# @pysnooper.snoop()
def wraps(*args, **kwargs):
service_pipeline_id = int(kwargs.get('service_pipeline_id','0'))
if not service_pipeline_id:
response = make_response("service_pipeline_id not exist")
response.status_code = 404
return response
user_roles = [role.name.lower() for role in g.user.roles]
if "admin" in user_roles:
return user_fun(*args, **kwargs)
join_projects_id = security_manager.get_join_projects_id(db.session)
service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first()
if service_pipeline.project.id in join_projects_id:
return user_fun(*args, **kwargs)
response = make_response("no perms to run pipeline %s"%service_pipeline_id)
response.status_code = 403
return response
return wraps
# 构建同步服务
def build_http(self,service_pipeline):
pass
# 构建异步服务
@pysnooper.snoop()
def build_mq_consumer(self,service_pipeline):
namespace = conf.get('SERVICE_PIPELINE_NAMESPACE')
name = service_pipeline.name
2022-10-12 11:06:14 +08:00
command = service_pipeline.command
2022-02-26 22:36:57 +08:00
image_secrets = conf.get('HUBSECRET', [])
user_hubsecrets = db.session.query(Repository.hubsecret).filter(Repository.created_by_fk == g.user.id).all()
if user_hubsecrets:
for hubsecret in user_hubsecrets:
if hubsecret[0] not in image_secrets:
image_secrets.append(hubsecret[0])
from myapp.utils.py.py_k8s import K8s
k8s_client = K8s(service_pipeline.project.cluster.get('KUBECONFIG',''))
2022-02-26 22:36:57 +08:00
dag_json=service_pipeline.dag_json if service_pipeline.dag_json else '{}'
# 生成服务使用的configmap
config_data = {
"dag.json":dag_json
}
k8s_client.create_configmap(namespace=namespace,name=name,data=config_data,labels={'app':name})
env = service_pipeline.env
if conf.get('SERVICE_PIPELINE_JAEGER',''):
env['JAEGER_HOST']=conf.get('SERVICE_PIPELINE_JAEGER','')
env['SERVICE_NAME'] = name
2022-07-26 20:47:49 +08:00
labels = {"app": name, "user": service_pipeline.created_by.username,"pod-type":"service-pipeline"}
2022-02-26 22:36:57 +08:00
k8s_client.create_deployment(
namespace=namespace,
name=name,
replicas=service_pipeline.replicas,
2022-07-26 20:47:49 +08:00
labels=labels,
2022-02-26 22:36:57 +08:00
# command=['sh','-c',command] if command else None,
command=['bash', '-c', "python mq-pipeline/cube_kafka.py"],
args=None,
volume_mount=service_pipeline.volume_mount,
working_dir=service_pipeline.working_dir,
node_selector=service_pipeline.get_node_selector(),
resource_memory=service_pipeline.resource_memory,
resource_cpu=service_pipeline.resource_cpu,
resource_gpu=service_pipeline.resource_gpu if service_pipeline.resource_gpu else '',
image_pull_policy=conf.get('IMAGE_PULL_POLICY','Always'),
2022-02-26 22:36:57 +08:00
image_pull_secrets=image_secrets,
image=service_pipeline.images,
hostAliases=conf.get('HOSTALIASES',''),
env=env,
privileged=False,
accounts=None,
username=service_pipeline.created_by.username,
ports=None
)
pass
# 只能有一个入口。不能同时接口两个队列
# # @event_logger.log_this
@expose("/run_service_pipeline/<service_pipeline_id>", methods=["GET", "POST"])
@check_service_pipeline_perms
def run_service_pipeline(self,service_pipeline_id):
service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first()
dag_json = json.loads(service_pipeline.dag_json)
root_nodes_name = service_pipeline.get_root_node_name()
self.clear(service_pipeline_id)
if root_nodes_name:
root_node_name=root_nodes_name[0]
root_node=dag_json[root_node_name]
# 构建异步
if root_node['template-group']=='endpoint' and root_node['template']=='mq':
self.build_mq_consumer(service_pipeline)
# 构建同步
if root_node['template-group'] == 'endpoint' and root_node['template'] == 'gateway':
self.build_http(service_pipeline)
return redirect("/service_pipeline_modelview/web/log/%s"%service_pipeline_id)
# return redirect(run_url)
# # @event_logger.log_this
@expose("/web/<service_pipeline_id>", methods=["GET"])
def web(self,service_pipeline_id):
2022-10-12 11:06:14 +08:00
service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first()
2022-02-26 22:36:57 +08:00
# service_pipeline.dag_json = service_pipeline.fix_dag_json()
# service_pipeline.expand = json.dumps(service_pipeline.fix_position(), indent=4, ensure_ascii=False)
db.session.commit()
print(service_pipeline_id)
data = {
"url": '/static/appbuilder/vison/index.html?pipeline_id=%s'%service_pipeline_id # 前后端集成完毕,这里需要修改掉
}
# 返回模板
return self.render_template('link.html', data=data)
# # @event_logger.log_this
@expose("/web/log/<service_pipeline_id>", methods=["GET"])
def web_log(self,service_pipeline_id):
2023-04-06 23:23:11 +08:00
pass
2022-02-26 22:36:57 +08:00
# 链路跟踪
@expose("/web/monitoring/<service_pipeline_id>", methods=["GET"])
def web_monitoring(self,service_pipeline_id):
service_pipeline = db.session.query(Service_Pipeline).filter_by(id=int(service_pipeline_id)).first()
if service_pipeline.run_id:
2023-04-06 23:23:11 +08:00
url = "http://"+service_pipeline.project.cluster.get('HOST', request.host)+conf.get('GRAFANA_TASK_PATH')+ service_pipeline.name
2022-02-26 22:36:57 +08:00
return redirect(url)
else:
flash('no running instance','warning')
return redirect('/service_pipeline_modelview/web/%s'%service_pipeline.id)
# # @event_logger.log_this
@expose("/web/pod/<service_pipeline_id>", methods=["GET"])
def web_pod(self,service_pipeline_id):
service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first()
data = {
2023-04-06 23:23:11 +08:00
"url": "http://" + service_pipeline.project.cluster.get('HOST', request.host) + conf.get('K8S_DASHBOARD_CLUSTER','/k8s/dashboard/cluster/') + '#/search?namespace=%s&q=%s' % (conf.get('SERVICE_PIPELINE_NAMESPACE'), service_pipeline.name.replace('_', '-')),
2022-02-26 22:36:57 +08:00
"target":"div.kd-chrome-container.kd-bg-background",
"delay":500,
"loading": True
}
# 返回模板
if service_pipeline.project.cluster['NAME']==conf.get('ENVIRONMENT'):
return self.render_template('link.html', data=data)
else:
return self.render_template('external_link.html', data=data)
@expose('/clear/<service_id>', methods=['POST', "GET"])
def clear(self, service_pipeline_id):
service_pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first()
from myapp.utils.py.py_k8s import K8s
k8s_client = K8s(service_pipeline.project.cluster.get('KUBECONFIG',''))
2022-02-26 22:36:57 +08:00
namespace = conf.get('SERVICE_PIPELINE_NAMESPACE')
k8s_client.delete_deployment(namespace=namespace, name=service_pipeline.name)
2022-09-11 11:29:29 +08:00
flash('服务清理完成', category='success')
2022-02-26 22:36:57 +08:00
return redirect('/service_pipeline_modelview/list/')
@expose("/config/<service_pipeline_id>",methods=("GET",'POST'))
def pipeline_config(self,service_pipeline_id):
print(service_pipeline_id)
pipeline = db.session.query(Service_Pipeline).filter_by(id=service_pipeline_id).first()
if not pipeline:
return jsonify({
"status":1,
"message":"服务流不存在",
"result":{}
})
if request.method.lower()=='post':
data = request.get_json()
request_config =data.get('config',{})
request_dag = data.get('dag_json', {})
if request_config:
pipeline.config = json.dumps(request_config,indent=4,ensure_ascii=False)
if request_dag:
pipeline.dag_json = json.dumps(request_dag, indent=4, ensure_ascii=False)
db.session.commit()
config = {
"id":pipeline.id,
"name":pipeline.name,
"label":pipeline.describe,
"project":pipeline.project.describe,
"pipeline_ui_config":{
"alert":{
"alert_user":{
"type": "str",
"item_type": "str",
"label": "报警用户",
"require": 1,
"choice": [],
"range": "",
"default": "",
"placeholder": "报警用户名,逗号分隔",
"describe": "报警用户,逗号分隔",
"editable": 1,
"condition": "",
"sub_args": {}
}
}
},
"pipeline_jump_button": [
{
"name":"资源查看",
"action_url":"",
"icon_svg":'<svg t="1644980982636" class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="2611" width="128" height="128"><path d="M913.937279 113.328092c-32.94432-32.946366-76.898391-51.089585-123.763768-51.089585s-90.819448 18.143219-123.763768 51.089585L416.737356 362.999454c-32.946366 32.94432-51.089585 76.898391-51.089585 123.763768s18.143219 90.819448 51.087539 123.763768c25.406646 25.40767 57.58451 42.144866 93.053326 48.403406 1.76418 0.312108 3.51915 0.463558 5.249561 0.463558 14.288424 0 26.951839-10.244318 29.519314-24.802896 2.879584-16.322757-8.016581-31.889291-24.339338-34.768875-23.278169-4.106528-44.38386-15.081487-61.039191-31.736818-21.61018-21.61018-33.509185-50.489928-33.509185-81.322144s11.899004-59.711963 33.509185-81.322144l15.864316-15.864316c-0.267083 1.121544-0.478907 2.267647-0.6191 3.440355-1.955538 16.45988 9.800203 31.386848 26.260084 33.344432 25.863041 3.072989 49.213865 14.378475 67.527976 32.692586 21.608134 21.608134 33.509185 50.489928 33.509185 81.322144s-11.901051 59.71401-33.509185 81.322144L318.53987 871.368764c-21.61018 21.61018-50.489928 33.511231-81.322144 33.511231-30.832216 0-59.711963-11.901051-81.322144-33.511231-21.61018-21.61018-33.509185-50.489928-33.509185-81.322144s11.899004-59.711963 33.509185-81.322144l169.43597-169.438017c11.720949-11.718903 11.720949-30.722722 0-42.441625-11.718903-11.718903-30.722722-11.718903-42.441625 0L113.452935 666.282852c-32.946366 32.94432-51.089585 76.898391-51.089585 123.763768 0 46.865377 18.143219 90.819448 51.089585 123.763768 32.94432 32.946366 76.898391 51.091632 123.763768 51.091632s90.819448-18.145266 123.763768-51.091632l249.673409-249.671363c32.946366-32.94432 51.089585-76.898391 51.089585-123.763768-0.002047-46.865377-18.145266-90.819448-51.089585-123.763768-27.5341-27.536146-64.073294-45.240367-102.885252-49.854455-3.618411-0.428765-7.161097-0.196475-10.508331 0.601704l211.589023-211.589023c21.61018-21.61018 50.489928-33.509185 81.322144-33.509185s59.711963 11.899004 81.322144 33.509185c21.61018 21.61018 33.509185 50.489928 33.509185 81.322144s-11.899004 59.711963-33.509185 81.322144l-150.180418 150.182464c-11.720949 11.718903-11.720949 30.722722 0 42.441625 11.718903 11.718903 30.722722 11.718903 42.441625 0l150.180418-150.182464c32.946366-32.94432 51.089585-76.898391 51.089585-123.763768C965.026864 190.226482 946.882622 146.272411 913.937279 113.328092z" p-id="2612" fill="#225ed2"></path></svg>'
}
],
"pipeline_run_button": [
],
"task_jump_button": [],
"dag_json":json.loads(pipeline.dag_json),
"config": json.loads(pipeline.config),
"message": "success",
"status": 0
}
return jsonify(config)
@expose("/template/list/")
def template_list(self):
all_template={
"message": "success",
"templte_common_ui_config":{
},
"template_group_order": ["入口", "逻辑节点", "功能节点"],
"templte_list": {
"入口":[
{
"template_name": "kafka",
"template_id": 1,
"templte_ui_config": {
"shell": {
"topic": {
"type": "str",
"item_type": "str",
"label": "topic",
"require": 1,
"choice": [],
"range": "",
"default": "predict",
"placeholder": "",
"describe": "kafka topic",
"editable": 1,
"condition": "",
"sub_args": {}
},
"consumer_num": {
"type": "str",
"item_type": "str",
"label": "消费者数目",
"require": 1,
"choice": [],
"range": "",
"default": "4",
"placeholder": "",
"describe": "消费者数目",
"editable": 1,
"condition": "",
"sub_args": {}
},
"bootstrap_servers": {
"type": "str",
"item_type": "str",
"label": "地址",
"require": 1,
"choice": [],
"range": "",
"default": "127.0.0.1:9092",
"placeholder": "",
"describe": "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092",
"editable": 1,
"condition": "",
"sub_args": {}
},
"group": {
"type": "str",
"item_type": "str",
"label": "分组",
"require": 1,
"choice": [],
"range": "",
"default": "predict",
"placeholder": "",
"describe": "消费者分组",
"editable": 1,
"condition": "",
"sub_args": {}
}
}
},
2022-06-11 21:17:25 +08:00
"username": g.user.username,
2022-02-26 22:36:57 +08:00
"changed_on": datetime.datetime.now(),
"created_on": datetime.datetime.now(),
"label": "kafka",
"describe": "消费kafka数据",
"help_url": "",
"pass_through": {
# 无论什么内容 通过task的字段透传回来
}
}
],
"逻辑节点":[
{
"template_name": "switch",
"template_id": 2,
"templte_ui_config": {
"shell": {
"case": {
"type": "text",
"item_type": "str",
"label": "表达式",
"require": 1,
"choice": [],
"range": "",
"default": "int(input['node2'])<3:node4,node5:'3'\ndefault:node6:'0'",
"placeholder": "",
"describe": "条件:下游节点:输出 其中input为节点输入",
"editable": 1,
"condition": "",
"sub_args": {}
}
}
},
2022-06-11 21:17:25 +08:00
"username": g.user.username,
2022-02-26 22:36:57 +08:00
"changed_on": datetime.datetime.now(),
"created_on": datetime.datetime.now(),
"label": "switch-case逻辑节点",
"describe": "控制数据的流量",
"help_url": "",
"pass_through": {
# 无论什么内容 通过task的字段透传回来
}
},
],
"功能节点": [
{
"template_name": "http",
"template_id": 3,
"templte_ui_config": {
"shell": {
"method": {
"type": "choice",
"item_type": "str",
"label": "请求方式",
"require": 1,
"choice": ["GET","POST"],
"range": "",
"default": "POST",
"placeholder": "",
"describe": "请求方式",
"editable": 1,
"condition": "",
"sub_args": {}
},
"url": {
"type": "str",
"item_type": "str",
"label": "请求地址",
"require": 1,
"choice": [],
"range": "",
"default": "http://127.0.0.1:8080/api",
"placeholder": "",
"describe": "请求地址",
"editable": 1,
"condition": "",
"sub_args": {}
},
"header": {
"type": "text",
"item_type": "str",
"label": "请求头",
"require": 1,
"choice": [],
"range": "",
"default": "{}",
"placeholder": "",
"describe": "请求头",
"editable": 1,
"condition": "",
"sub_args": {}
},
"timeout": {
"type": "int",
"item_type": "str",
"label": "请求超时",
"require": 1,
"choice": [],
"range": "",
"default": "300",
"placeholder": "",
"describe": "请求超时",
"editable": 1,
"condition": "",
"sub_args": {}
},
"date": {
"type": "text",
"item_type": "str",
"label": "请求内容",
"require": 1,
"choice": [],
"range": "",
"default": "{}",
"placeholder": "",
"describe": "请求内容",
"editable": 1,
"condition": "",
"sub_args": {}
}
}
},
2022-06-11 21:17:25 +08:00
"username": g.user.username,
2022-02-26 22:36:57 +08:00
"changed_on": datetime.datetime.now(),
"created_on": datetime.datetime.now(),
"label": "http请求",
"describe": "http请求",
"help_url": "",
"pass_through": {
# 无论什么内容 通过task的字段透传回来
}
},
{
"template_name": "自定义方法",
"template_id": 4,
"templte_ui_config": {
"shell": {
"sdk_path": {
"type": "str",
"item_type": "str",
"label": "函数文件地址",
"require": 1,
"choice": [],
"range": "",
"default": "",
"placeholder": "",
"describe": "函数文件地址文件名和python类型要相同",
"editable": 1,
"condition": "",
"sub_args": {}
},
}
},
2022-06-11 21:17:25 +08:00
"username": g.user.username,
2022-02-26 22:36:57 +08:00
"changed_on": datetime.datetime.now(),
"created_on": datetime.datetime.now(),
"label": "http请求",
"describe": "http请求",
"help_url": "",
"pass_through": {
# 无论什么内容 通过task的字段透传回来
}
}
],
},
"status": 0
}
index = 1
for group in all_template['templte_list']:
for template in all_template['templte_list'][group]:
template['template_id'] = index
template['changed_on'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
template['created_on'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
2022-06-11 21:17:25 +08:00
template['username'] = g.user.username,
2022-02-26 22:36:57 +08:00
index += 1
return jsonify(all_template)
class Service_Pipeline_ModelView(Service_Pipeline_ModelView_Base,MyappModelView,DeleteMixin):
datamodel = SQLAInterface(Service_Pipeline)
# base_order = ("changed_on", "desc")
# order_columns = ['changed_on']
2022-08-28 20:24:10 +08:00
appbuilder.add_view_no_menu(Service_Pipeline_ModelView)
2022-02-26 22:36:57 +08:00
# 添加api
class Service_Pipeline_ModelView_Api(Service_Pipeline_ModelView_Base,MyappModelRestApi):
datamodel = SQLAInterface(Service_Pipeline)
route_base = '/service_pipeline_modelview/api'
show_columns = ['project','name','describe','namespace','node_selector','image_pull_policy','env','dag_json','run_id','created_by','changed_by','created_on','changed_on','expand']
list_columns = ['project','service_pipeline_url','creator','modified','operate_html']
2022-02-26 22:36:57 +08:00
add_columns = ['project','name','describe','namespace','node_selector','image_pull_policy','dag_json','env','expand']
edit_columns = add_columns
appbuilder.add_api(Service_Pipeline_ModelView_Api)