diff --git a/myapp/tools/start.sh b/myapp/tools/start.sh
index bd0d126e..bdfbe61b 100644
--- a/myapp/tools/start.sh
+++ b/myapp/tools/start.sh
@@ -1,7 +1,7 @@
-nohup python watch_workflow.py > workflow.log 2>&1 &
+nohup python myapp/tools/watch_workflow.py > workflow.log 2>&1 &
 
 
-nohup python watch_service.py > service.log 2>&1 &
+nohup python myapp/tools/watch_service.py > service.log 2>&1 &
 
 
 tail -f workflow.log service.log
diff --git a/myapp/tools/watch_pytorchjob.py b/myapp/tools/watch_pytorchjob.py
index ed8e8a61..03811e8c 100644
--- a/myapp/tools/watch_pytorchjob.py
+++ b/myapp/tools/watch_pytorchjob.py
@@ -1,10 +1,8 @@
-
-
-import time,datetime, os
+import time, datetime, os
 from kubernetes import client
 from kubernetes import watch
 import json
-from myapp.utils.py.py_k8s import check_status_time,K8s
+from myapp.utils.py.py_k8s import check_status_time, K8s
 import pysnooper
 from myapp import app
 from myapp.models.model_job import (
@@ -12,27 +10,29 @@ from myapp.models.model_job import (
     Task
 )
 from myapp.utils.celery import session_scope
-from myapp.project import push_admin,push_message
+from myapp.project import push_admin, push_message
 from myapp.models.model_job import Pipeline
-conf=app.config
+
+conf = app.config
 from myapp.utils.py.py_prometheus import Prometheus
 
-prometheus = Prometheus(conf.get('PROMETHEUS',''))
+prometheus = Prometheus(conf.get('PROMETHEUS', ''))
 
-cluster=os.getenv('ENVIRONMENT','').lower()
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
     exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
     else:
         print('no kubeconfig in cluster %s' % cluster)
         exit(1)
 
+
 # push WeChat notification
 # @pysnooper.snoop()
 def deliver_message(job):
@@ -47,12 +47,12 @@ def deliver_message(job):
     info_json = json.loads(job.info_json)
     # print(info_json,experiments.status)
     if job.status in info_json['alert_status'] and job.status not in info_json['has_push']:
-        receivers=list(set(receivers))
+        receivers = list(set(receivers))
         # data = {
         #     "Sender": sender,
         #     "Rcptto":receivers,
         # }
-        workflow_name = info_json.get('workflow_name','')
+        workflow_name = info_json.get('workflow_name', '')
         hp_name = info_json.get('hp_name', '')
         if workflow_name:
             message = "pytorchjob: %s \nworkflow: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (job.name,workflow_name,job.namespace,job.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
@@ -62,12 +62,13 @@ def deliver_message(job):
             message = "pytorchjob: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (job.name,job.namespace,job.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
 
     if message:
-        push_message(receivers,message)
+        push_message(receivers, message)
+
 
 # @pysnooper.snoop()
-def check_has_push(crd,dbsession):
+def check_has_push(crd, dbsession):
     # may have been launched by a workflow or by an HP (hyperparameter) run
-    workflow_name = crd['labels'].get('workflow-name','')
+    workflow_name = crd['labels'].get('workflow-name', '')
     hp_name = crd['labels'].get('hp-name', '')
     username = crd['username']
     alert_status = ''
@@ -79,13 +80,13 @@ def check_has_push(crd,dbsession):
         print("tf %s from workflow_name %s,user %s,status %s" % (crd['name'],workflow_name,crd['username'],crd['status']))
         # print("%s status %s"%(crd['name'], crd['status']))
-    alert_status='Pending' # hard-coded: alert on Pending, and only on Pending
+    alert_status = 'Pending'  # hard-coded: alert on Pending, and only on Pending
 
-    info_json={
-        "workflow_name":workflow_name,
-        "hp_name":hp_name,
+    info_json = {
+        "workflow_name": workflow_name,
+        "hp_name": hp_name,
         "alert_status": alert_status,
-        "has_push":''
+        "has_push": ''
     }
     # print(crd['name'],crd['namespace'])
     pytorchjob = dbsession.query(Pytorchjob).filter(Pytorchjob.name==crd['name']).filter(Pytorchjob.namespace==crd['namespace']).first()
@@ -93,38 +94,38 @@ def check_has_push(crd,dbsession):
     if pytorchjob:
         print('exist pytorchjob')
         if pytorchjob.info_json:
             exist_info_json = json.loads(pytorchjob.info_json)
-            info_json['has_push']=exist_info_json.get('has_push','')
+            info_json['has_push'] = exist_info_json.get('has_push', '')
 
         pytorchjob.create_time = crd['create_time']
         pytorchjob.status = crd['status']
-        pytorchjob.annotations = json.dumps(crd['annotations'],indent=4,ensure_ascii=False)
-        pytorchjob.labels = json.dumps(crd['labels'],indent=4,ensure_ascii=False)
-        pytorchjob.spec = json.dumps(crd['spec'],indent=4,ensure_ascii=False),
-        pytorchjob.status_more = json.dumps(crd['status_more'],indent=4,ensure_ascii=False)
+        pytorchjob.annotations = json.dumps(crd['annotations'], indent=4, ensure_ascii=False)
+        pytorchjob.labels = json.dumps(crd['labels'], indent=4, ensure_ascii=False)
+        pytorchjob.spec = json.dumps(crd['spec'], indent=4, ensure_ascii=False)
+        pytorchjob.status_more = json.dumps(crd['status_more'], indent=4, ensure_ascii=False)
         pytorchjob.username = crd['username']
-        pytorchjob.info_json = json.dumps(info_json,indent=4,ensure_ascii=False)
+        pytorchjob.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
         dbsession.commit()
         if crd['status'] in info_json['alert_status'] and crd['status'] not in info_json['has_push']:
-            return False,pytorchjob
+            return False, pytorchjob
         else:
-            return True,pytorchjob
+            return True, pytorchjob
     else:
         print('new pytorchjob')
         # crd['status_more']={}
         # crd['spec']={}
-        pytorchjob = Pytorchjob(name=crd['name'],namespace=crd['namespace'],create_time=crd['create_time'],
-                                status=crd['status'],
-                                annotations=json.dumps(crd['annotations'],indent=4,ensure_ascii=False),
-                                labels=json.dumps(crd['labels'],indent=4,ensure_ascii=False),
-                                spec=json.dumps(crd['spec'],indent=4,ensure_ascii=False),
-                                status_more=json.dumps(crd['status_more'],indent=4,ensure_ascii=False),
-                                username=username,
-                                info_json=json.dumps(info_json,indent=4,ensure_ascii=False))
+        pytorchjob = Pytorchjob(name=crd['name'], namespace=crd['namespace'], create_time=crd['create_time'],
+                                status=crd['status'],
+                                annotations=json.dumps(crd['annotations'], indent=4, ensure_ascii=False),
+                                labels=json.dumps(crd['labels'], indent=4, ensure_ascii=False),
+                                spec=json.dumps(crd['spec'], indent=4, ensure_ascii=False),
+                                status_more=json.dumps(crd['status_more'], indent=4, ensure_ascii=False),
+                                username=username,
+                                info_json=json.dumps(info_json, indent=4, ensure_ascii=False))
         dbsession.add(pytorchjob)
         dbsession.commit()
-        return False,pytorchjob
+        return False, pytorchjob
 
 
 #
@@ -163,12 +164,11 @@ def check_has_push(crd,dbsession):
 #             push_message([task.pipeline.created_by.username],message)
 
-
 # @pysnooper.snoop()
-def save_monitoring(pytorchjob,dbsession):
+def save_monitoring(pytorchjob, dbsession):
     try:
-        if pytorchjob.status=='Succeeded':
-            task_id = json.loads(pytorchjob.labels).get('task-id','')
+        if pytorchjob.status == 'Succeeded':
+            task_id = json.loads(pytorchjob.labels).get('task-id', '')
             if task_id:
                 task = dbsession.query(Task).filter_by(id=int(task_id)).first()
                 metrics = prometheus.get_pod_resource_metric(pytorchjob.name, namespace='pipeline')
@@ -199,7 +199,7 @@ def save_monitoring(pytorchjob,dbsession):
                 print(monitoring_new)
 
                 if task:
-                    task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4)
+                    task.monitoring = json.dumps(monitoring_new, ensure_ascii=False, indent=4)
                     dbsession.commit()
 
         # print(pods)
@@ -209,9 +209,8 @@ def save_monitoring(pytorchjob,dbsession):
         print(e)
 
 
-
 # @pysnooper.snoop()
-def save_history(pytorchjob,dbsession):
+def save_history(pytorchjob, dbsession):
     info_json = json.loads(pytorchjob.info_json)
     if info_json['has_push']:
         if not pytorchjob.status in info_json['has_push']:
@@ -223,13 +222,13 @@
 
 
 @pysnooper.snoop()
-def check_crd_exist(group,version,namespace,plural,name):
-    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group,version,namespace,plural,name)
+def check_crd_exist(group, version, namespace, plural, name):
+    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group, version, namespace, plural, name)
     return exist_crd
 
 
 @pysnooper.snoop()
-def deal_event(event,crd_info,namespace):
+def deal_event(event, crd_info, namespace):
     with session_scope(nullpool=True) as dbsession:
         try:
             crd_object = event['object']
@@ -265,40 +264,41 @@ def deal_event(event,crd_info,namespace):
             elif 'upload-rtx' in back_object:
                 back_object['username'] = back_object['labels']['upload-rtx']
 
-            has_push, crd_model = check_has_push(back_object,dbsession)
+            has_push, crd_model = check_has_push(back_object, dbsession)
             if not has_push:
                 try:
                     deliver_message(crd_model)
                 except Exception as e1:
                     print('push fail:', e1)
                     push_admin(str(e1))
-            save_history(crd_model,dbsession)
-            save_monitoring(crd_model,dbsession)
+            save_history(crd_model, dbsession)
+            save_monitoring(crd_model, dbsession)
         except Exception as e:
             print(e)
 
+
 @pysnooper.snoop()
 def listen_crd():
     crd_info = conf.get('CRD_INFO')['pytorchjob']
     namespace = conf.get('PIPELINE_NAMESPACE')
     w = watch.Watch()
     print('begin listen')
-    while(True):
+    while True:
         try:
             for event in w.stream(client.CustomObjectsApi().list_namespaced_custom_object, group=crd_info['group'], version=crd_info['version'], namespace=namespace, plural=crd_info['plural'], pretty='true'):
-                if event['type']=='ADDED' or event['type']=='MODIFIED':  # ADDED MODIFIED DELETED
-                    deal_event(event,crd_info,namespace)
-                elif event['type']=='ERROR':
+                if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':  # ADDED MODIFIED DELETED
+                    deal_event(event, crd_info, namespace)
+                elif event['type'] == 'ERROR':
                     w = watch.Watch()
                     time.sleep(60)
         except Exception as ee:
             print(ee)
 
 
-# async IO cannot be used here, because stream() blocks
-if __name__=='__main__':
-    listen_crd()
+# async IO cannot be used here, because stream() blocks
+if __name__ == '__main__':
+    listen_crd()
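Note on the pattern shared by all the watchers in this change: kubernetes.watch.Watch().stream() is a blocking generator (hence the "async IO cannot be used" comment, and the one-nohup-process-per-watcher layout in start.sh), and the loop discards and rebuilds the Watch object on ERROR events. A minimal standalone sketch of that reconnect loop, with placeholder arguments and a hypothetical handle() callback standing in for the project's deal_event():

    from kubernetes import client, config, watch

    def listen(group, version, namespace, plural, handle):
        # handle: hypothetical callback taking the raw CRD dict (stands in for deal_event)
        config.load_kube_config()  # the project wraps this in its K8s(kubeconfig) helper
        w = watch.Watch()
        while True:
            try:
                # stream() blocks, yielding {'type': ..., 'object': ...} events
                for event in w.stream(client.CustomObjectsApi().list_namespaced_custom_object,
                                      group=group, version=version,
                                      namespace=namespace, plural=plural):
                    if event['type'] in ('ADDED', 'MODIFIED'):
                        handle(event['object'])
                    elif event['type'] == 'ERROR':
                        w = watch.Watch()  # drop the broken stream and re-watch
            except Exception as e:
                print(e)  # transient API errors: loop around and reconnect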
diff --git a/myapp/tools/watch_service.py b/myapp/tools/watch_service.py
index 6c994e9c..45a0c57e 100644
--- a/myapp/tools/watch_service.py
+++ b/myapp/tools/watch_service.py
@@ -1,5 +1,3 @@
-
-
 import time, os
 from kubernetes import client
 from kubernetes import watch
@@ -7,16 +5,17 @@ from myapp.utils.py.py_k8s import K8s
 from myapp.project import push_message
 from myapp import app
 from myapp.utils.celery import session_scope
-conf=app.config
-cluster=os.getenv('ENVIRONMENT','').lower()
+conf = app.config
+
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
     exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
     else:
         print('no kubeconfig in cluster %s' % cluster)
@@ -24,11 +23,13 @@ else:
 from myapp.models.model_serving import InferenceService
 from datetime import datetime, timezone, timedelta
+
+
 # @pysnooper.snoop()
 def listen_service():
     namespace = conf.get('SERVICE_NAMESPACE')
     w = watch.Watch()
-    while(True):
+    while True:
         try:
             print('begin listen')
             for event in w.stream(client.CoreV1Api().list_namespaced_pod, namespace=namespace,timeout_seconds=60):  # label_selector=label,
@@ -36,7 +37,7 @@ def listen_service():
                 try:
                     if event['object'].status and event['object'].status.container_statuses and event["type"]=='MODIFIED':  # a container restart triggers MODIFIED
                         # terminated: exited; waiting: not yet started; running: in progress
-                        container_statuse= event['object'].status.container_statuses[0].state
+                        container_statuse = event['object'].status.container_statuses[0].state
                         terminated = container_statuse.terminated
                         # waiting = container_statuse.waiting
                         # running = container_statuse.running
@@ -62,8 +63,7 @@ def listen_service():
                 print(ee)
                 time.sleep(5)
 
+
 # async IO cannot be used here, because stream() blocks
-if __name__=='__main__':
+if __name__ == '__main__':
     listen_service()
-
-
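The pod watcher above keys off V1ContainerState on MODIFIED events: the state object carries exactly one of terminated / waiting / running. The hunk that acts on `terminated` is elided from this diff, so the following is only a sketch of the extraction step, assuming a kubernetes.client V1Pod taken from event['object']:

    def first_container_terminated(pod):
        # pod: kubernetes.client.V1Pod from a MODIFIED watch event
        statuses = pod.status.container_statuses or []
        if not statuses:
            return None
        state = statuses[0].state   # V1ContainerState: terminated / waiting / running
        # V1ContainerStateTerminated (with .exit_code, .reason) or None
        return state.terminated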
diff --git a/myapp/tools/watch_tfjob.py b/myapp/tools/watch_tfjob.py
index 526346b5..94b0484e 100644
--- a/myapp/tools/watch_tfjob.py
+++ b/myapp/tools/watch_tfjob.py
@@ -1,10 +1,8 @@
-
-
-import time,datetime, os
+import time, datetime, os
 from kubernetes import client
 from kubernetes import watch
 import json
-from myapp.utils.py.py_k8s import check_status_time,K8s
+from myapp.utils.py.py_k8s import check_status_time, K8s
 import pysnooper
 from myapp import app
 from myapp.models.model_job import (
@@ -12,27 +10,29 @@ from myapp.models.model_job import (
     Task
 )
 from myapp.utils.celery import session_scope
-from myapp.project import push_admin,push_message
+from myapp.project import push_admin, push_message
 from myapp.models.model_job import Pipeline
-conf=app.config
+
+conf = app.config
 from myapp.utils.py.py_prometheus import Prometheus
 
-prometheus = Prometheus(conf.get('PROMETHEUS',''))
+prometheus = Prometheus(conf.get('PROMETHEUS', ''))
 
-cluster=os.getenv('ENVIRONMENT','').lower()
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
    exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
     else:
         print('no kubeconfig in cluster %s' % cluster)
         exit(1)
 
+
 # push notification
 # @pysnooper.snoop()
 def deliver_message(tfjob):
@@ -47,12 +47,12 @@ def deliver_message(tfjob):
     info_json = json.loads(tfjob.info_json)
     # print(info_json,experiments.status)
     if tfjob.status in info_json['alert_status'] and tfjob.status not in info_json['has_push']:
-        receivers=list(set(receivers))
+        receivers = list(set(receivers))
         # data = {
         #     "Sender": sender,
         #     "Rcptto":receivers,
         # }
-        workflow_name = info_json.get('workflow_name','')
+        workflow_name = info_json.get('workflow_name', '')
         hp_name = info_json.get('hp_name', '')
         if workflow_name:
             message = "tfjob: %s \nworkflow: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (tfjob.name,workflow_name,tfjob.namespace,tfjob.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
@@ -62,13 +62,13 @@ def deliver_message(tfjob):
             message = "tfjob: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (tfjob.name,tfjob.namespace,tfjob.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
 
     if message:
-        push_message(receivers,message)
+        push_message(receivers, message)
 
 
 # @pysnooper.snoop()
-def check_has_push(crd,dbsession):
+def check_has_push(crd, dbsession):
     # may have been launched by a workflow or by an HP (hyperparameter) run
-    workflow_name = crd['labels'].get('workflow-name','')
+    workflow_name = crd['labels'].get('workflow-name', '')
     hp_name = crd['labels'].get('hp-name', '')
     username = crd['username']
     alert_status = ''
@@ -80,52 +80,52 @@ def check_has_push(crd,dbsession):
     print("tf %s from workflow_name %s,user %s,status %s" % (crd['name'],workflow_name,crd['username'],crd['status']))
     # print("%s status %s"%(crd['name'], crd['status']))
-    alert_status='Pending' # hard-coded: alert on Pending, and only on Pending
+    alert_status = 'Pending'  # hard-coded: alert on Pending, and only on Pending
 
-    info_json={
-        "workflow_name":workflow_name,
-        "hp_name":hp_name,
+    info_json = {
+        "workflow_name": workflow_name,
+        "hp_name": hp_name,
         "alert_status": alert_status,
-        "has_push":''
+        "has_push": ''
     }
     # print(crd['name'],crd['namespace'])
-    tfjob = dbsession.query(Tfjob).filter(Tfjob.name==crd['name']).filter(Tfjob.namespace==crd['namespace']).first()
+    tfjob = dbsession.query(Tfjob).filter(Tfjob.name == crd['name']).filter(Tfjob.namespace == crd['namespace']).first()
     if tfjob:
         print('exist tfjob')
         if tfjob.info_json:
             exist_info_json = json.loads(tfjob.info_json)
-            info_json['has_push']=exist_info_json.get('has_push','')
+            info_json['has_push'] = exist_info_json.get('has_push', '')
 
         tfjob.create_time = crd['create_time']
         tfjob.status = crd['status']
-        tfjob.annotations = json.dumps(crd['annotations'],indent=4,ensure_ascii=False)
-        tfjob.labels = json.dumps(crd['labels'],indent=4,ensure_ascii=False)
-        tfjob.spec = json.dumps(crd['spec'],indent=4,ensure_ascii=False),
-        tfjob.status_more = json.dumps(crd['status_more'],indent=4,ensure_ascii=False)
+        tfjob.annotations = json.dumps(crd['annotations'], indent=4, ensure_ascii=False)
+        tfjob.labels = json.dumps(crd['labels'], indent=4, ensure_ascii=False)
+        tfjob.spec = json.dumps(crd['spec'], indent=4, ensure_ascii=False)
+        tfjob.status_more = json.dumps(crd['status_more'], indent=4, ensure_ascii=False)
         tfjob.username = crd['username']
-        tfjob.info_json = json.dumps(info_json,indent=4,ensure_ascii=False)
+        tfjob.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
         dbsession.commit()
         if crd['status'] in info_json['alert_status'] and crd['status'] not in info_json['has_push']:
-            return False,tfjob
+            return False, tfjob
         else:
-            return True,tfjob
+            return True, tfjob
     else:
         print('new tfjob')
         # crd['status_more']={}
         # crd['spec']={}
-        tfjob = Tfjob(name=crd['name'],namespace=crd['namespace'],create_time=crd['create_time'],
-                      status=crd['status'],
-                      annotations=json.dumps(crd['annotations'],indent=4,ensure_ascii=False),
-                      labels=json.dumps(crd['labels'],indent=4,ensure_ascii=False),
-                      spec=json.dumps(crd['spec'],indent=4,ensure_ascii=False),
-                      status_more=json.dumps(crd['status_more'],indent=4,ensure_ascii=False),
-                      username=username,
-                      info_json=json.dumps(info_json,indent=4,ensure_ascii=False))
+        tfjob = Tfjob(name=crd['name'], namespace=crd['namespace'], create_time=crd['create_time'],
+                      status=crd['status'],
+                      annotations=json.dumps(crd['annotations'], indent=4, ensure_ascii=False),
+                      labels=json.dumps(crd['labels'], indent=4, ensure_ascii=False),
+                      spec=json.dumps(crd['spec'], indent=4, ensure_ascii=False),
+                      status_more=json.dumps(crd['status_more'], indent=4, ensure_ascii=False),
+                      username=username,
+                      info_json=json.dumps(info_json, indent=4, ensure_ascii=False))
         dbsession.add(tfjob)
         dbsession.commit()
-        return False,tfjob
+        return False, tfjob
 
 
 #
@@ -164,12 +164,11 @@ def check_has_push(crd,dbsession):
 #             push_message([task.pipeline.created_by.username],message)
 
-
 # @pysnooper.snoop()
-def save_monitoring(tfjob,dbsession):
+def save_monitoring(tfjob, dbsession):
     try:
-        if tfjob.status=='Succeeded':
-            task_id = json.loads(tfjob.labels).get('task-id','')
+        if tfjob.status == 'Succeeded':
+            task_id = json.loads(tfjob.labels).get('task-id', '')
             if task_id:
                 task = dbsession.query(Task).filter_by(id=int(task_id)).first()
                 metrics = prometheus.get_pod_resource_metric(tfjob.name, namespace='pipeline')
@@ -200,7 +199,7 @@ def save_monitoring(tfjob,dbsession):
                 print(monitoring_new)
 
                 if task:
-                    task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4)
+                    task.monitoring = json.dumps(monitoring_new, ensure_ascii=False, indent=4)
                     dbsession.commit()
 
         # print(pods)
@@ -210,9 +209,8 @@ def save_monitoring(tfjob,dbsession):
         print(e)
 
 
-
 # @pysnooper.snoop()
-def save_history(tfjob,dbsession):
+def save_history(tfjob, dbsession):
     info_json = json.loads(tfjob.info_json)
     if info_json['has_push']:
         if not tfjob.status in info_json['has_push']:
@@ -224,13 +222,13 @@
 
 
 # @pysnooper.snoop()
-def check_crd_exist(group,version,namespace,plural,name):
-    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group,version,namespace,plural,name)
+def check_crd_exist(group, version, namespace, plural, name):
+    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group, version, namespace, plural, name)
     return exist_crd
 
 
 @pysnooper.snoop()
-def deal_event(event,crd_info,namespace):
+def deal_event(event, crd_info, namespace):
     with session_scope(nullpool=True) as dbsession:
         try:
             crd_object = event['object']
@@ -266,40 +264,41 @@ def deal_event(event,crd_info,namespace):
             elif 'upload-rtx' in back_object:
                 back_object['username'] = back_object['labels']['upload-rtx']
 
-            has_push, crd_model = check_has_push(back_object,dbsession)
+            has_push, crd_model = check_has_push(back_object, dbsession)
             if not has_push:
                 try:
                     deliver_message(crd_model)
                 except Exception as e1:
                     print('push fail:', e1)
                     push_admin(str(e1))
-            save_history(crd_model,dbsession)
-            save_monitoring(crd_model,dbsession)
+            save_history(crd_model, dbsession)
+            save_monitoring(crd_model, dbsession)
         except Exception as e:
             print(e)
 
+
 @pysnooper.snoop()
 def listen_crd():
     crd_info = conf.get('CRD_INFO')['tfjob']
     namespace = conf.get('PIPELINE_NAMESPACE')
     w = watch.Watch()
     print('begin listen')
-    while(True):
+    while True:
         try:
             for event in w.stream(client.CustomObjectsApi().list_namespaced_custom_object, group=crd_info['group'], version=crd_info['version'], namespace=namespace, plural=crd_info['plural'], pretty='true'):
-                if event['type']=='ADDED' or event['type']=='MODIFIED':  # ADDED MODIFIED DELETED
-                    deal_event(event,crd_info,namespace)
-                elif event['type']=='ERROR':
+                if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':  # ADDED MODIFIED DELETED
+                    deal_event(event, crd_info, namespace)
+                elif event['type'] == 'ERROR':
                     w = watch.Watch()
                     time.sleep(60)
         except Exception as ee:
             print(ee)
 
 
-# async IO cannot be used here, because stream() blocks
-if __name__=='__main__':
-    listen_crd()
+# async IO cannot be used here, because stream() blocks
+if __name__ == '__main__':
+    listen_crd()
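watch_tfjob.py and watch_pytorchjob.py are near-duplicates, and both lean on the same alert-dedup scheme stored in each record's info_json: alert_status holds the statuses worth alerting on, has_push the statuses already pushed, and both membership tests are plain substring checks on those strings. A distilled sketch of the scheme, assuming has_push accumulates a comma-separated list (the appending code is elided from this diff):

    import json

    def should_push(info_json_str, status):
        # substring membership, exactly as the watchers test it
        info = json.loads(info_json_str)
        return status in info['alert_status'] and status not in info['has_push']

    def mark_pushed(info_json_str, status):
        info = json.loads(info_json_str)
        info['has_push'] = (info['has_push'] + ',' + status).strip(',')
        return json.dumps(info, indent=4, ensure_ascii=False)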
diff --git a/myapp/tools/watch_workflow.py b/myapp/tools/watch_workflow.py
index 4a311e99..aa6a0cfb 100644
--- a/myapp/tools/watch_workflow.py
+++ b/myapp/tools/watch_workflow.py
@@ -1,11 +1,10 @@
-
 import pysnooper
-import time,datetime, os
+import time, datetime, os
 from kubernetes import client
 from kubernetes import watch
 import json
 import math
-from myapp.utils.py.py_k8s import check_status_time,K8s
+from myapp.utils.py.py_k8s import check_status_time, K8s
 from myapp.utils.py.py_prometheus import Prometheus
 from myapp.project import push_message
 from myapp import app
@@ -17,26 +16,28 @@ from myapp.models.model_job import (
 )
 from myapp.utils.celery import session_scope
 
-conf=app.config
-prometheus = Prometheus(conf.get('PROMETHEUS',''))
-cluster=os.getenv('ENVIRONMENT','').lower()
+conf = app.config
+prometheus = Prometheus(conf.get('PROMETHEUS', ''))
+
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
     exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
         # k8s_config.kube_config.load_kube_config(config_file=kubeconfig)
     else:
         print('no kubeconfig in cluster %s' % cluster)
         exit(1)
 
+
 # push WeChat notification
 # @pysnooper.snoop()
-def deliver_message(workflow,dbsession):
+def deliver_message(workflow, dbsession):
     if not workflow:
         return
@@ -46,9 +47,9 @@ def deliver_message(workflow,dbsession):
     pipeline_id = json.loads(workflow.labels).get("pipeline-id", '')
     if pipeline_id and int(pipeline_id) > 0:
         pipeline = dbsession.query(Pipeline).filter_by(id=int(pipeline_id)).first()
-        alert_user=pipeline.alert_user.split(',') if pipeline.alert_user else []
-        alert_user=[user.strip() for user in alert_user if user.strip()]
-        receivers+=alert_user
+        alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
+        alert_user = [user.strip() for user in alert_user if user.strip()]
+        receivers += alert_user
 
     if not receivers:
         print('no receivers')
@@ -57,7 +58,7 @@ def deliver_message(workflow,dbsession):
     info_json = json.loads(workflow.info_json)
     # print(info_json,workflow.status)
     if workflow.status in info_json['alert_status'] and workflow.status not in info_json['has_push']:
-        receivers=list(set(receivers))
+        receivers = list(set(receivers))
         # data = {
         #     "Sender": sender,
         #     "Rcptto":receivers,
@@ -78,11 +79,12 @@ def deliver_message(workflow,dbsession):
             "pod详情":help_url
         }
         if message:
-            push_message(receivers,message,link)
+            push_message(receivers, message, link)
+
 
 # save the workflow record
 # @pysnooper.snoop()
-def save_workflow(crd,dbsession):
+def save_workflow(crd, dbsession):
     pipeline_id = crd['labels'].get('pipeline-id', '')
     pipeline = dbsession.query(Pipeline).filter_by(id=int(pipeline_id)).first()
     if not pipeline:
@@ -106,7 +108,7 @@ def save_workflow(crd,dbsession):
         workflow.labels = json.dumps(crd['labels'], indent=4, ensure_ascii=False)
         workflow.spec = json.dumps(crd['spec'], indent=4, ensure_ascii=False),
         workflow.status_more = json.dumps(crd['status_more'], indent=4, ensure_ascii=False)
-        workflow.cluster=cluster
+        workflow.cluster = cluster
         dbsession.commit()
 
     else:
@@ -118,7 +120,7 @@ def save_workflow(crd,dbsession):
             "has_push": ''
         }
         print('new workflow')
-        workflow = Workflow(name=crd['name'], cluster=cluster,namespace=crd['namespace'], create_time=crd['create_time'],
+        workflow = Workflow(name=crd['name'], cluster=cluster, namespace=crd['namespace'], create_time=crd['create_time'],
                             status=crd['status'],
                             annotations=json.dumps(crd['annotations'], indent=4, ensure_ascii=False),
                             labels=json.dumps(crd['labels'], indent=4, ensure_ascii=False),
@@ -130,16 +132,17 @@ def save_workflow(crd,dbsession):
         dbsession.commit()
 
     # update the run history
-    pipeline_run_id = json.loads(workflow.labels).get("run-id",'')
+    pipeline_run_id = json.loads(workflow.labels).get("run-id", '')
    if pipeline_run_id:
         run_history = dbsession.query(RunHistory).filter_by(run_id=pipeline_run_id).first()
         if run_history:
-            run_history.status=crd['status']
+            run_history.status = crd['status']
             dbsession.commit()
 
     return workflow
 
+
 # @pysnooper.snoop()
-def check_has_push(crd,dbsession):
+def check_has_push(crd, dbsession):
     workflow = dbsession.query(Workflow).filter(Workflow.name == crd['name']).filter(Workflow.namespace == crd['namespace']).first()
     if workflow and workflow.info_json:
         info_json = json.loads(workflow.info_json)
@@ -152,55 +155,55 @@ def check_has_push(crd,dbsession):
 
 # push resource-adjustment notification
 # @pysnooper.snoop()
-def push_resource_rec(workflow,dbsession):
+def push_resource_rec(workflow, dbsession):
     pipeline_id = json.loads(workflow.labels).get('pipeline-id', '')
     pipeline = dbsession.query(Pipeline).filter_by(id=int(pipeline_id)).first()
     if pipeline:
-        init_message = 'pipeline(%s)根据近10次的任务训练资源使用情况,系统做如下调整:\n'%pipeline.describe
+        init_message = 'pipeline(%s)根据近10次的任务训练资源使用情况,系统做如下调整:\n' % pipeline.describe
         message = init_message
         tasks = dbsession.query(Task).filter(Task.pipeline_id == int(pipeline_id)).all()
         for task in tasks:
-            if 'NO_RESOURCE_CHECK' not in task.job_template.env.replace("-","_").upper():
-                task_monitorings = json.loads(task.monitoring).get('task',[])
-                if len(task_monitorings)>9:
+            if 'NO_RESOURCE_CHECK' not in task.job_template.env.replace("-", "_").upper():
+                task_monitorings = json.loads(task.monitoring).get('task', [])
+                if len(task_monitorings) > 9:
                     max_cpu = 0
-                    max_memory=0
+                    max_memory = 0
                     for task_monitoring in task_monitorings:
-                        if float(task_monitoring.get('cpu',0))>max_cpu:
-                            max_cpu = float(task_monitoring.get('cpu',0))
+                        if float(task_monitoring.get('cpu', 0)) > max_cpu:
+                            max_cpu = float(task_monitoring.get('cpu', 0))
                         if float(task_monitoring.get('memory', 0)) > max_memory:
                             max_memory = float(task_monitoring.get('memory', 0))
                     if max_cpu:
-                        rec_cpu = math.ceil(max_cpu*1.4)+2
-                        if rec_cpu>150:
-                            rec_cpu=150
-                        if rec_cpu!=int(task.resource_cpu):
-                            message += "task(%s),原申请cpu:%s,近10次最大使用cpu:%s,新申请值:%s\n" % (task.label,task.resource_cpu, max_cpu, rec_cpu)
+                        rec_cpu = math.ceil(max_cpu * 1.4) + 2
+                        if rec_cpu > 150:
+                            rec_cpu = 150
+                        if rec_cpu != int(task.resource_cpu):
+                            message += "task(%s),原申请cpu:%s,近10次最大使用cpu:%s,新申请值:%s\n" % (task.label, task.resource_cpu, max_cpu, rec_cpu)
                             task.resource_cpu = str(rec_cpu)
                     if max_memory:
-                        rec_memory = math.ceil(max_memory*1.4)+2
-                        if rec_memory>350:
-                            rec_memory=350
-                        if rec_memory!=int(task.resource_memory.replace('G','').replace('M','')):
-                            message += "task(%s),原申请mem:%s,近10次最大使用mem:%s(G),新申请值:%s\n" % (task.label,task.resource_memory, max_memory, str(rec_memory)+"G")
-                            task.resource_memory = str(rec_memory)+"G"
+                        rec_memory = math.ceil(max_memory * 1.4) + 2
+                        if rec_memory > 350:
+                            rec_memory = 350
+                        if rec_memory != int(task.resource_memory.replace('G', '').replace('M', '')):
+                            message += "task(%s),原申请mem:%s,近10次最大使用mem:%s(G),新申请值:%s\n" % (task.label, task.resource_memory, max_memory, str(rec_memory) + "G")
+                            task.resource_memory = str(rec_memory) + "G"
                     dbsession.commit()
-        if message!=init_message:
+        if message != init_message:
             alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
             alert_user = [user.strip() for user in alert_user if user.strip()]
-            receivers = alert_user+[pipeline.created_by.username]
-            receivers=list(set(receivers))
+            receivers = alert_user + [pipeline.created_by.username]
+            receivers = list(set(receivers))
 
-            push_message(receivers,message)
+            push_message(receivers, message)
 
 
 # push task run-time notification
 # @pysnooper.snoop()
-def push_task_time(workflow,dbsession):
+def push_task_time(workflow, dbsession):
     if not workflow:
         return
 
-    nodes = json.loads(workflow.status_more).get('nodes',{})
+    nodes = json.loads(workflow.status_more).get('nodes', {})
     pods = {}
     for node_name in nodes:
         if nodes[node_name]['type'] == 'Pod' and nodes[node_name]['phase'] == 'Succeeded':
@@ -209,8 +212,8 @@ def push_task_time(workflow,dbsession):
     if pipeline_id and pods:
         pipeline = dbsession.query(Pipeline).filter_by(id=pipeline_id).first()
         if pipeline:
-            message='\n%s %s,各task耗时,酌情优化:\n'%(pipeline.describe,pipeline.created_by.username)
-            task_pod_time={}
+            message = '\n%s %s,各task耗时,酌情优化:\n' % (pipeline.describe, pipeline.created_by.username)
+            task_pod_time = {}
             for pod_name in pods:
                 # print(pods[pod_name])
                 task_name = pods[pod_name]['displayName']
@@ -222,65 +225,64 @@ def push_task_time(workflow,dbsession):
                 if startAt in task_pod_time and task_pod_time[startAt]:
                     task_pod_time[startAt].append(
                         {
-                            "task":task.label,
-                            "run_time":str(run_time)
+                            "task": task.label,
+                            "run_time": str(run_time)
                         }
                     )
                 else:
-                    task_pod_time[startAt]=[
+                    task_pod_time[startAt] = [
                         {
                             "task": task.label,
                             "run_time": str(run_time)
                         }
                     ]
 
-            task_pod_time_sorted = sorted(task_pod_time.items(),key=lambda item:item[0])
+            task_pod_time_sorted = sorted(task_pod_time.items(), key=lambda item: item[0])
             max_task_run_time = 0
             for task_pods in task_pod_time_sorted:
                 for task_pod in task_pods[1]:
-                    message+=task_pod['task']+":"+task_pod['run_time']+"(h)\n"
+                    message += task_pod['task'] + ":" + task_pod['run_time'] + "(h)\n"
                     try:
-                        if float(task_pod['run_time'])>max_task_run_time:
+                        if float(task_pod['run_time']) > max_task_run_time:
                             max_task_run_time = float(task_pod['run_time'])
                     except Exception as e:
                         print(e)
 
             # record that this has been pushed, so it is not pushed repeatedly
             info_json = json.loads(workflow.info_json)
-            if info_json.get('push_task_time',''):
+            if info_json.get('push_task_time', ''):
                 pass
             else:
                 info_json['push_task_time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                 workflow.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
                 dbsession.commit()
 
-            message+="\n"
+            message += "\n"
             link = {
-                "点击查看资源的使用": "http://%s/pipeline_modelview/web/monitoring/%s"%(conf.get('HOST'),pipeline_id)
+                "点击查看资源的使用": "http://%s/pipeline_modelview/web/monitoring/%s" % (conf.get('HOST'), pipeline_id)
             }
             # only notify when a single task ran longer than 4 hours
-            if max_task_run_time>4:
-                push_message(conf.get('ADMIN_USER').split(','),message,link)
+            if max_task_run_time > 4:
+                push_message(conf.get('ADMIN_USER').split(','), message, link)
                 alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
                 alert_user = [user.strip() for user in alert_user if user.strip()]
                 receivers = alert_user + [workflow.username]
                 receivers = list(set(receivers))
 
-                push_message(receivers,message,link)
-
+                push_message(receivers, message, link)
 
 # @pysnooper.snoop()
-def save_monitoring(workflow,dbsession):
+def save_monitoring(workflow, dbsession):
     try:
-        if workflow.status=='Succeeded':
+        if workflow.status == 'Succeeded':
             # collect all pods under this workflow
-            nodes = json.loads(workflow.status_more).get('nodes',{})
-            pods={}
+            nodes = json.loads(workflow.status_more).get('nodes', {})
+            pods = {}
             for node_name in nodes:
-                if nodes[node_name]['type']=='Pod' and nodes[node_name]['phase']=='Succeeded':
-                    pods[node_name]=nodes[node_name]
-            pipeline_id = json.loads(workflow.labels).get('pipeline-id','')
+                if nodes[node_name]['type'] == 'Pod' and nodes[node_name]['phase'] == 'Succeeded':
+                    pods[node_name] = nodes[node_name]
+            pipeline_id = json.loads(workflow.labels).get('pipeline-id', '')
             if pipeline_id and pods:
                 for pod_name in pods:
                     print(pods[pod_name])
@@ -290,13 +292,13 @@ def save_monitoring(workflow,dbsession):
                     task = dbsession.query(Task).filter(Task.pipeline_id == int(pipeline_id)).filter(Task.name == task_name).first()
                     metrics = prometheus.get_pod_resource_metric(pod_name, namespace='pipeline')
                     monitoring = json.loads(task.monitoring) if task and task.monitoring else {}
-                    task_monitoring = monitoring.get('task',[])
+                    task_monitoring = monitoring.get('task', [])
                     if metrics:
                         task_monitoring.append({
-                            "cpu":metrics.get('cpu', ''),
-                            "memory":metrics.get('memory', ''),
-                            "pod_name":pod_name,
-                            "update_time":datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                            "cpu": metrics.get('cpu', ''),
+                            "memory": metrics.get('memory', ''),
+                            "pod_name": pod_name,
+                            "update_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                         })
 
                     # prune old monitoring records
@@ -306,16 +308,16 @@ def save_monitoring(workflow,dbsession):
                         if float(metric.get('cpu',0))>0.1 and float(metric.get('memory',0))>0.1 and metric['update_time']>(datetime.datetime.now()-datetime.timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S'):
                             task_monitoring_new.append(metric)
 
-                    if len(task_monitoring_new)>10:
+                    if len(task_monitoring_new) > 10:
                         del task_monitoring_new[0]
 
-                    monitoring_new={}
-                    monitoring_new['task']= task_monitoring_new
-                    monitoring_new['tfjob'] = monitoring.get('tfjob',[])
+                    monitoring_new = {}
+                    monitoring_new['task'] = task_monitoring_new
+                    monitoring_new['tfjob'] = monitoring.get('tfjob', [])
                     print(monitoring_new)
 
                     if task:
-                        task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4)
+                        task.monitoring = json.dumps(monitoring_new, ensure_ascii=False, indent=4)
                         dbsession.commit()
 
             push_task_time(workflow, dbsession)
@@ -326,9 +328,8 @@ def save_monitoring(workflow,dbsession):
         print(e)
 
 
-
 # @pysnooper.snoop()
-def save_history(workflow,dbsession):
+def save_history(workflow, dbsession):
     info_json = json.loads(workflow.info_json)
     if info_json['has_push']:
         if not workflow.status in info_json['has_push']:
@@ -338,17 +339,15 @@ def save_history(workflow,dbsession):
             workflow.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
             dbsession.commit()
 
+
 # @pysnooper.snoop()
-def check_crd_exist(group,version,namespace,plural,name):
-    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group,version,namespace,plural,name)
+def check_crd_exist(group, version, namespace, plural, name):
+    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group, version, namespace, plural, name)
     return exist_crd
 
 
-
-
 # @pysnooper.snoop()
-def deal_event(event,workflow_info,namespace):
+def deal_event(event, workflow_info, namespace):
     with session_scope(nullpool=True) as dbsession:
         try:
             crd_object = event['object']
@@ -359,14 +358,13 @@ def deal_event(event,workflow_info,namespace):
             creat_time = crd_object['metadata']['creationTimestamp'].replace('T', ' ').replace('Z', '')
             creat_time = (datetime.datetime.strptime(creat_time,'%Y-%m-%d %H:%M:%S')+datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
             # the status embedded in the CRD cannot be used directly
-            status=''
+            status = ''
             if 'status' in crd_object and 'nodes' in crd_object['status']:
                 keys = list(crd_object['status']['nodes'].keys())
                 status = crd_object['status']['nodes'][keys[-1]]['phase']
                 if status != 'Pending':
                     status = crd_object['status']['phase']
-
             back_object = {
                 "name": crd_object['metadata']['name'],
                 "namespace": crd_object['metadata']['namespace'] if 'namespace' in crd_object['metadata'] else '',
@@ -382,43 +380,43 @@ def deal_event(event,workflow_info,namespace):
                 back_object['username'] = back_object['labels']['run-rtx']
             elif 'upload-rtx' in back_object:
                 back_object['username'] = back_object['labels']['upload-rtx']
-            workflow = save_workflow(back_object,dbsession)
+            workflow = save_workflow(back_object, dbsession)
             if workflow:
-                has_push = check_has_push(back_object,dbsession)
+                has_push = check_has_push(back_object, dbsession)
                 if not has_push:
                     try:
-                        deliver_message(workflow,dbsession)
+                        deliver_message(workflow, dbsession)
                     except Exception as e1:
                         print('push fail:', e1)
-                        push_message(conf.get('ADMIN_USER').split(','),'push fail'+str(e1))
-                save_history(workflow,dbsession)
+                        push_message(conf.get('ADMIN_USER').split(','), 'push fail' + str(e1))
+                save_history(workflow, dbsession)
                 save_monitoring(workflow, dbsession)
         except Exception as e:
             print(e)
 
+
 # @pysnooper.snoop()
 def listen_workflow():
     workflow_info = conf.get('CRD_INFO')['workflow']
-    namespace = conf.get('PIPELINE_NAMESPACE') # not only this namespace
+    namespace = conf.get('PIPELINE_NAMESPACE')  # not only this namespace
     w = watch.Watch()
     print('begin listen')
-    while(True):
+    while True:
         try:
             for event in w.stream(client.CustomObjectsApi().list_namespaced_custom_object, group=workflow_info['group'], version=workflow_info["version"], namespace=namespace, plural=workflow_info["plural"]):  # label_selector=label,
-                if event['type']=='ADDED' or event['type']=='MODIFIED':  # ADDED MODIFIED DELETED
-                    deal_event(event,workflow_info,namespace)
-                elif event['type']=='ERROR':
+                if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':  # ADDED MODIFIED DELETED
+                    deal_event(event, workflow_info, namespace)
+                elif event['type'] == 'ERROR':
                    w = watch.Watch()
                     time.sleep(60)
         except Exception as ee:
             print(ee)
 
+
 # async IO cannot be used here, because stream() blocks
-if __name__=='__main__':
+if __name__ == '__main__':
     listen_workflow()
-
-
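For the record, push_resource_rec's recommendation rule is ceil(peak over the last 10 runs * 1.4) + 2, capped at 150 CPU cores or 350 G of memory; e.g. a task that peaked at 10 cores is re-requested at ceil(14) + 2 = 16. A sketch of that arithmetic in isolation (the function name is illustrative, not from the source):

    import math

    def recommend(max_used, cap):
        # 40% headroom over the observed peak, plus a 2-unit floor, under a hard cap
        return min(math.ceil(max_used * 1.4) + 2, cap)

    assert recommend(10, 150) == 16   # CPU cores
    assert recommend(0.5, 350) == 3   # memory, in G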