adjust node resource

pengluan 2021-11-25 18:07:05 +08:00
parent b395680360
commit ab5c423f80
14 changed files with 363 additions and 229 deletions

View File

@ -99,7 +99,7 @@ def delete_old_crd(object_info):
name=crd_object['name'])
# push_message(conf.get('ADMIN_USER', '').split(','), '%s %s 因上层workflow %s 已删除,现删除' % (object_info['plural'], crd_object['name'], run_id))
time.sleep(10)
if model_map[object_info['plural']] in model_map:
if object_info['plural'] in model_map:
db_crds = dbsession.query(model_map[object_info['plural']]).filter(model_map[object_info['plural']].name.in_(crd_names)).all()
for db_crd in db_crds:
db_crd.status = 'Deleted'
@ -113,13 +113,15 @@ def delete_old_crd(object_info):
if crd_object['create_time'] < (datetime.datetime.now() - datetime.timedelta(seconds=timeout)).strftime('%Y-%m-%d %H:%M:%S'):
if object_info['plural']=='workflows':
username=''
label=json.loads(crd_object['labels'])
pipeline_id = label.get('pipeline-id','')
if 'run-rtx' in label:
username = label['run-rtx']
elif 'upload-rtx' in label:
username = label['upload-rtx']
if username:
push_message([username]+conf.get('ADMIN_USER','').split(','),'%s %s 创建时间 %s 已经运行时间过久,注意修正'%(object_info['plural'],crd_object['name'],crd_object['create_time']))
push_message([username]+conf.get('ADMIN_USER','').split(','),'%s %s %s %s 创建时间 %s 已经运行时间过久,注意修正'%(username,object_info['plural'],crd_object['name'],pipeline_id,crd_object['create_time']))
else:
# if the run has already been finished for a day, delete it directly
if crd_object['finish_time'] and crd_object['finish_time'] < (datetime.datetime.now() - datetime.timedelta(hours=3)).strftime('%Y-%m-%d %H:%M:%S'):
@ -179,15 +181,23 @@ def delete_tfjob(task):
time.sleep(10)
# delete framework CRDs
framework_info = conf.get("CRD_INFO", {}).get('framework', {})
print(framework_info)
if framework_info:
delete_old_crd(framework_info)
vcjob_info = conf.get("CRD_INFO", {}).get('vcjob', {})
print(vcjob_info)
if vcjob_info:
delete_old_crd(vcjob_info)
time.sleep(10)
# # 删除framework
# framework_info = conf.get("CRD_INFO", {}).get('framework', {})
# print(framework_info)
# if framework_info:
# delete_old_crd(framework_info)
#
# time.sleep(10)
# delete deployments
clusters = conf.get('CLUSTERS', {})
for cluster_name in clusters:
@ -208,14 +218,15 @@ def delete_tfjob(task):
print(e)
# print(deploy)
try:
create_time = deploy.metadata.creation_timestamp.strftime('%Y-%m-%d')
delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
if create_time < delete_time:
print('kill %s'%deploy.metadata.name)
k8s_client.delete_deployment(namespace='pipeline', name=deploy.name)
except Exception as e:
print(e)
# try:
# create_time = deploy.metadata.creation_timestamp.strftime('%Y-%m-%d')
# delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
# if create_time < delete_time:
# print('kill %s'%deploy.metadata.name)
# k8s_client.delete_deployment(namespace='pipeline', name=deploy.name)
# except Exception as e:
# print(e)
@ -243,14 +254,14 @@ def delete_tfjob(task):
except Exception as e:
print(e)
try:
create_time = daemon_set.metadata.creation_timestamp.strftime('%Y-%m-%d')
delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
if create_time < delete_time:
print('kill %s'%daemon_set.metadata.name)
k8s_client.AppsV1Api.delete_namespaced_daemon_set(namespace='pipeline', name=daemon_set.name)
except Exception as e:
print(e)
# try:
# create_time = daemon_set.metadata.creation_timestamp.strftime('%Y-%m-%d')
# delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
# if create_time < delete_time:
# print('kill %s'%daemon_set.metadata.name)
# k8s_client.AppsV1Api.delete_namespaced_daemon_set(namespace='pipeline', name=daemon_set.name)
# except Exception as e:
# print(e)
except Exception as e:
print(e)
@ -275,41 +286,20 @@ def delete_tfjob(task):
k8s_client.AppsV1Api.delete_namespaced_stateful_set(namespace='pipeline', name=sts.name)
except Exception as e:
print(e)
try:
create_time = sts.metadata.creation_timestamp.strftime('%Y-%m-%d')
delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
if create_time < delete_time:
print('kill %s'%sts.metadata.name)
k8s_client.AppsV1Api.delete_namespaced_stateful_set(namespace='pipeline', name=sts.name)
except Exception as e:
print(e)
# try:
# create_time = sts.metadata.creation_timestamp.strftime('%Y-%m-%d')
# delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
# if create_time < delete_time:
# print('kill %s'%sts.metadata.name)
# k8s_client.AppsV1Api.delete_namespaced_stateful_set(namespace='pipeline', name=sts.name)
# except Exception as e:
# print(e)
except Exception as e:
print(e)
time.sleep(60)
# delete pods
clusters = conf.get('CLUSTERS', {})
for cluster_name in clusters:
cluster = clusters[cluster_name]
try:
k8s_client = K8s(cluster['KUBECONFIG'])
pods = k8s_client.v1.list_namespaced_pod(namespace='pipeline').items
for pod in pods:
# print(pod)
try:
create_time = pod.metadata.creation_timestamp.strftime('%Y-%m-%d')
delete_time=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
if create_time < delete_time:
print('kill %s'%pod.metadata.name)
k8s_client.v1.delete_namespaced_pod(namespace='pipeline', name=pod.metadata.name)
except Exception as e:
print(e)
except Exception as e:
print(e)
push_message(conf.get('ADMIN_USER','').split(','),'清理历史pod完成')
@celery_app.task(name="task.delete_notebook", bind=True)
@ -331,9 +321,11 @@ def delete_notebook(task):
vscode_pods = k8s_client.get_pods(namespace=namespace,pod_name=notebook.name)
if vscode_pods:
vscode_pod=vscode_pods[0]
# print(vscode_pod)
k8s_client.delete_pods(namespace=namespace, pod_name=vscode_pod['name'])
user = vscode_pod['lables'].get('user', '')
user = vscode_pod['labels'].get('user', '')
if user:
pass
push_message([user], '您的notebook %s已清理释放资源如果需要可reset后重新使用。' % vscode_pod['name'])
else:
message = '您的notebook %s即将过期如要继续使用请尽快续期每次有效期3天\n' % notebook.name
@ -348,11 +340,19 @@ def delete_debug_docker(task):
clusters = conf.get('CLUSTERS',{})
for cluster_name in clusters:
cluster = clusters[cluster_name]
namespace = conf.get('NOTEBOOK_NAMESPACE')
notebook_namespace = conf.get('NOTEBOOK_NAMESPACE')
pipeline_namespace = conf.get('PIPELINE_NAMESPACE')
k8s_client = K8s(cluster['KUBECONFIG'])
k8s_client.delete_pods(namespace=namespace,status='Succeeded')
k8s_client.delete_pods(namespace=notebook_namespace,status='Succeeded')
pipeline_pods = k8s_client.get_pods(pipeline_namespace)
for pod in pipeline_pods:
if pod['name'][0:6]=='debug-' or pod['name'][0:4]=='run-':
run_id = pod['labels'].get('run-id', '')
if run_id:
k8s_client.delete_workflow(all_crd_info=conf.get("CRD_INFO", {}), namespace=pipeline_namespace,run_id=run_id)
k8s_client.delete_pods(namespace=pipeline_namespace, labels={"run-id": run_id})
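# net effect: any leftover debug-*/run-* pod in the pipeline namespace gets its workflow (matched via the run-id label) deleted together with all pods carrying that run-id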
push_message(conf.get('ADMIN_USER', '').split(','), 'notebook清理和续期通知完成')
push_message(conf.get('ADMIN_USER', '').split(','), 'notebook清理完成')
# push WeChat messages
@ -380,7 +380,7 @@ def deliver_message(pipeline,message=''):
message = "pipeline: %s(%s) \nnamespace: %s\ncrontab: %s\ntime: %s\nfail start run:\n%s" % (pipeline.name,pipeline.describe, pipeline.namespace,pipeline.cron_time,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),message)
push_message(receivers,message)
push_message(conf.get('ADMIN_USER').split(','),message)
# push_message(conf.get('ADMIN_USER').split(','),message)
# @pysnooper.snoop()
@ -533,7 +533,6 @@ def upload_timerun(pipeline_id,stop_time):
# get the previous scheduled timerun
pass_run = dbsession.query(RunHistory).filter(RunHistory.pipeline_id==pipeline.id).filter(RunHistory.execution_date>start_time).filter(RunHistory.execution_date<timerun.execution_date).order_by(RunHistory.execution_date.desc()).first()
if not pass_run:
push_message(['pengluan'], 'no pass_run %s %s ' % (str(kwargs), datetime.datetime.now()))
upload_workflow.apply_async(kwargs=kwargs,expires=120,retry=False)
elif pass_run.status=='created':
# note: handle the cases where the watch component or the argo controller is broken, and where the record was deleted by mistake in the workflow UI
@ -541,7 +540,6 @@ def upload_timerun(pipeline_id,stop_time):
if workflow:
if workflow.status == 'Deleted' or workflow.status == 'Succeeded':
print('pass workflow success finish')
push_message(['pengluan'],'Succeeded workflow %s %s ' % (str(kwargs), datetime.datetime.now()))
upload_workflow.apply_async(kwargs=kwargs,expires=120,retry=False)
else:
@ -565,9 +563,42 @@ def upload_timerun(pipeline_id,stop_time):
label = json.loads(crd['labels'])
if crd['status']=='Succeeded' and label.get('pipeline/runid','')==pass_run.run_id:
print('pass workflow success finish')
push_message(['pengluan'],'no workflow %s %s ' % (str(kwargs), datetime.datetime.now()))
upload_workflow.apply_async(kwargs=kwargs,expires=120,retry=False)
# keep only the latest n instances, ordered by time descending; older ones must be deleted
elif pipeline.expired_limit:
# fetch the latest n timeruns
timeruns = dbsession.query(RunHistory) \
.filter(RunHistory.pipeline_id == pipeline.id) \
.filter(RunHistory.execution_date > start_time) \
.filter(RunHistory.execution_date <= stop_time) \
.order_by(RunHistory.execution_date.desc()).limit(pipeline.expired_limit)
latest_run_ids = [timerun.run_id for timerun in timeruns] # the timeruns that are allowed to run
# if older ones are still running, delete them first
exist_workflows = pipeline.get_workflow()
for exist_workflow in exist_workflows:
argo_run_id = json.loads(exist_workflow['labels']).get('pipeline/runid','')
run_id = json.loads(exist_workflow['labels']).get('run-id', '')
if argo_run_id and run_id:
pass_run = dbsession.query(RunHistory).filter(RunHistory.pipeline_id == pipeline.id).filter(RunHistory.execution_date > start_time).filter(RunHistory.run_id == argo_run_id).first()
# if the instance was launched by the cron schedule and has expired, delete it directly
if pass_run and argo_run_id not in latest_run_ids:
k8s_client = K8s(pipeline.project.cluster['KUBECONFIG'])
k8s_client.delete_workflow(all_crd_info=conf.get("CRD_INFO", {}), namespace='pipeline',run_id=run_id)
# if there are new ones that have not run yet, run them
for timerun in timeruns:
if timerun.status=='comed':
kwargs = {
"timerun_id": timerun.id,
"pipeline_id": pipeline_id
}
upload_workflow.apply_async(kwargs=kwargs, expires=120, retry=False)
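# net effect of this expired_limit branch: only the pipeline's most recent expired_limit scheduled runs survive;
# older cron-launched workflows are deleted above, and any remaining 'comed' timeruns among the latest n are submitted here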
# run concurrently in chronological order
else:
# check whether the running workflows comply with the configured concurrency limit
running_workflows = pipeline.get_workflow()
@ -586,7 +617,6 @@ def upload_timerun(pipeline_id,stop_time):
# if timerun.execution_date > datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'):
# upload_workflow.apply_async(kwargs=kwargs,eta=datetime.datetime.strptime(timerun.execution_date,'%Y-%m-%d %H:%M:%S'))
# else:
push_message(['pengluan'],'more workflow %s %s '%(str(kwargs),datetime.datetime.now()))
upload_workflow.apply_async(kwargs=kwargs,expires=120,retry=False)
except Exception as e:
@ -882,7 +912,7 @@ def watch_gpu(task):
cluster = clusters[cluster_name]
k8s_client = K8s(cluster['KUBECONFIG'])
all_gpu_pods=k8s_client.get_uesd_gpu(['pipeline','katib','jupyter','service'])
all_gpu_pods=k8s_client.get_uesd_gpu(namespaces=['pipeline','katib','jupyter','service'])
print(all_gpu_pods)
message = ''
@ -896,7 +926,170 @@ def watch_gpu(task):
# push_admin("%s集群共已使用%s张卡"%(cluster_name,int(used_gpu)))
@celery_app.task(name="task.adjust_node_resource", bind=True)
@pysnooper.snoop()
def adjust_node_resource(task):
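# rebalancing sketch: (1) sum the cpu/memory/gpu requests of Running pods per node, (2) aggregate them per
# resource group (the node's "org" label), (3) find the groups with the highest and lowest request ratios,
# and (4) if the gap exceeds 20%, relabel the least-loaded node of the emptiest group into the busiest group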
clusters = conf.get('CLUSTERS', {})
for cluster_name in clusters:
cluster = clusters[cluster_name]
k8s_client = K8s(cluster['KUBECONFIG'])
all_node = k8s_client.get_node()
all_node_json = {}
# collect the requested resources on each machine
for node in all_node: # convert the list into a dict keyed by host ip
ip = node['hostip']
if node['labels'].get('cpu','false')=='true' or node['labels'].get('gpu','false')=='true':
all_node_json[ip] = node
all_node_json[ip]['used_memory'] = []
all_node_json[ip]['used_cpu'] = []
all_node_json[ip]['used_gpu'] = []
# print(all_node_json)
for namespace in ['jupyter', 'pipeline', 'katib', 'service']:
all_pods = k8s_client.get_pods(namespace=namespace)
for pod in all_pods:
if pod['status'] == 'Running':
# print(namespace,pod)
all_node_json[pod['host_ip']]['used_memory'].append(pod['memory'])
all_node_json[pod['host_ip']]['used_cpu'].append(pod['cpu'])
all_node_json[pod['host_ip']]['used_gpu'].append(pod['gpu'])
# print(all_node_json[pod['host_ip']])
for ip in all_node_json:
all_node_json[ip]['used_memory'] = int(sum(all_node_json[ip]['used_memory']))
all_node_json[ip]['used_cpu'] = int(sum(all_node_json[ip]['used_cpu']))
all_node_json[ip]['used_gpu'] = int(sum(all_node_json[ip]['used_gpu']))
# aggregate the requested resources of each resource group; CPU machines and GPU machines are counted separately
all_org_resource={}
for ip in all_node_json:
org=all_node_json[ip]['labels'].get('org','public')
if org not in all_org_resource:
all_org_resource[org]={
"cpu_node_num":0,
"gpu_node_num":0,
"cpu_req_total":0,
"gpu_req_total": 0,
"cpu_allocatable_total":0,
"gpu_allocatable_total":0
}
if all_node_json[ip]['labels'].get('cpu','false')=='true':
all_org_resource[org]['cpu_node_num']+=1
all_org_resource[org]['cpu_req_total'] += all_node_json[ip]['used_cpu']
all_org_resource[org]['cpu_allocatable_total'] += all_node_json[ip]['cpu']
if all_node_json[ip]['labels'].get('gpu','false')=='true':
all_org_resource[org]['gpu_node_num']+=1
all_org_resource[org]['gpu_req_total'] += all_node_json[ip]['used_gpu']
all_org_resource[org]['gpu_allocatable_total'] += all_node_json[ip]['gpu']
# find the resource groups with the highest and lowest request ratios
max_cpu_org=max_gpu_org=min_cpu_org=min_gpu_org='public'
max_cpu_per = max_gpu_per = 0
min_cpu_per = min_gpu_per = 1
for org in all_org_resource:
org_resource=all_org_resource[org]
if org_resource['cpu_node_num']>3: # only groups with at least 4 machines take part in rebalancing
if org_resource['cpu_req_total']/org_resource['cpu_allocatable_total']>max_cpu_per:
max_cpu_per=org_resource['cpu_req_total']/org_resource['cpu_allocatable_total']
max_cpu_org=org
if org_resource['cpu_req_total']/org_resource['cpu_allocatable_total']<min_cpu_per:
min_cpu_per=org_resource['cpu_req_total']/org_resource['cpu_allocatable_total']
min_cpu_org=org
if org_resource['gpu_node_num']>3: # only groups with at least 4 machines take part in rebalancing
if org_resource['gpu_req_total']/org_resource['gpu_allocatable_total']>max_gpu_per:
max_gpu_per=org_resource['gpu_req_total']/org_resource['gpu_allocatable_total']
max_gpu_org=org
if org_resource['gpu_req_total']/org_resource['gpu_allocatable_total']<min_gpu_per:
min_gpu_per=org_resource['gpu_req_total']/org_resource['gpu_allocatable_total']
min_gpu_org=org
print(all_org_resource)
# if the CPU request ratios of the two most different resource groups differ by more than 20%, move the least-utilized machine from the group with the lowest ratio to the other group
print(max_cpu_org,min_cpu_org,max_gpu_org,min_gpu_org)
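# worked example with hypothetical numbers: org A requests 300 of 400 allocatable cores (75%) and org B requests
# 100 of 500 (20%); since 0.75 > 0.20 + 0.2, one of B's least-utilized CPU nodes is relabeled to A below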
if max_cpu_org!=min_cpu_org and max_cpu_per>min_cpu_per+0.2:
org_node_cpu_per={}
for ip in all_node_json:
if all_node_json[ip]['labels'].get('org','')==min_cpu_org and all_node_json[ip]['labels'].get('cpu','false')=='true':
org_node_cpu_per[ip]=all_node_json[ip]['used_cpu']/all_node_json[ip]['cpu']
org_node_cpu_per = sorted(org_node_cpu_per.items(), key=lambda x: x[1], reverse=False) # sorted ascending
print(org_node_cpu_per)
adjust_node = [node[0] for node in org_node_cpu_per[:1]] # adjust one machine at a time
push_message(conf.get('ADMIN_USER').split(','),'集群 %s 调整项目组 %s 下 cpu机器 %s 到项目组%s'%(cluster_name,min_cpu_org,','.join(adjust_node),max_cpu_org))
k8s_client.label_node(adjust_node,labels={"org":max_cpu_org})
# likewise rebalance the two GPU resource groups with the largest gap
if max_gpu_org!=min_gpu_org and max_gpu_per>min_gpu_per+0.2:
org_node_gpu_per={}
for ip in all_node_json:
if all_node_json[ip]['labels'].get('org','')==min_gpu_org and all_node_json[ip]['labels'].get('gpu','false')=='true':
org_node_gpu_per[ip]=all_node_json[ip]['used_gpu']/all_node_json[ip]['gpu']
org_node_gpu_per = sorted(org_node_gpu_per.items(), key=lambda x: x[1], reverse=False) # sorted ascending
print(org_node_gpu_per)
adjust_node = [node[0] for node in org_node_gpu_per[:1]] # adjust one machine at a time
push_message(conf.get('ADMIN_USER').split(','), '集群 %s 调整项目组 %s 下 gpu机器 %s 到项目组%s' % (cluster_name, min_gpu_org, ','.join(adjust_node), max_gpu_org))
k8s_client.label_node(adjust_node,labels={"org":max_gpu_org})
# rebalance idle resources every 5 minutes; each cluster is partitioned by request ratio
#
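# a beat entry along these lines (hypothetical config key and schedule) would run the task on that cadence:
# CELERYBEAT_SCHEDULE['task_adjust_node_resource'] = {
#     'task': 'task.adjust_node_resource',
#     'schedule': crontab(minute='*/5'),   # from celery.schedules import crontab
# }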
# @celery_app.task(name="task.equalizer", bind=True)
# def equalizer(task):
# clusters = conf.get('CLUSTERS', {})
# for cluster_name in clusters:
# # 获取下面不同项目组的资源
#
# # 获取集群下面的所有机器,如果存在资源占用为空的就直接收回
# cluster = clusters[cluster_name]
# k8s_client = K8s(cluster['KUBECONFIG'])
# k8s_client.get_node(label={'org'})
#
# all_gpu_pods=k8s_client.get_uesd_gpu(namespaces=['pipeline','katib','jupyter','service'])
#
# print(all_gpu_pods)
# message = ''
# used_gpu = 0
# for pod in all_gpu_pods:
# used_gpu+=pod['gpu']
# message+=pod['namespace']+","+pod['user']+","+pod['name']+","+str(pod['gpu'])+"\n"
# print(message)
# message+="%s集群共已使用%s张卡"%(cluster_name,int(used_gpu))
# push_message(conf.get('ADMIN_USER','').split(','),message)
# # push_admin("%s集群共已使用%s张卡"%(cluster_name,int(used_gpu)))
#
# def aa():
# # 删除jupyter
# print('begin delete notebook')
# object_info = conf.get("CRD_INFO", {}).get('notebook', {})
# print(object_info)
# timeout = int(object_info.get('timeout', 60 * 60 * 24 * 3))
# with session_scope(nullpool=True) as dbsession:
# # 删除vscode的pod
# try:
# alert_time = datetime.datetime.now() - datetime.timedelta(seconds=timeout) + datetime.timedelta(days=1)
# notebooks = dbsession.query(Notebook).filter(Notebook.changed_on <= alert_time).all() # 需要删除或者需要通知续期的notebook
# for notebook in notebooks:
# if notebook.changed_on < (datetime.datetime.now() - datetime.timedelta(seconds=timeout) + datetime.timedelta(days=1)):
# message = '您的notebook %s即将过期如要继续使用请尽快续期每次有效期3天\n' % notebook.name
# print(notebook.created_by.username,message)
# push_message([notebook.created_by.username], message)
#
# except Exception as e:
# print(e)
#
# if __name__ == '__main__':
# delete_tfjob(task=None)

View File

@ -77,8 +77,9 @@ def deliver_message(workflow,dbsession):
if finish_time:
finish_time = (datetime.datetime.strptime(finish_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=0)).strftime(
'%Y-%m-%d %H:%M:%S')
help_url='%s#/search?namespace=pipeline&q=%s'%(conf.get('K8S_DASHBOARD_PIPELINE'),workflow.name)
help_url='http://%s/pipeline_modelview/web/pod/%s'%(conf.get('HOST'),pipeline_id)
message = "workflow: %s \npipeline: %s(%s) \nnamespace: %s\nstatus: % s \nstart_time: %s\nfinish_time: %s\n" % (workflow.name,info_json.get('pipeline_name',''),info_json.get('describe',''),workflow.namespace,workflow.status,start_time,finish_time)
message+='\n'
link={
"pod详情":help_url
}
@ -231,9 +232,15 @@ def push_task_time(workflow,dbsession):
]
task_pod_time_sorted = sorted(task_pod_time.items(),key=lambda item:item[0])
max_task_run_time = 0
for task_pods in task_pod_time_sorted:
for task_pod in task_pods[1]:
message+=task_pod['task']+":"+task_pod['run_time']+"(h)\n"
try:
if float(task_pod['run_time'])>max_task_run_time:
max_task_run_time = float(task_pod['run_time'])
except Exception as e:
print(e)
# record whether it has already been pushed, so we don't push the same message repeatedly
info_json = json.loads(workflow.info_json)
@ -243,15 +250,20 @@ def push_task_time(workflow,dbsession):
info_json['push_task_time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
workflow.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
dbsession.commit()
push_message(conf.get('ADMIN_USER').split(','),message)
message+="\n"
link = {
"点击查看资源的使用": "http://%s/pipeline_modelview/web/monitoring/%s"%(conf.get('HOST'),pipeline_id)
}
# only notify when a single task has been running for more than 4 hours
if max_task_run_time>4:
push_message(conf.get('ADMIN_USER').split(','),message,link)
alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
alert_user = [user.strip() for user in alert_user if user.strip()]
receivers = alert_user + [workflow.username]
receivers = list(set(receivers))
push_message(receivers,message)
push_message(receivers,message,link)
@ -302,9 +314,9 @@ def save_monitoring(workflow,dbsession):
if task:
task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4)
dbsession.commit()
# print(pods)
push_task_time(workflow, dbsession)
push_resource_rec(workflow, dbsession)
except Exception as e:

View File

@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@ -13,16 +13,10 @@ from myapp import app, db
logger = logging.getLogger(__name__)
# Null pool is used for the celery workers due process forking side effects.
# For more info see: https://github.com/apache/superset/issues/10530
@contextmanager
def session_scope(nullpool: bool) -> Iterator[Session]:
"""Provide a transactional scope around a series of operations."""
database_uri = app.config["SQLALCHEMY_DATABASE_URI"]
if "sqlite" in database_uri:
logger.warning(
"SQLite Database support for metadata databases will be removed \
in a future version of Superset."
)
if nullpool:
engine = create_engine(database_uri, poolclass=NullPool)
session_class = sessionmaker()

View File

@ -1,62 +0,0 @@
# -*- coding:utf-8 -*-
"""
执行本地命令
"""
import os
import subprocess
from . import commands
import traceback
class Command(object):
"""
执行本地命令
"""
@staticmethod
def execute(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE, use_async = False):
"""
:param cmd:
:param stdout:
:param stderr:
:param async:
:return:
"""
r_stdout = None
r_stderr = None
try:
popen_obj = subprocess.Popen(cmd, stdin = None,
stdout = stdout,
stderr = stderr,
shell = True,
close_fds=True);
#非阻塞模式,直接返回
if use_async:
r_stdout, r_stderr = None, None
else:
r_stdout = popen_obj.stdout.read().strip()
msg_stderr = popen_obj.stderr.read().strip()
if not msg_stderr.strip():
r_stderr = msg_stderr
except Exception as ex:
r_stderr = traceback.format_exc()
finally:
try:
if not use_async:
if popen_obj.stdout is not None:
popen_obj.stdout.close()
if popen_obj.stderr is not None:
popen_obj.stderr.close()
if popen_obj.stdin is not None:
popen_obj.stdin.close()
popen_obj.terminate()
except:
pass
if r_stderr:
raise Exception("exe command [%s] failed, %s" % (cmd, r_stderr))
return r_stdout

View File

@ -1395,8 +1395,8 @@ def check_resource_memory(resource_memory,src_resource_memory=None):
if resource_int<=src_resource_int:
return resource
if resource_int>250000:
return '250G'
if resource_int>100000:
return '100G'
else:
return resource
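# e.g. a requested '300G' (300000) exceeds the new cap and is reduced to '100G', unless the existing request (src_resource_memory) was already at least that large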

View File

@ -1,19 +1,4 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
import pytz

View File

@ -156,6 +156,7 @@ class K8s():
back_node['memory'] = int(node.status.allocatable.get('memory', '0').replace('Ki', '')) // 1024//1024
back_node['gpu'] = int(node.status.allocatable.get('nvidia.com/gpu', '0'))
back_node['labels']=node.metadata.labels
back_node['name']=node.metadata.name
for address in adresses:
if address.type=='InternalIP':
back_node['hostip']=address.address
@ -170,7 +171,7 @@ class K8s():
return []
# get the list of node ips carrying the given label
def label_node(self,ips, label):
def label_node(self,ips, labels):
try:
all_node_ip = []
all_node = self.v1.list_node().items
@ -189,9 +190,7 @@ class K8s():
if InternalIP in ips:
body = {
"metadata": {
"labels": {
label: "true"
}
"labels": labels
}
}
api_response = self.v1.patch_node(Hostname, body)
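# example call, as used by adjust_node_resource (the IP is hypothetical): k8s_client.label_node(['10.1.2.3'], labels={'org': 'public'})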
@ -307,7 +306,7 @@ class K8s():
# finish time of a vcjob
elif 'status' in crd_object and 'state' in crd_object['status'] and 'lastTransitionTime' in crd_object['status']['state']:
if crd_object['status']['state'].get('phase','')=='Completed':
if crd_object['status']['state'].get('phase','')=='Completed' or crd_object['status']['state'].get('phase','')=='Aborted' or crd_object['status']['state'].get('phase','')=='Failed' or crd_object['status']['state'].get('phase','')=='Terminated':
finish_time = crd_object['status']['state']['lastTransitionTime'].replace('T', ' ').replace('Z', '')
finish_time = (datetime.datetime.strptime(finish_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
@ -1199,6 +1198,50 @@ class K8s():
def create_hpa(self,namespace,name,min_replicas,max_replicas,mem_threshold=0.5,cpu_threshold=0.5):
try:
client.AutoscalingV2beta2Api().delete_namespaced_horizontal_pod_autoscaler(name=name,namespace=namespace)
except Exception as e:
print(e)
try:
my_metrics = []
my_metrics.append(client.V2beta2MetricSpec(type='Resource',
resource=client.V2beta2ResourceMetricSource(name='memory',
target=client.V2beta2MetricTarget(
average_utilization=int(mem_threshold*100),
type='Utilization'))))
my_metrics.append(client.V2beta2MetricSpec(type='Resource',
resource=client.V2beta2ResourceMetricSource(name='cpu',
target=client.V2beta2MetricTarget(
average_utilization=int(cpu_threshold*100),
type='Utilization'))))
my_conditions = []
my_conditions.append(client.V2beta2HorizontalPodAutoscalerCondition(status="True", type='AbleToScale'))
status = client.V2beta2HorizontalPodAutoscalerStatus(conditions=my_conditions, current_replicas=max_replicas,
desired_replicas=max_replicas)
body = client.V2beta2HorizontalPodAutoscaler(
api_version='autoscaling/v2beta2',
kind='HorizontalPodAutoscaler',
metadata=client.V1ObjectMeta(name=name),
spec=client.V2beta2HorizontalPodAutoscalerSpec(
max_replicas=max_replicas,
min_replicas=min_replicas,
metrics=my_metrics,
scale_target_ref=client.V2beta2CrossVersionObjectReference(kind='Deployment', name=name,
api_version='apps/v1'),
),
status=status)
v2 = client.AutoscalingV2beta2Api()
ret = v2.create_namespaced_horizontal_pod_autoscaler(namespace=namespace, body=body, pretty=True)
except Exception as e:
print(e)
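# hypothetical usage: create_hpa(namespace='service', name='my-deploy', min_replicas=1, max_replicas=3)
# recreates an autoscaling/v2beta2 HPA for the Deployment 'my-deploy' targeting 50% average cpu and memory utilization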
# @pysnooper.snoop()
def to_memory_GB(self,memory):

View File

@ -27,9 +27,10 @@ class Prometheus():
def get_resource_metric(self,pod_name, namespace):
max_cpu = 0
max_mem = 0
ave_gpu = 0
# the maximum for this pod over the last 30 minutes
mem_expr = "sum by (pod) (container_memory_usage_bytes{namespace='%s', pod=~'%s.*',container!='POD', container!=''})"%(namespace,pod_name)
mem_expr = "sum by (pod) (container_memory_working_set_bytes{namespace='%s', pod=~'%s.*',container!='POD', container!=''})"%(namespace,pod_name)
# print(mem_expr)
params={
'query': mem_expr,
@ -53,8 +54,6 @@ class Prometheus():
except Exception as e:
print(e)
return {}
cpu_expr = "sum by (pod) (rate(container_cpu_usage_seconds_total{namespace='%s',pod=~'%s.*',container!='POD'}[1m]))" % (namespace, pod_name)
@ -79,9 +78,33 @@ class Prometheus():
max_cpu = float(metric[1])
except Exception as e:
print(e)
return {}
return {"cpu":round(max_cpu, 2),"memory":round(max_mem,2)}
gpu_expr = "avg by (exported_pod) (DCGM_FI_DEV_GPU_UTIL{exported_namespace='%s',exported_pod=~'%s.*'})" % (namespace, pod_name)
params={
'query': gpu_expr,
'start':(datetime.datetime.now()-datetime.timedelta(days=1)-datetime.timedelta(hours=8)).strftime('%Y-%m-%dT%H:%M:%S.000Z'),
'end':datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.000Z'),
'step':"1m", # 运行小于1分钟的将不会被采集到
# 'timeout':"30s"
}
print(params)
try:
res = requests.get(url=self.query_range_path,params=params)
metrics = json.loads(res.content.decode('utf8', 'ignore'))
if metrics['status']=='success':
metrics=metrics['data']['result']
if metrics:
metrics=metrics[0]['values']
all_util = [float(metric[1]) for metric in metrics]
ave_gpu = sum(all_util)/len(all_util)
except Exception as e:
print(e)
return {"cpu":round(max_cpu, 2),"memory":round(max_mem,2),'gpu':round(ave_gpu,2)}

View File

@ -66,20 +66,14 @@ log = logging.getLogger(__name__)
def get_error_msg():
"""
(inspired on Superset code)
:return: (str)
"""
if current_app.config.get("FAB_API_SHOW_STACKTRACE"):
return traceback.format_exc()
return "Fatal error"
def safe(f):
"""
A decorator that catches uncaught exceptions and
return the response in JSON format (inspired on Superset code)
"""
def wraps(self, *args, **kwargs):
try:
return f(self, *args, **kwargs)
@ -495,11 +489,11 @@ class MyappModelRestApi(ModelRestApi):
return self.response_error(400,message="Request is not JSON")
try:
if self.pre_json_load:
json = self.pre_json_load(request.json)
json_data = self.pre_json_load(request.json)
else:
json = request.json
json_data = request.json
item = self.add_model_schema.load(json)
item = self.add_model_schema.load(json_data)
# item = self.add_model_schema.load(data)
except ValidationError as err:
return self.response_error(422,message=err.messages)
@ -554,10 +548,10 @@ class MyappModelRestApi(ModelRestApi):
return self.response_error(404,message='Not found')
try:
if self.pre_json_load:
json = self.pre_json_load(request.json)
json_data = self.pre_json_load(request.json)
else:
json = request.json
data = self._merge_update_item(item, json)
json_data = request.json
data = self._merge_update_item(item, json_data)
item = self.edit_model_schema.load(data, instance=item)
except ValidationError as err:
return self.response_error(422,message=err.messages)

View File

@ -138,6 +138,7 @@ class Myapp(BaseMyappView):
# /static/appbuilder/mnt/make_pipeline.mp4
message = ''
td_html='<td style="border: 1px solid black;padding: 10px">%s</th>'
message += "<tr>%s %s %s %s %s %s %s<tr>" %(td_html%"集群",td_html%"资源组", td_html%"机器", td_html%"机型", td_html%"cpu占用率", td_html%"内存占用率", td_html%"gpu占用率")
for cluster_name in all_node_json:
nodes = all_node_json[cluster_name]
for ip in nodes:
@ -155,7 +156,7 @@ class Myapp(BaseMyappView):
# print(message)
data = {
'content': message,
'delay': 30000,
'delay': 300000,
'hit': True,
'target': url,
'title': '当前负载',

View File

@ -1,20 +1,4 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=C,R,W
from flask_babel import lazy_gettext as _

View File

@ -1,19 +1,4 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from flask_appbuilder import ModelRestApi
from flask_appbuilder.models.sqla.interface import SQLAInterface

View File

@ -49,8 +49,6 @@ def get_permissions(user):
for perm in role.permissions:
if perm.permission and perm.view_menu:
perms.add((perm.permission.name, perm.view_menu.name))
if perm.permission.name in ("datasource_access", "database_access"):
permissions[perm.permission.name].add(perm.view_menu.name)
roles[role.name] = [
[perm.permission.name, perm.view_menu.name]
for perm in role.permissions