diff --git a/myapp/tasks/schedules.py b/myapp/tasks/schedules.py index ea286fa5..d424bbe6 100644 --- a/myapp/tasks/schedules.py +++ b/myapp/tasks/schedules.py @@ -99,7 +99,7 @@ def delete_old_crd(object_info): name=crd_object['name']) # push_message(conf.get('ADMIN_USER', '').split(','), '%s %s 因上层workflow %s 已删除,现删除' % (object_info['plural'], crd_object['name'], run_id)) time.sleep(10) - if model_map[object_info['plural']] in model_map: + if object_info['plural'] in model_map: db_crds = dbsession.query(model_map[object_info['plural']]).filter(model_map[object_info['plural']].name.in_(crd_names)).all() for db_crd in db_crds: db_crd.status = 'Deleted' @@ -113,13 +113,15 @@ def delete_old_crd(object_info): if crd_object['create_time'] < (datetime.datetime.now() - datetime.timedelta(seconds=timeout)).strftime('%Y-%m-%d %H:%M:%S'): if object_info['plural']=='workflows': username='' + label=json.loads(crd_object['labels']) + pipeline_id = label.get('pipeline-id','') if 'run-rtx' in label: username = label['run-rtx'] elif 'upload-rtx' in label: username = label['upload-rtx'] if username: - push_message([username]+conf.get('ADMIN_USER','').split(','),'%s %s 创建时间 %s, 已经运行时间过久,注意修正'%(object_info['plural'],crd_object['name'],crd_object['create_time'])) + push_message([username]+conf.get('ADMIN_USER','').split(','),'%s %s %s %s 创建时间 %s, 已经运行时间过久,注意修正'%(username,object_info['plural'],crd_object['name'],pipeline_id,crd_object['create_time'])) else: # 如果运行结束已经1天,就直接删除 if crd_object['finish_time'] and crd_object['finish_time'] < (datetime.datetime.now() - datetime.timedelta(hours=3)).strftime('%Y-%m-%d %H:%M:%S'): @@ -179,15 +181,23 @@ def delete_tfjob(task): time.sleep(10) - # 删除framework - framework_info = conf.get("CRD_INFO", {}).get('framework', {}) - print(framework_info) - if framework_info: - delete_old_crd(framework_info) + vcjob_info = conf.get("CRD_INFO", {}).get('vcjob', {}) + print(vcjob_info) + if vcjob_info: + delete_old_crd(vcjob_info) time.sleep(10) + # # 删除framework + # framework_info = conf.get("CRD_INFO", {}).get('framework', {}) + # print(framework_info) + # if framework_info: + # delete_old_crd(framework_info) + # + # time.sleep(10) + + # 删除deployment clusters = conf.get('CLUSTERS', {}) for cluster_name in clusters: @@ -208,14 +218,15 @@ def delete_tfjob(task): print(e) # print(deploy) - try: - create_time = deploy.metadata.creation_timestamp.strftime('%Y-%m-%d') - delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') - if create_time < delete_time: - print('kill %s'%deploy.metadata.name) - k8s_client.delete_deployment(namespace='pipeline', name=deploy.name) - except Exception as e: - print(e) + + # try: + # create_time = deploy.metadata.creation_timestamp.strftime('%Y-%m-%d') + # delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') + # if create_time < delete_time: + # print('kill %s'%deploy.metadata.name) + # k8s_client.delete_deployment(namespace='pipeline', name=deploy.name) + # except Exception as e: + # print(e) @@ -243,14 +254,14 @@ def delete_tfjob(task): except Exception as e: print(e) - try: - create_time = daemon_set.metadata.creation_timestamp.strftime('%Y-%m-%d') - delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') - if create_time < delete_time: - print('kill %s'%daemon_set.metadata.name) - k8s_client.AppsV1Api.delete_namespaced_daemon_set(namespace='pipeline', name=daemon_set.name) - except Exception as e: - print(e) + # try: + # create_time = daemon_set.metadata.creation_timestamp.strftime('%Y-%m-%d') + # delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') + # if create_time < delete_time: + # print('kill %s'%daemon_set.metadata.name) + # k8s_client.AppsV1Api.delete_namespaced_daemon_set(namespace='pipeline', name=daemon_set.name) + # except Exception as e: + # print(e) except Exception as e: print(e) @@ -275,41 +286,20 @@ def delete_tfjob(task): k8s_client.AppsV1Api.delete_namespaced_stateful_set(namespace='pipeline', name=sts.name) except Exception as e: print(e) - try: - create_time = sts.metadata.creation_timestamp.strftime('%Y-%m-%d') - delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') - if create_time < delete_time: - print('kill %s'%sts.metadata.name) - k8s_client.AppsV1Api.delete_namespaced_stateful_set(namespace='pipeline', name=sts.name) - except Exception as e: - print(e) + # try: + # create_time = sts.metadata.creation_timestamp.strftime('%Y-%m-%d') + # delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') + # if create_time < delete_time: + # print('kill %s'%sts.metadata.name) + # k8s_client.AppsV1Api.delete_namespaced_stateful_set(namespace='pipeline', name=sts.name) + # except Exception as e: + # print(e) except Exception as e: print(e) time.sleep(60) - # 删除pod - clusters = conf.get('CLUSTERS', {}) - for cluster_name in clusters: - cluster = clusters[cluster_name] - try: - k8s_client = K8s(cluster['KUBECONFIG']) - pods = k8s_client.v1.list_namespaced_pod(namespace='pipeline').items - for pod in pods: - # print(pod) - try: - create_time = pod.metadata.creation_timestamp.strftime('%Y-%m-%d') - delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') - if create_time < delete_time: - print('kill %s'%pod.metadata.name) - k8s_client.v1.delete_namespaced_pod(namespace='pipeline', name=pod.metadata.name) - except Exception as e: - print(e) - except Exception as e: - print(e) - - push_message(conf.get('ADMIN_USER','').split(','),'清理历史pod完成') @celery_app.task(name="task.delete_notebook", bind=True) @@ -331,9 +321,11 @@ def delete_notebook(task): vscode_pods = k8s_client.get_pods(namespace=namespace,pod_name=notebook.name) if vscode_pods: vscode_pod=vscode_pods[0] + # print(vscode_pod) k8s_client.delete_pods(namespace=namespace, pod_name=vscode_pod['name']) - user = vscode_pod['lables'].get('user', '') + user = vscode_pod['labels'].get('user', '') if user: + pass push_message([user], '您的notebook %s已清理释放资源,如果需要可reset后重新使用。' % vscode_pod['name']) else: message = '您的notebook %s即将过期,如要继续使用,请尽快续期,每次有效期3天\n' % notebook.name @@ -348,11 +340,19 @@ def delete_debug_docker(task): clusters = conf.get('CLUSTERS',{}) for cluster_name in clusters: cluster = clusters[cluster_name] - namespace = conf.get('NOTEBOOK_NAMESPACE') + notebook_namespace = conf.get('NOTEBOOK_NAMESPACE') + pipeline_namespace = conf.get('PIPELINE_NAMESPACE') k8s_client = K8s(cluster['KUBECONFIG']) - k8s_client.delete_pods(namespace=namespace,status='Succeeded') + k8s_client.delete_pods(namespace=notebook_namespace,status='Succeeded') + pipeline_pods = k8s_client.get_pods(pipeline_namespace) + for pod in pipeline_pods: + if pod['name'][0:6]=='debug-' or pod['name'][0:4]=='run-': + run_id = pod['labels'].get('run-id', '') + if run_id: + k8s_client.delete_workflow(all_crd_info=conf.get("CRD_INFO", {}), namespace=pipeline_namespace,run_id=run_id) + k8s_client.delete_pods(namespace=pipeline_namespace, labels={"run-id": run_id}) - push_message(conf.get('ADMIN_USER', '').split(','), 'notebook清理和续期通知完成') + push_message(conf.get('ADMIN_USER', '').split(','), 'notebook清理完成') # 推送微信消息 @@ -380,7 +380,7 @@ def deliver_message(pipeline,message=''): message = "pipeline: %s(%s) \nnamespace: %s\ncrontab: %s\ntime: %s\nfail start run:\n%s" % (pipeline.name,pipeline.describe, pipeline.namespace,pipeline.cron_time,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),message) push_message(receivers,message) - push_message(conf.get('ADMIN_USER').split(','),message) + # push_message(conf.get('ADMIN_USER').split(','),message) # @pysnooper.snoop() @@ -533,7 +533,6 @@ def upload_timerun(pipeline_id,stop_time): # 获取前一个定时调度的timerun pass_run = dbsession.query(RunHistory).filter(RunHistory.pipeline_id==pipeline.id).filter(RunHistory.execution_date>start_time).filter(RunHistory.execution_date start_time) \ + .filter(RunHistory.execution_date <= stop_time) \ + .order_by(RunHistory.execution_date.desc()).limit(pipeline.expired_limit) + latest_run_ids = [timerun.run_id for timerun in timeruns] # 可以运行的timerun + + # 如果有旧的在运行,就先删掉 + exist_workflows = pipeline.get_workflow() + for exist_workflow in exist_workflows: + argo_run_id = json.loads(exist_workflow['labels']).get('pipeline/runid','') + run_id = json.loads(exist_workflow['labels']).get('run-id', '') + if argo_run_id and run_id: + pass_run = dbsession.query(RunHistory).filter(RunHistory.pipeline_id == pipeline.id).filter(RunHistory.execution_date > start_time).filter(RunHistory.run_id == argo_run_id).first() + # 如果是定时任务发起的实例,并且已经过期,就直接删除 + if pass_run and argo_run_id not in latest_run_ids: + k8s_client = K8s(pipeline.project.cluster['KUBECONFIG']) + k8s_client.delete_workflow(all_crd_info=conf.get("CRD_INFO", {}), namespace='pipeline',run_id=run_id) + + # 如果有新的还没运行的,就运行 + for timerun in timeruns: + if timerun.status=='comed': + kwargs = { + "timerun_id": timerun.id, + "pipeline_id": pipeline_id + } + upload_workflow.apply_async(kwargs=kwargs, expires=120, retry=False) + + + + # 按时间顺序并发运行 else: # 检测正在运行的workflow与激活并发限制是否符合 running_workflows = pipeline.get_workflow() @@ -586,7 +617,6 @@ def upload_timerun(pipeline_id,stop_time): # if timerun.execution_date > datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'): # upload_workflow.apply_async(kwargs=kwargs,eta=datetime.datetime.strptime(timerun.execution_date,'%Y-%m-%d %H:%M:%S')) # else: - push_message(['pengluan'],'more workflow %s %s '%(str(kwargs),datetime.datetime.now())) upload_workflow.apply_async(kwargs=kwargs,expires=120,retry=False) except Exception as e: @@ -882,7 +912,7 @@ def watch_gpu(task): cluster = clusters[cluster_name] k8s_client = K8s(cluster['KUBECONFIG']) - all_gpu_pods=k8s_client.get_uesd_gpu(['pipeline','katib','jupyter','service']) + all_gpu_pods=k8s_client.get_uesd_gpu(namespaces=['pipeline','katib','jupyter','service']) print(all_gpu_pods) message = '' @@ -896,7 +926,170 @@ def watch_gpu(task): # push_admin("%s集群共已使用%s张卡"%(cluster_name,int(used_gpu))) +@celery_app.task(name="task.adjust_node_resource", bind=True) +@pysnooper.snoop() +def adjust_node_resource(task): + clusters = conf.get('CLUSTERS', {}) + for cluster_name in clusters: + cluster = clusters[cluster_name] + k8s_client = K8s(cluster['KUBECONFIG']) + all_node = k8s_client.get_node() + all_node_json = {} + # 获取每台机器的资源申请量 + for node in all_node: # list 转dict + ip = node['hostip'] + if node['labels'].get('cpu','false')=='true' or node['labels'].get('gpu','false')=='true': + all_node_json[ip] = node + all_node_json[ip]['used_memory'] = [] + all_node_json[ip]['used_cpu'] = [] + all_node_json[ip]['used_gpu'] = [] + + # print(all_node_json) + for namespace in ['jupyter', 'pipeline', 'katib', 'service']: + all_pods = k8s_client.get_pods(namespace=namespace) + for pod in all_pods: + if pod['status'] == 'Running': + # print(namespace,pod) + all_node_json[pod['host_ip']]['used_memory'].append(pod['memory']) + all_node_json[pod['host_ip']]['used_cpu'].append(pod['cpu']) + all_node_json[pod['host_ip']]['used_gpu'].append(pod['gpu']) + # print(all_node_json[pod['host_ip']]) + + for ip in all_node_json: + all_node_json[ip]['used_memory'] = int(sum(all_node_json[ip]['used_memory'])) + all_node_json[ip]['used_cpu'] = int(sum(all_node_json[ip]['used_cpu'])) + all_node_json[ip]['used_gpu'] = int(sum(all_node_json[ip]['used_gpu'])) + + + # 获取每个资源组的资源申请量,cpu机器和gpu单独看。 + all_org_resource={} + for ip in all_node_json: + org=all_node_json[ip]['labels'].get('org','public') + if org not in all_org_resource: + all_org_resource[org]={ + "cpu_node_num":0, + "gpu_node_num":0, + "cpu_req_total":0, + "gpu_req_total": 0, + "cpu_allocatable_total":0, + "gpu_allocatable_total":0 + } + if all_node_json[ip]['labels'].get('cpu','false')=='true': + all_org_resource[org]['cpu_node_num']+=1 + all_org_resource[org]['cpu_req_total'] += all_node_json[ip]['used_cpu'] + all_org_resource[org]['cpu_allocatable_total'] += all_node_json[ip]['cpu'] + + if all_node_json[ip]['labels'].get('gpu','false')=='true': + all_org_resource[org]['gpu_node_num']+=1 + all_org_resource[org]['gpu_req_total'] += all_node_json[ip]['used_gpu'] + all_org_resource[org]['gpu_allocatable_total'] += all_node_json[ip]['gpu'] + + # 计算申请率最大最小集群 + max_cpu_org=max_gpu_org=min_cpu_org=min_gpu_org='public' + max_cpu_per = max_gpu_per = 0 + min_cpu_per = min_gpu_per = 1 + for org in all_org_resource: + org_resource=all_org_resource[org] + if org_resource['cpu_node_num']>3: # 至少4台机器,才参与调度融合 + if org_resource['cpu_req_total']/org_resource['cpu_allocatable_total']>max_cpu_per: + max_cpu_per=org_resource['cpu_req_total']/org_resource['cpu_allocatable_total'] + max_cpu_org=org + if org_resource['cpu_req_total']/org_resource['cpu_allocatable_total']3: # 至少4台机器,才参与调度融合 + if org_resource['gpu_req_total']/org_resource['gpu_allocatable_total']>max_gpu_per: + max_gpu_per=org_resource['gpu_req_total']/org_resource['gpu_allocatable_total'] + max_gpu_org=org + if org_resource['gpu_req_total']/org_resource['gpu_allocatable_total']min_cpu_per+0.2: + org_node_cpu_per={} + for ip in all_node_json: + if all_node_json[ip]['labels'].get('org','')==min_cpu_org and all_node_json[ip]['labels'].get('cpu','false')=='true': + org_node_cpu_per[ip]=all_node_json[ip]['used_cpu']/all_node_json[ip]['cpu'] + org_node_cpu_per = sorted(org_node_cpu_per.items(), key=lambda x: x[1], reverse=False) # 从小到大排序 + print(org_node_cpu_per) + adjust_node = [node[0] for node in org_node_cpu_per[:1]] # 每次调整一台机器 + push_message(conf.get('ADMIN_USER').split(','),'集群 %s 调整项目组 %s 下 cpu机器 %s 到项目组%s'%(cluster_name,min_cpu_org,','.join(adjust_node),max_cpu_org)) + k8s_client.label_node(adjust_node,labels={"org":max_cpu_org}) + + + # 将差距最大的两个gpu资源组,进行调配 + if max_gpu_org!=min_gpu_org and max_gpu_per>min_gpu_per+0.2: + org_node_gpu_per={} + for ip in all_node_json: + if all_node_json[ip]['labels'].get('org','')==min_gpu_org and all_node_json[ip]['labels'].get('gpu','false')=='true': + org_node_gpu_per[ip]=all_node_json[ip]['used_gpu']/all_node_json[ip]['gpu'] + org_node_gpu_per = sorted(org_node_gpu_per.items(), key=lambda x: x[1], reverse=False) # 从小到大排序 + print(org_node_gpu_per) + adjust_node = [node[0] for node in org_node_gpu_per[:1]] # 每次调整一台机器 + push_message(conf.get('ADMIN_USER').split(','), '集群 %s 调整项目组 %s 下 gpu机器 %s 到项目组%s' % (cluster_name, min_gpu_org, ','.join(adjust_node), max_gpu_org)) + k8s_client.label_node(adjust_node,labels={"org":max_gpu_org}) + + + + +# 每5分钟做一次均衡,空闲资源每个集群按申请率划分 +# +# @celery_app.task(name="task.equalizer", bind=True) +# def equalizer(task): +# clusters = conf.get('CLUSTERS', {}) +# for cluster_name in clusters: +# # 获取下面不同项目组的资源 +# +# # 获取集群下面的所有机器,如果存在资源占用为空的就直接收回 +# cluster = clusters[cluster_name] +# k8s_client = K8s(cluster['KUBECONFIG']) +# k8s_client.get_node(label={'org'}) +# +# all_gpu_pods=k8s_client.get_uesd_gpu(namespaces=['pipeline','katib','jupyter','service']) +# +# print(all_gpu_pods) +# message = '' +# used_gpu = 0 +# for pod in all_gpu_pods: +# used_gpu+=pod['gpu'] +# message+=pod['namespace']+","+pod['user']+","+pod['name']+","+str(pod['gpu'])+"\n" +# print(message) +# message+="%s集群共已使用%s张卡"%(cluster_name,int(used_gpu)) +# push_message(conf.get('ADMIN_USER','').split(','),message) +# # push_admin("%s集群共已使用%s张卡"%(cluster_name,int(used_gpu))) +# + + + + +# def aa(): +# # 删除jupyter +# print('begin delete notebook') +# object_info = conf.get("CRD_INFO", {}).get('notebook', {}) +# print(object_info) +# timeout = int(object_info.get('timeout', 60 * 60 * 24 * 3)) +# with session_scope(nullpool=True) as dbsession: +# # 删除vscode的pod +# try: +# alert_time = datetime.datetime.now() - datetime.timedelta(seconds=timeout) + datetime.timedelta(days=1) +# notebooks = dbsession.query(Notebook).filter(Notebook.changed_on <= alert_time).all() # 需要删除或者需要通知续期的notebook +# for notebook in notebooks: +# if notebook.changed_on < (datetime.datetime.now() - datetime.timedelta(seconds=timeout) + datetime.timedelta(days=1)): +# message = '您的notebook %s即将过期,如要继续使用,请尽快续期,每次有效期3天\n' % notebook.name +# print(notebook.created_by.username,message) +# push_message([notebook.created_by.username], message) +# +# except Exception as e: +# print(e) + + +# # if __name__ == '__main__': # delete_tfjob(task=None) diff --git a/myapp/tools/watch_workflow.py b/myapp/tools/watch_workflow.py index e67e82ed..c75ec9a8 100644 --- a/myapp/tools/watch_workflow.py +++ b/myapp/tools/watch_workflow.py @@ -77,8 +77,9 @@ def deliver_message(workflow,dbsession): if finish_time: finish_time = (datetime.datetime.strptime(finish_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=0)).strftime( '%Y-%m-%d %H:%M:%S') - help_url='%s#/search?namespace=pipeline&q=%s'%(conf.get('K8S_DASHBOARD_PIPELINE'),workflow.name) + help_url='http://%s/pipeline_modelview/web/pod/%s'%(conf.get('HOST'),pipeline_id) message = "workflow: %s \npipeline: %s(%s) \nnamespace: %s\nstatus: % s \nstart_time: %s\nfinish_time: %s\n" % (workflow.name,info_json.get('pipeline_name',''),info_json.get('describe',''),workflow.namespace,workflow.status,start_time,finish_time) + message+='\n' link={ "pod详情":help_url } @@ -231,9 +232,15 @@ def push_task_time(workflow,dbsession): ] task_pod_time_sorted = sorted(task_pod_time.items(),key=lambda item:item[0]) + max_task_run_time = 0 for task_pods in task_pod_time_sorted: for task_pod in task_pods[1]: message+=task_pod['task']+":"+task_pod['run_time']+"(h)\n" + try: + if float(task_pod['run_time'])>max_task_run_time: + max_task_run_time = float(task_pod['run_time']) + except Exception as e: + print(e) # 记录是否已经推送,不然反复推送不好 info_json = json.loads(workflow.info_json) @@ -243,15 +250,20 @@ def push_task_time(workflow,dbsession): info_json['push_task_time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') workflow.info_json = json.dumps(info_json, indent=4, ensure_ascii=False) dbsession.commit() - - push_message(conf.get('ADMIN_USER').split(','),message) + message+="\n" + link = { + "点击查看资源的使用": "http://%s/pipeline_modelview/web/monitoring/%s"%(conf.get('HOST'),pipeline_id) + } + # 有单任务运行时长超过4个小时才通知 + if max_task_run_time>4: + push_message(conf.get('ADMIN_USER').split(','),message,link) alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else [] alert_user = [user.strip() for user in alert_user if user.strip()] receivers = alert_user + [workflow.username] receivers = list(set(receivers)) - push_message(receivers,message) + push_message(receivers,message,link) @@ -302,9 +314,9 @@ def save_monitoring(workflow,dbsession): if task: task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4) dbsession.commit() - # print(pods) push_task_time(workflow, dbsession) + push_resource_rec(workflow, dbsession) except Exception as e: diff --git a/myapp/utils/__init__.py b/myapp/utils/__init__.py index 13a83393..e69de29b 100644 --- a/myapp/utils/__init__.py +++ b/myapp/utils/__init__.py @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/myapp/utils/celery.py b/myapp/utils/celery.py index f3164787..d68a503c 100644 --- a/myapp/utils/celery.py +++ b/myapp/utils/celery.py @@ -13,16 +13,10 @@ from myapp import app, db logger = logging.getLogger(__name__) # Null pool is used for the celery workers due process forking side effects. -# For more info see: https://github.com/apache/superset/issues/10530 @contextmanager def session_scope(nullpool: bool) -> Iterator[Session]: """Provide a transactional scope around a series of operations.""" database_uri = app.config["SQLALCHEMY_DATABASE_URI"] - if "sqlite" in database_uri: - logger.warning( - "SQLite Database support for metadata databases will be removed \ - in a future version of Superset." - ) if nullpool: engine = create_engine(database_uri, poolclass=NullPool) session_class = sessionmaker() diff --git a/myapp/utils/commands.py b/myapp/utils/commands.py deleted file mode 100644 index 9c54b17a..00000000 --- a/myapp/utils/commands.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding:utf-8 -*- -""" -执行本地命令 -""" - -import os -import subprocess -from . import commands -import traceback - -class Command(object): - """ - 执行本地命令 - """ - - @staticmethod - def execute(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE, use_async = False): - """ - - :param cmd: - :param stdout: - :param stderr: - :param async: - :return: - """ - r_stdout = None - r_stderr = None - try: - popen_obj = subprocess.Popen(cmd, stdin = None, - stdout = stdout, - stderr = stderr, - shell = True, - close_fds=True); - #非阻塞模式,直接返回 - if use_async: - r_stdout, r_stderr = None, None - else: - r_stdout = popen_obj.stdout.read().strip() - msg_stderr = popen_obj.stderr.read().strip() - if not msg_stderr.strip(): - r_stderr = msg_stderr - except Exception as ex: - r_stderr = traceback.format_exc() - finally: - try: - if not use_async: - if popen_obj.stdout is not None: - popen_obj.stdout.close() - - if popen_obj.stderr is not None: - popen_obj.stderr.close() - - if popen_obj.stdin is not None: - popen_obj.stdin.close() - popen_obj.terminate() - except: - pass - if r_stderr: - raise Exception("exe command [%s] failed, %s" % (cmd, r_stderr)) - return r_stdout - - \ No newline at end of file diff --git a/myapp/utils/core.py b/myapp/utils/core.py index 605562b6..f29120a5 100644 --- a/myapp/utils/core.py +++ b/myapp/utils/core.py @@ -1395,8 +1395,8 @@ def check_resource_memory(resource_memory,src_resource_memory=None): if resource_int<=src_resource_int: return resource - if resource_int>250000: - return '250G' + if resource_int>100000: + return '100G' else: return resource diff --git a/myapp/utils/dates.py b/myapp/utils/dates.py index a1826e2f..cc9d15b2 100644 --- a/myapp/utils/dates.py +++ b/myapp/utils/dates.py @@ -1,19 +1,4 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. + from datetime import datetime import pytz diff --git a/myapp/utils/py/py_k8s.py b/myapp/utils/py/py_k8s.py index 4e006070..0bd26404 100755 --- a/myapp/utils/py/py_k8s.py +++ b/myapp/utils/py/py_k8s.py @@ -156,6 +156,7 @@ class K8s(): back_node['memory'] = int(node.status.allocatable.get('memory', '0').replace('Ki', '')) // 1024//1024 back_node['gpu'] = int(node.status.allocatable.get('nvidia.com/gpu', '0')) back_node['labels']=node.metadata.labels + back_node['name']=node.metadata.name for address in adresses: if address.type=='InternalIP': back_node['hostip']=address.address @@ -170,7 +171,7 @@ class K8s(): return [] # 获取指定label的nodeip列表 - def label_node(self,ips, label): + def label_node(self,ips, labels): try: all_node_ip = [] all_node = self.v1.list_node().items @@ -189,9 +190,7 @@ class K8s(): if InternalIP in ips: body = { "metadata": { - "labels": { - label: "true" - } + "labels": labels } } api_response = self.v1.patch_node(Hostname, body) @@ -307,7 +306,7 @@ class K8s(): # vcjob的结束时间 elif 'status' in crd_object and 'state' in crd_object['status'] and 'lastTransitionTime' in crd_object['status']['state']: - if crd_object['status']['state'].get('phase','')=='Completed': + if crd_object['status']['state'].get('phase','')=='Completed' or crd_object['status']['state'].get('phase','')=='Aborted' or crd_object['status']['state'].get('phase','')=='Failed' or crd_object['status']['state'].get('phase','')=='Terminated': finish_time = crd_object['status']['state']['lastTransitionTime'].replace('T', ' ').replace('Z', '') finish_time = (datetime.datetime.strptime(finish_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') @@ -1199,6 +1198,50 @@ class K8s(): + def create_hpa(self,namespace,name,min_replicas,max_replicas,mem_threshold=0.5,cpu_threshold=0.5): + + try: + client.AutoscalingV2beta2Api().delete_namespaced_horizontal_pod_autoscaler(name=name,namespace=namespace) + except Exception as e: + print(e) + + try: + my_metrics = [] + my_metrics.append(client.V2beta2MetricSpec(type='Resource', + resource=client.V2beta2ResourceMetricSource(name='memory', + target=client.V2beta2MetricTarget( + average_utilization=int(mem_threshold*100), + type='Utilization')))) + my_metrics.append(client.V2beta2MetricSpec(type='Resource', + resource=client.V2beta2ResourceMetricSource(name='cpu', + target=client.V2beta2MetricTarget( + average_utilization=int(cpu_threshold*100), + type='Utilization')))) + my_conditions = [] + my_conditions.append(client.V2beta2HorizontalPodAutoscalerCondition(status="True", type='AbleToScale')) + + status = client.V2beta2HorizontalPodAutoscalerStatus(conditions=my_conditions, current_replicas=max_replicas, + desired_replicas=max_replicas) + + body = client.V2beta2HorizontalPodAutoscaler( + api_version='autoscaling/v2beta2', + kind='HorizontalPodAutoscaler', + metadata=client.V1ObjectMeta(name=name), + spec=client.V2beta2HorizontalPodAutoscalerSpec( + max_replicas=max_replicas, + min_replicas=min_replicas, + metrics=my_metrics, + scale_target_ref=client.V2beta2CrossVersionObjectReference(kind='Deployment', name=name, + api_version='apps/v1'), + ), + status=status) + v2 = client.AutoscalingV2beta2Api() + ret = v2.create_namespaced_horizontal_pod_autoscaler(namespace=namespace, body=body, pretty=True) + except Exception as e: + print(e) + + + # @pysnooper.snoop() def to_memory_GB(self,memory): diff --git a/myapp/utils/py/py_prometheus.py b/myapp/utils/py/py_prometheus.py index 24a5ec59..e78eaf8e 100755 --- a/myapp/utils/py/py_prometheus.py +++ b/myapp/utils/py/py_prometheus.py @@ -27,9 +27,10 @@ class Prometheus(): def get_resource_metric(self,pod_name, namespace): max_cpu = 0 max_mem = 0 + ave_gpu = 0 # 这个pod 30分钟内的最大值 - mem_expr = "sum by (pod) (container_memory_usage_bytes{namespace='%s', pod=~'%s.*',container!='POD', container!=''})"%(namespace,pod_name) + mem_expr = "sum by (pod) (container_memory_working_set_bytes{namespace='%s', pod=~'%s.*',container!='POD', container!=''})"%(namespace,pod_name) # print(mem_expr) params={ 'query': mem_expr, @@ -53,8 +54,6 @@ class Prometheus(): except Exception as e: print(e) - return {} - cpu_expr = "sum by (pod) (rate(container_cpu_usage_seconds_total{namespace='%s',pod=~'%s.*',container!='POD'}[1m]))" % (namespace, pod_name) @@ -79,9 +78,33 @@ class Prometheus(): max_cpu = float(metric[1]) except Exception as e: print(e) - return {} - return {"cpu":round(max_cpu, 2),"memory":round(max_mem,2)} + + + gpu_expr = "avg by (exported_pod) (DCGM_FI_DEV_GPU_UTIL{exported_namespace='%s',exported_pod=~'%s.*'})" % (namespace, pod_name) + + params={ + 'query': gpu_expr, + 'start':(datetime.datetime.now()-datetime.timedelta(days=1)-datetime.timedelta(hours=8)).strftime('%Y-%m-%dT%H:%M:%S.000Z'), + 'end':datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.000Z'), + 'step':"1m", # 运行小于1分钟的,将不会被采集到 + # 'timeout':"30s" + } + print(params) + try: + + res = requests.get(url=self.query_range_path,params=params) + metrics = json.loads(res.content.decode('utf8', 'ignore')) + if metrics['status']=='success': + metrics=metrics['data']['result'] + if metrics: + metrics=metrics[0]['values'] + all_util = [float(metric[1]) for metric in metrics] + ave_gpu = sum(all_util)/len(all_util) + except Exception as e: + print(e) + + return {"cpu":round(max_cpu, 2),"memory":round(max_mem,2),'gpu':round(ave_gpu,2)} diff --git a/myapp/views/baseApi.py b/myapp/views/baseApi.py index ff51c634..55ce7b40 100644 --- a/myapp/views/baseApi.py +++ b/myapp/views/baseApi.py @@ -66,20 +66,14 @@ log = logging.getLogger(__name__) def get_error_msg(): - """ - (inspired on Superset code) - :return: (str) - """ + if current_app.config.get("FAB_API_SHOW_STACKTRACE"): return traceback.format_exc() return "Fatal error" def safe(f): - """ - A decorator that catches uncaught exceptions and - return the response in JSON format (inspired on Superset code) - """ + def wraps(self, *args, **kwargs): try: return f(self, *args, **kwargs) @@ -495,11 +489,11 @@ class MyappModelRestApi(ModelRestApi): return self.response_error(400,message="Request is not JSON") try: if self.pre_json_load: - json = self.pre_json_load(request.json) + json_data = self.pre_json_load(request.json) else: - json = request.json + json_data = request.json - item = self.add_model_schema.load(json) + item = self.add_model_schema.load(json_data) # item = self.add_model_schema.load(data) except ValidationError as err: return self.response_error(422,message=err.messages) @@ -554,10 +548,10 @@ class MyappModelRestApi(ModelRestApi): return self.response_error(404,message='Not found') try: if self.pre_json_load: - json = self.pre_json_load(request.json) + json_data = self.pre_json_load(request.json) else: - json = request.json - data = self._merge_update_item(item, json) + json_data = request.json + data = self._merge_update_item(item, json_data) item = self.edit_model_schema.load(data, instance=item) except ValidationError as err: return self.response_error(422,message=err.messages) diff --git a/myapp/views/home.py b/myapp/views/home.py index c07f5f63..ed380017 100644 --- a/myapp/views/home.py +++ b/myapp/views/home.py @@ -138,6 +138,7 @@ class Myapp(BaseMyappView): # /static/appbuilder/mnt/make_pipeline.mp4 message = '' td_html='%s' + message += "%s %s %s %s %s %s %s" %(td_html%"集群",td_html%"资源组", td_html%"机器", td_html%"机型", td_html%"cpu占用率", td_html%"内存占用率", td_html%"gpu占用率") for cluster_name in all_node_json: nodes = all_node_json[cluster_name] for ip in nodes: @@ -155,7 +156,7 @@ class Myapp(BaseMyappView): # print(message) data = { 'content': message, - 'delay': 30000, + 'delay': 300000, 'hit': True, 'target': url, 'title': '当前负载', diff --git a/myapp/views/log/__init__.py b/myapp/views/log/__init__.py index 37dfd164..b606bb2f 100644 --- a/myapp/views/log/__init__.py +++ b/myapp/views/log/__init__.py @@ -1,20 +1,4 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# pylint: disable=C,R,W + from flask_babel import lazy_gettext as _ diff --git a/myapp/views/log/api.py b/myapp/views/log/api.py index de10ea0a..ee19de85 100644 --- a/myapp/views/log/api.py +++ b/myapp/views/log/api.py @@ -1,19 +1,4 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. + from flask_appbuilder import ModelRestApi from flask_appbuilder.models.sqla.interface import SQLAInterface diff --git a/myapp/views/utils.py b/myapp/views/utils.py index 15f61ea5..9cbbaf0c 100644 --- a/myapp/views/utils.py +++ b/myapp/views/utils.py @@ -49,8 +49,6 @@ def get_permissions(user): for perm in role.permissions: if perm.permission and perm.view_menu: perms.add((perm.permission.name, perm.view_menu.name)) - if perm.permission.name in ("datasource_access", "database_access"): - permissions[perm.permission.name].add(perm.view_menu.name) roles[role.name] = [ [perm.permission.name, perm.view_menu.name] for perm in role.permissions