Code style improvements

chendile 2023-09-03 19:34:26 +08:00
parent 5460502916
commit a00129e6bf
5 changed files with 229 additions and 232 deletions

[File 1 of 5]

@@ -1,7 +1,7 @@
-nohup python watch_workflow.py > workflow.log 2>&1 &
+nohup python myapp/tools/watch_workflow.py > workflow.log 2>&1 &
-nohup python watch_service.py > service.log 2>&1 &
+nohup python myapp/tools/watch_service.py > service.log 2>&1 &
 tail -f workflow.log service.log
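For context, the two commands above launch the watcher scripts in the background with stdout and stderr captured to log files. A rough Python equivalent of the same launch pattern (an illustrative sketch only, assuming the updated myapp/tools/ paths from this commit):

# Illustrative sketch: start the watchers the way the nohup lines above do.
import subprocess

def launch(script, logfile):
    # Append stdout and stderr to the log file and detach from the
    # controlling terminal, mirroring `nohup python script > log 2>&1 &`.
    log = open(logfile, 'ab')
    return subprocess.Popen(['python', script], stdout=log, stderr=subprocess.STDOUT,
                            start_new_session=True)

launch('myapp/tools/watch_workflow.py', 'workflow.log')
launch('myapp/tools/watch_service.py', 'service.log')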

[File 2 of 5]

@@ -1,10 +1,8 @@
-import time,datetime, os
+import time, datetime, os
 from kubernetes import client
 from kubernetes import watch
 import json
-from myapp.utils.py.py_k8s import check_status_time,K8s
+from myapp.utils.py.py_k8s import check_status_time, K8s
 import pysnooper
 from myapp import app
 from myapp.models.model_job import (
@@ -12,27 +10,29 @@ from myapp.models.model_job import (
     Task
 )
 from myapp.utils.celery import session_scope
-from myapp.project import push_admin,push_message
+from myapp.project import push_admin, push_message
 from myapp.models.model_job import Pipeline
-conf=app.config
+conf = app.config
 from myapp.utils.py.py_prometheus import Prometheus
-prometheus = Prometheus(conf.get('PROMETHEUS',''))
+prometheus = Prometheus(conf.get('PROMETHEUS', ''))
-cluster=os.getenv('ENVIRONMENT','').lower()
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
     exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
     else:
        print('no kubeconfig in cluster %s' % cluster)
         exit(1)
 # Push WeChat notifications
 # @pysnooper.snoop()
 def deliver_message(job):
@@ -47,12 +47,12 @@ def deliver_message(job):
     info_json = json.loads(job.info_json)
     # print(info_json,experiments.status)
     if job.status in info_json['alert_status'] and job.status not in info_json['has_push']:
-        receivers=list(set(receivers))
+        receivers = list(set(receivers))
         # data = {
         #     "Sender": sender,
         #     "Rcptto":receivers,
         # }
-        workflow_name = info_json.get('workflow_name','')
+        workflow_name = info_json.get('workflow_name', '')
         hp_name = info_json.get('hp_name', '')
         if workflow_name:
             message = "pytorchjob: %s \nworkflow: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (job.name,workflow_name,job.namespace,job.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
@@ -62,12 +62,13 @@ def deliver_message(job):
         message = "pytorchjob: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (job.name,job.namespace,job.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
     if message:
-        push_message(receivers,message)
+        push_message(receivers, message)
 # @pysnooper.snoop()
-def check_has_push(crd,dbsession):
+def check_has_push(crd, dbsession):
     # May have been launched by a workflow or by an hp run
-    workflow_name = crd['labels'].get('workflow-name','')
+    workflow_name = crd['labels'].get('workflow-name', '')
     hp_name = crd['labels'].get('hp-name', '')
     username = crd['username']
     alert_status = ''
@@ -79,13 +80,13 @@ def check_has_push(crd,dbsession):
     print("tf %s from workflow_name %s,user %s,status %s" % (crd['name'],workflow_name,crd['username'],crd['status']))
     # print("%s status %s"%(crd['name'], crd['status']))
-    alert_status='Pending' # hard-coded: alert on Pending, and only Pending
+    alert_status = 'Pending'  # hard-coded: alert on Pending, and only Pending
-    info_json={
+    info_json = {
-        "workflow_name":workflow_name,
+        "workflow_name": workflow_name,
-        "hp_name":hp_name,
+        "hp_name": hp_name,
         "alert_status": alert_status,
-        "has_push":''
+        "has_push": ''
     }
     # print(crd['name'],crd['namespace'])
     pytorchjob = dbsession.query(Pytorchjob).filter(Pytorchjob.name==crd['name']).filter(Pytorchjob.namespace==crd['namespace']).first()
@@ -93,38 +94,38 @@ def check_has_push(crd,dbsession):
         print('exist pytorchjob')
         if pytorchjob.info_json:
             exist_info_json = json.loads(pytorchjob.info_json)
-            info_json['has_push']=exist_info_json.get('has_push','')
+            info_json['has_push'] = exist_info_json.get('has_push', '')
         pytorchjob.create_time = crd['create_time']
         pytorchjob.status = crd['status']
-        pytorchjob.annotations = json.dumps(crd['annotations'],indent=4,ensure_ascii=False)
+        pytorchjob.annotations = json.dumps(crd['annotations'], indent=4, ensure_ascii=False)
-        pytorchjob.labels = json.dumps(crd['labels'],indent=4,ensure_ascii=False)
+        pytorchjob.labels = json.dumps(crd['labels'], indent=4, ensure_ascii=False)
-        pytorchjob.spec = json.dumps(crd['spec'],indent=4,ensure_ascii=False),
+        pytorchjob.spec = json.dumps(crd['spec'], indent=4, ensure_ascii=False),
-        pytorchjob.status_more = json.dumps(crd['status_more'],indent=4,ensure_ascii=False)
+        pytorchjob.status_more = json.dumps(crd['status_more'], indent=4, ensure_ascii=False)
         pytorchjob.username = crd['username']
-        pytorchjob.info_json = json.dumps(info_json,indent=4,ensure_ascii=False)
+        pytorchjob.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
         dbsession.commit()
         if crd['status'] in info_json['alert_status'] and crd['status'] not in info_json['has_push']:
-            return False,pytorchjob
+            return False, pytorchjob
         else:
-            return True,pytorchjob
+            return True, pytorchjob
     else:
         print('new pytorchjob')
         # crd['status_more']={}
         # crd['spec']={}
-        pytorchjob = Pytorchjob(name=crd['name'],namespace=crd['namespace'],create_time=crd['create_time'],
+        pytorchjob = Pytorchjob(name=crd['name'], namespace=crd['namespace'], create_time=crd['create_time'],
                                 status=crd['status'],
-                                annotations=json.dumps(crd['annotations'],indent=4,ensure_ascii=False),
+                                annotations=json.dumps(crd['annotations'], indent=4, ensure_ascii=False),
-                                labels=json.dumps(crd['labels'],indent=4,ensure_ascii=False),
+                                labels=json.dumps(crd['labels'], indent=4, ensure_ascii=False),
-                                spec=json.dumps(crd['spec'],indent=4,ensure_ascii=False),
+                                spec=json.dumps(crd['spec'], indent=4, ensure_ascii=False),
-                                status_more=json.dumps(crd['status_more'],indent=4,ensure_ascii=False),
+                                status_more=json.dumps(crd['status_more'], indent=4, ensure_ascii=False),
                                 username=username,
-                                info_json=json.dumps(info_json,indent=4,ensure_ascii=False))
+                                info_json=json.dumps(info_json, indent=4, ensure_ascii=False))
         dbsession.add(pytorchjob)
         dbsession.commit()
-        return False,pytorchjob
+        return False, pytorchjob
 #
@@ -163,12 +164,11 @@ def check_has_push(crd,dbsession):
 #         push_message([task.pipeline.created_by.username],message)
 # @pysnooper.snoop()
-def save_monitoring(pytorchjob,dbsession):
+def save_monitoring(pytorchjob, dbsession):
     try:
-        if pytorchjob.status=='Succeeded':
+        if pytorchjob.status == 'Succeeded':
-            task_id = json.loads(pytorchjob.labels).get('task-id','')
+            task_id = json.loads(pytorchjob.labels).get('task-id', '')
             if task_id:
                 task = dbsession.query(Task).filter_by(id=int(task_id)).first()
                 metrics = prometheus.get_pod_resource_metric(pytorchjob.name, namespace='pipeline')
@@ -199,7 +199,7 @@ def save_monitoring(pytorchjob,dbsession):
                 print(monitoring_new)
                 if task:
-                    task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4)
+                    task.monitoring = json.dumps(monitoring_new, ensure_ascii=False, indent=4)
                     dbsession.commit()
         # print(pods)
@@ -209,9 +209,8 @@ def save_monitoring(pytorchjob,dbsession):
         print(e)
 # @pysnooper.snoop()
-def save_history(pytorchjob,dbsession):
+def save_history(pytorchjob, dbsession):
     info_json = json.loads(pytorchjob.info_json)
     if info_json['has_push']:
         if not pytorchjob.status in info_json['has_push']:
@@ -223,13 +222,13 @@ def save_history(pytorchjob,dbsession):
 @pysnooper.snoop()
-def check_crd_exist(group,version,namespace,plural,name):
+def check_crd_exist(group, version, namespace, plural, name):
-    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group,version,namespace,plural,name)
+    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group, version, namespace, plural, name)
     return exist_crd
 @pysnooper.snoop()
-def deal_event(event,crd_info,namespace):
+def deal_event(event, crd_info, namespace):
     with session_scope(nullpool=True) as dbsession:
         try:
             crd_object = event['object']
@@ -265,40 +264,41 @@ def deal_event(event,crd_info,namespace):
             elif 'upload-rtx' in back_object:
                 back_object['username'] = back_object['labels']['upload-rtx']
-            has_push, crd_model = check_has_push(back_object,dbsession)
+            has_push, crd_model = check_has_push(back_object, dbsession)
             if not has_push:
                 try:
                     deliver_message(crd_model)
                 except Exception as e1:
                     print('push fail:', e1)
                     push_admin(str(e1))
-            save_history(crd_model,dbsession)
+            save_history(crd_model, dbsession)
-            save_monitoring(crd_model,dbsession)
+            save_monitoring(crd_model, dbsession)
         except Exception as e:
             print(e)
 @pysnooper.snoop()
 def listen_crd():
     crd_info = conf.get('CRD_INFO')['pytorchjob']
     namespace = conf.get('PIPELINE_NAMESPACE')
     w = watch.Watch()
     print('begin listen')
-    while(True):
+    while (True):
         try:
             for event in w.stream(client.CustomObjectsApi().list_namespaced_custom_object, group=crd_info['group'],
                                   version=crd_info['version'],
                                   namespace=namespace, plural=crd_info['plural'], pretty='true'):
-                if event['type']=='ADDED' or event['type']=='MODIFIED':  # ADDED MODIFIED DELETED
+                if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':  # ADDED MODIFIED DELETED
-                    deal_event(event,crd_info,namespace)
+                    deal_event(event, crd_info, namespace)
-                elif event['type']=='ERROR':
+                elif event['type'] == 'ERROR':
                     w = watch.Watch()
                     time.sleep(60)
         except Exception as ee:
             print(ee)
 # Cannot use async IO because stream() blocks
-if __name__=='__main__':
+if __name__ == '__main__':
     listen_crd()
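The notification logic in this file hinges on the alert_status/has_push bookkeeping stored in info_json: a CRD status is pushed only if it appears in alert_status and has not yet been recorded in has_push. A minimal standalone sketch of that gate (helper names are illustrative and not part of the commit; exactly how statuses are appended to has_push is an assumption here):

# Sketch of the alert gate used by check_has_push/save_history above.
def should_push(status, info_json):
    # Alert only on configured statuses, and only once per status.
    return status in info_json['alert_status'] and status not in info_json['has_push']

def mark_pushed(status, info_json):
    # Assumed bookkeeping: remember the status so it is not pushed again.
    info_json['has_push'] = (info_json['has_push'] + ',' + status) if info_json['has_push'] else status

info = {"alert_status": "Pending", "has_push": ""}
assert should_push("Pending", info)
mark_pushed("Pending", info)
assert not should_push("Pending", info)    # duplicate Pending events are suppressed
assert not should_push("Succeeded", info)  # Succeeded is not an alerting status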

[File 3 of 5]

@@ -1,5 +1,3 @@
 import time, os
 from kubernetes import client
 from kubernetes import watch
@@ -7,16 +5,17 @@ from myapp.utils.py.py_k8s import K8s
 from myapp.project import push_message
 from myapp import app
 from myapp.utils.celery import session_scope
-conf=app.config
-cluster=os.getenv('ENVIRONMENT','').lower()
+conf = app.config
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
     exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
     else:
         print('no kubeconfig in cluster %s' % cluster)
@@ -24,11 +23,13 @@ else:
 from myapp.models.model_serving import InferenceService
 from datetime import datetime, timezone, timedelta
 # @pysnooper.snoop()
 def listen_service():
     namespace = conf.get('SERVICE_NAMESPACE')
     w = watch.Watch()
-    while(True):
+    while (True):
         try:
             print('begin listen')
             for event in w.stream(client.CoreV1Api().list_namespaced_pod, namespace=namespace,timeout_seconds=60):  # label_selector=label,
@@ -36,7 +37,7 @@ def listen_service():
                 try:
                     if event['object'].status and event['object'].status.container_statuses and event["type"]=='MODIFIED':  # a container restart triggers MODIFIED
                         # terminated: exited; waiting: pending start; running: in progress
-                        container_statuse= event['object'].status.container_statuses[0].state
+                        container_statuse = event['object'].status.container_statuses[0].state
                         terminated = container_statuse.terminated
                         # waiting = container_statuse.waiting
                         # running = container_statuse.running
@@ -62,8 +63,7 @@ def listen_service():
             print(ee)
             time.sleep(5)
 # Cannot use async IO because stream() blocks
-if __name__=='__main__':
+if __name__ == '__main__':
     listen_service()
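For reference, the MODIFIED-event handling in this file reads the state of the first container in the pod. A condensed sketch of that check (a standalone version, assuming the kubeconfig setup at the top of the file has already run):

# Condensed sketch of the pod-state check inside listen_service above.
from kubernetes import client, watch

def watch_container_states(namespace):
    w = watch.Watch()
    for event in w.stream(client.CoreV1Api().list_namespaced_pod,
                          namespace=namespace, timeout_seconds=60):
        pod = event['object']
        # Container restarts surface as MODIFIED events carrying container statuses.
        if event['type'] == 'MODIFIED' and pod.status and pod.status.container_statuses:
            state = pod.status.container_statuses[0].state
            if state.terminated:
                # state.terminated carries reason, exit_code, and finished_at
                print(pod.metadata.name, 'terminated:', state.terminated.reason)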

[File 4 of 5]

@@ -1,10 +1,8 @@
-import time,datetime, os
+import time, datetime, os
 from kubernetes import client
 from kubernetes import watch
 import json
-from myapp.utils.py.py_k8s import check_status_time,K8s
+from myapp.utils.py.py_k8s import check_status_time, K8s
 import pysnooper
 from myapp import app
 from myapp.models.model_job import (
@@ -12,27 +10,29 @@ from myapp.models.model_job import (
     Task
 )
 from myapp.utils.celery import session_scope
-from myapp.project import push_admin,push_message
+from myapp.project import push_admin, push_message
 from myapp.models.model_job import Pipeline
-conf=app.config
+conf = app.config
 from myapp.utils.py.py_prometheus import Prometheus
-prometheus = Prometheus(conf.get('PROMETHEUS',''))
+prometheus = Prometheus(conf.get('PROMETHEUS', ''))
-cluster=os.getenv('ENVIRONMENT','').lower()
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
     exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
     else:
         print('no kubeconfig in cluster %s' % cluster)
         exit(1)
 # Push notifications
 # @pysnooper.snoop()
 def deliver_message(tfjob):
@@ -47,12 +47,12 @@ def deliver_message(tfjob):
     info_json = json.loads(tfjob.info_json)
     # print(info_json,experiments.status)
     if tfjob.status in info_json['alert_status'] and tfjob.status not in info_json['has_push']:
-        receivers=list(set(receivers))
+        receivers = list(set(receivers))
         # data = {
         #     "Sender": sender,
         #     "Rcptto":receivers,
         # }
-        workflow_name = info_json.get('workflow_name','')
+        workflow_name = info_json.get('workflow_name', '')
         hp_name = info_json.get('hp_name', '')
         if workflow_name:
             message = "tfjob: %s \nworkflow: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (tfjob.name,workflow_name,tfjob.namespace,tfjob.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
@@ -62,13 +62,13 @@ def deliver_message(tfjob):
         message = "tfjob: %s \nnamespace: %s\nstatus: %s \ntime: %s" % (tfjob.name,tfjob.namespace,tfjob.status,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
     if message:
-        push_message(receivers,message)
+        push_message(receivers, message)
 # @pysnooper.snoop()
-def check_has_push(crd,dbsession):
+def check_has_push(crd, dbsession):
     # May have been launched by a workflow or by an hp run
-    workflow_name = crd['labels'].get('workflow-name','')
+    workflow_name = crd['labels'].get('workflow-name', '')
     hp_name = crd['labels'].get('hp-name', '')
     username = crd['username']
     alert_status = ''
@@ -80,52 +80,52 @@ def check_has_push(crd,dbsession):
     print("tf %s from workflow_name %s,user %s,status %s" % (crd['name'],workflow_name,crd['username'],crd['status']))
     # print("%s status %s"%(crd['name'], crd['status']))
-    alert_status='Pending' # hard-coded: alert on Pending, and only Pending
+    alert_status = 'Pending'  # hard-coded: alert on Pending, and only Pending
-    info_json={
+    info_json = {
-        "workflow_name":workflow_name,
+        "workflow_name": workflow_name,
-        "hp_name":hp_name,
+        "hp_name": hp_name,
         "alert_status": alert_status,
-        "has_push":''
+        "has_push": ''
     }
     # print(crd['name'],crd['namespace'])
-    tfjob = dbsession.query(Tfjob).filter(Tfjob.name==crd['name']).filter(Tfjob.namespace==crd['namespace']).first()
+    tfjob = dbsession.query(Tfjob).filter(Tfjob.name == crd['name']).filter(Tfjob.namespace == crd['namespace']).first()
     if tfjob:
         print('exist tfjob')
         if tfjob.info_json:
             exist_info_json = json.loads(tfjob.info_json)
-            info_json['has_push']=exist_info_json.get('has_push','')
+            info_json['has_push'] = exist_info_json.get('has_push', '')
         tfjob.create_time = crd['create_time']
         tfjob.status = crd['status']
-        tfjob.annotations = json.dumps(crd['annotations'],indent=4,ensure_ascii=False)
+        tfjob.annotations = json.dumps(crd['annotations'], indent=4, ensure_ascii=False)
-        tfjob.labels = json.dumps(crd['labels'],indent=4,ensure_ascii=False)
+        tfjob.labels = json.dumps(crd['labels'], indent=4, ensure_ascii=False)
-        tfjob.spec = json.dumps(crd['spec'],indent=4,ensure_ascii=False),
+        tfjob.spec = json.dumps(crd['spec'], indent=4, ensure_ascii=False),
-        tfjob.status_more = json.dumps(crd['status_more'],indent=4,ensure_ascii=False)
+        tfjob.status_more = json.dumps(crd['status_more'], indent=4, ensure_ascii=False)
         tfjob.username = crd['username']
-        tfjob.info_json = json.dumps(info_json,indent=4,ensure_ascii=False)
+        tfjob.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
         dbsession.commit()
         if crd['status'] in info_json['alert_status'] and crd['status'] not in info_json['has_push']:
-            return False,tfjob
+            return False, tfjob
         else:
-            return True,tfjob
+            return True, tfjob
     else:
         print('new tfjob')
         # crd['status_more']={}
         # crd['spec']={}
-        tfjob = Tfjob(name=crd['name'],namespace=crd['namespace'],create_time=crd['create_time'],
+        tfjob = Tfjob(name=crd['name'], namespace=crd['namespace'], create_time=crd['create_time'],
                       status=crd['status'],
-                      annotations=json.dumps(crd['annotations'],indent=4,ensure_ascii=False),
+                      annotations=json.dumps(crd['annotations'], indent=4, ensure_ascii=False),
-                      labels=json.dumps(crd['labels'],indent=4,ensure_ascii=False),
+                      labels=json.dumps(crd['labels'], indent=4, ensure_ascii=False),
-                      spec=json.dumps(crd['spec'],indent=4,ensure_ascii=False),
+                      spec=json.dumps(crd['spec'], indent=4, ensure_ascii=False),
-                      status_more=json.dumps(crd['status_more'],indent=4,ensure_ascii=False),
+                      status_more=json.dumps(crd['status_more'], indent=4, ensure_ascii=False),
                       username=username,
-                      info_json=json.dumps(info_json,indent=4,ensure_ascii=False))
+                      info_json=json.dumps(info_json, indent=4, ensure_ascii=False))
         dbsession.add(tfjob)
         dbsession.commit()
-        return False,tfjob
+        return False, tfjob
 #
@@ -164,12 +164,11 @@ def check_has_push(crd,dbsession):
 #         push_message([task.pipeline.created_by.username],message)
 # @pysnooper.snoop()
-def save_monitoring(tfjob,dbsession):
+def save_monitoring(tfjob, dbsession):
     try:
-        if tfjob.status=='Succeeded':
+        if tfjob.status == 'Succeeded':
-            task_id = json.loads(tfjob.labels).get('task-id','')
+            task_id = json.loads(tfjob.labels).get('task-id', '')
             if task_id:
                 task = dbsession.query(Task).filter_by(id=int(task_id)).first()
                 metrics = prometheus.get_pod_resource_metric(tfjob.name, namespace='pipeline')
@@ -200,7 +199,7 @@ def save_monitoring(tfjob,dbsession):
                 print(monitoring_new)
                 if task:
-                    task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4)
+                    task.monitoring = json.dumps(monitoring_new, ensure_ascii=False, indent=4)
                     dbsession.commit()
         # print(pods)
@@ -210,9 +209,8 @@ def save_monitoring(tfjob,dbsession):
         print(e)
 # @pysnooper.snoop()
-def save_history(tfjob,dbsession):
+def save_history(tfjob, dbsession):
     info_json = json.loads(tfjob.info_json)
     if info_json['has_push']:
         if not tfjob.status in info_json['has_push']:
@@ -224,13 +222,13 @@ def save_history(tfjob,dbsession):
 # @pysnooper.snoop()
-def check_crd_exist(group,version,namespace,plural,name):
+def check_crd_exist(group, version, namespace, plural, name):
-    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group,version,namespace,plural,name)
+    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group, version, namespace, plural, name)
     return exist_crd
 @pysnooper.snoop()
-def deal_event(event,crd_info,namespace):
+def deal_event(event, crd_info, namespace):
     with session_scope(nullpool=True) as dbsession:
         try:
             crd_object = event['object']
@@ -266,40 +264,41 @@ def deal_event(event,crd_info,namespace):
             elif 'upload-rtx' in back_object:
                 back_object['username'] = back_object['labels']['upload-rtx']
-            has_push, crd_model = check_has_push(back_object,dbsession)
+            has_push, crd_model = check_has_push(back_object, dbsession)
             if not has_push:
                 try:
                     deliver_message(crd_model)
                 except Exception as e1:
                     print('push fail:', e1)
                     push_admin(str(e1))
-            save_history(crd_model,dbsession)
+            save_history(crd_model, dbsession)
-            save_monitoring(crd_model,dbsession)
+            save_monitoring(crd_model, dbsession)
         except Exception as e:
             print(e)
 @pysnooper.snoop()
 def listen_crd():
     crd_info = conf.get('CRD_INFO')['tfjob']
     namespace = conf.get('PIPELINE_NAMESPACE')
     w = watch.Watch()
     print('begin listen')
-    while(True):
+    while (True):
         try:
             for event in w.stream(client.CustomObjectsApi().list_namespaced_custom_object, group=crd_info['group'],
                                   version=crd_info['version'],
                                   namespace=namespace, plural=crd_info['plural'], pretty='true'):
-                if event['type']=='ADDED' or event['type']=='MODIFIED':  # ADDED MODIFIED DELETED
+                if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':  # ADDED MODIFIED DELETED
-                    deal_event(event,crd_info,namespace)
+                    deal_event(event, crd_info, namespace)
-                elif event['type']=='ERROR':
+                elif event['type'] == 'ERROR':
                     w = watch.Watch()
                     time.sleep(60)
         except Exception as ee:
             print(ee)
 # Cannot use async IO because stream() blocks
-if __name__=='__main__':
+if __name__ == '__main__':
     listen_crd()
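This file mirrors the pytorchjob watcher almost line for line, and both rely on the same upsert pattern in check_has_push: look the CRD up by name and namespace, refresh the row if it exists, otherwise insert a new one. A schematic sketch of that pattern (generic SQLAlchemy; Model stands in for Tfjob/Pytorchjob and the field list is abbreviated). Note that in both files the `spec = json.dumps(...),` line ends with a stray trailing comma, which stores a one-element tuple rather than a string; the sketch omits that field:

# Schematic upsert shared by the tfjob and pytorchjob watchers.
import json

def upsert_crd(dbsession, Model, crd, info_json):
    row = (dbsession.query(Model)
           .filter(Model.name == crd['name'])
           .filter(Model.namespace == crd['namespace'])
           .first())
    if row:
        # Existing row: carry over what was already pushed, then refresh fields.
        if row.info_json:
            info_json['has_push'] = json.loads(row.info_json).get('has_push', '')
        row.status = crd['status']
        row.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
    else:
        row = Model(name=crd['name'], namespace=crd['namespace'], status=crd['status'],
                    info_json=json.dumps(info_json, indent=4, ensure_ascii=False))
        dbsession.add(row)
    dbsession.commit()
    return row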

[File 5 of 5]

@@ -1,11 +1,10 @@
 import pysnooper
-import time,datetime, os
+import time, datetime, os
 from kubernetes import client
 from kubernetes import watch
 import json
 import math
-from myapp.utils.py.py_k8s import check_status_time,K8s
+from myapp.utils.py.py_k8s import check_status_time, K8s
 from myapp.utils.py.py_prometheus import Prometheus
 from myapp.project import push_message
 from myapp import app
@@ -17,26 +16,28 @@ from myapp.models.model_job import (
 )
 from myapp.utils.celery import session_scope
-conf=app.config
-prometheus = Prometheus(conf.get('PROMETHEUS',''))
-cluster=os.getenv('ENVIRONMENT','').lower()
+conf = app.config
+prometheus = Prometheus(conf.get('PROMETHEUS', ''))
+cluster = os.getenv('ENVIRONMENT', '').lower()
 if not cluster:
-    print('no cluster %s'%cluster)
+    print('no cluster %s' % cluster)
     exit(1)
 else:
-    clusters = conf.get('CLUSTERS',{})
+    clusters = conf.get('CLUSTERS', {})
     if clusters and cluster in clusters:
-        kubeconfig = clusters[cluster].get('KUBECONFIG','')
+        kubeconfig = clusters[cluster].get('KUBECONFIG', '')
         K8s(kubeconfig)
         # k8s_config.kube_config.load_kube_config(config_file=kubeconfig)
     else:
         print('no kubeconfig in cluster %s' % cluster)
         exit(1)
 # Push WeChat notifications
 # @pysnooper.snoop()
-def deliver_message(workflow,dbsession):
+def deliver_message(workflow, dbsession):
     if not workflow:
         return
@@ -46,9 +47,9 @@ def deliver_message(workflow,dbsession):
     pipeline_id = json.loads(workflow.labels).get("pipeline-id", '')
     if pipeline_id and int(pipeline_id) > 0:
         pipeline = dbsession.query(Pipeline).filter_by(id=int(pipeline_id)).first()
-        alert_user=pipeline.alert_user.split(',') if pipeline.alert_user else []
+        alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
-        alert_user=[user.strip() for user in alert_user if user.strip()]
+        alert_user = [user.strip() for user in alert_user if user.strip()]
-        receivers+=alert_user
+        receivers += alert_user
     if not receivers:
         print('no receivers')
@@ -57,7 +58,7 @@ def deliver_message(workflow,dbsession):
     info_json = json.loads(workflow.info_json)
     # print(info_json,workflow.status)
     if workflow.status in info_json['alert_status'] and workflow.status not in info_json['has_push']:
-        receivers=list(set(receivers))
+        receivers = list(set(receivers))
         # data = {
         #     "Sender": sender,
         #     "Rcptto":receivers,
         # }
@@ -78,11 +79,12 @@ def deliver_message(workflow,dbsession):
         "pod details":help_url
     }
     if message:
-        push_message(receivers,message,link)
+        push_message(receivers, message, link)
 # Save the workflow record
 # @pysnooper.snoop()
-def save_workflow(crd,dbsession):
+def save_workflow(crd, dbsession):
     pipeline_id = crd['labels'].get('pipeline-id', '')
     pipeline = dbsession.query(Pipeline).filter_by(id=int(pipeline_id)).first()
     if not pipeline:
@@ -106,7 +108,7 @@ def save_workflow(crd,dbsession):
         workflow.labels = json.dumps(crd['labels'], indent=4, ensure_ascii=False)
         workflow.spec = json.dumps(crd['spec'], indent=4, ensure_ascii=False),
         workflow.status_more = json.dumps(crd['status_more'], indent=4, ensure_ascii=False)
-        workflow.cluster=cluster
+        workflow.cluster = cluster
         dbsession.commit()
     else:
@@ -118,7 +120,7 @@ def save_workflow(crd,dbsession):
             "has_push": ''
         }
         print('new workflow')
-        workflow = Workflow(name=crd['name'], cluster=cluster,namespace=crd['namespace'], create_time=crd['create_time'],
+        workflow = Workflow(name=crd['name'], cluster=cluster, namespace=crd['namespace'], create_time=crd['create_time'],
                             status=crd['status'],
                             annotations=json.dumps(crd['annotations'], indent=4, ensure_ascii=False),
                             labels=json.dumps(crd['labels'], indent=4, ensure_ascii=False),
@@ -130,16 +132,17 @@ def save_workflow(crd,dbsession):
         dbsession.commit()
     # Update the run history
-    pipeline_run_id = json.loads(workflow.labels).get("run-id",'')
+    pipeline_run_id = json.loads(workflow.labels).get("run-id", '')
     if pipeline_run_id:
         run_history = dbsession.query(RunHistory).filter_by(run_id=pipeline_run_id).first()
         if run_history:
-            run_history.status=crd['status']
+            run_history.status = crd['status']
             dbsession.commit()
     return workflow
 # @pysnooper.snoop()
-def check_has_push(crd,dbsession):
+def check_has_push(crd, dbsession):
     workflow = dbsession.query(Workflow).filter(Workflow.name == crd['name']).filter(Workflow.namespace == crd['namespace']).first()
     if workflow and workflow.info_json:
         info_json = json.loads(workflow.info_json)
@@ -152,55 +155,55 @@ def check_has_push(crd,dbsession):
 # Push adjustment notifications
 # @pysnooper.snoop()
-def push_resource_rec(workflow,dbsession):
+def push_resource_rec(workflow, dbsession):
     pipeline_id = json.loads(workflow.labels).get('pipeline-id', '')
     pipeline = dbsession.query(Pipeline).filter_by(id=int(pipeline_id)).first()
     if pipeline:
-        init_message = 'pipeline(%s): based on resource usage over the last 10 training runs, the system makes the following adjustments:\n'%pipeline.describe
+        init_message = 'pipeline(%s): based on resource usage over the last 10 training runs, the system makes the following adjustments:\n' % pipeline.describe
         message = init_message
         tasks = dbsession.query(Task).filter(Task.pipeline_id == int(pipeline_id)).all()
         for task in tasks:
-            if 'NO_RESOURCE_CHECK' not in task.job_template.env.replace("-","_").upper():
+            if 'NO_RESOURCE_CHECK' not in task.job_template.env.replace("-", "_").upper():
-                task_monitorings = json.loads(task.monitoring).get('task',[])
+                task_monitorings = json.loads(task.monitoring).get('task', [])
-                if len(task_monitorings)>9:
+                if len(task_monitorings) > 9:
                     max_cpu = 0
-                    max_memory=0
+                    max_memory = 0
                     for task_monitoring in task_monitorings:
-                        if float(task_monitoring.get('cpu',0))>max_cpu:
+                        if float(task_monitoring.get('cpu', 0)) > max_cpu:
-                            max_cpu = float(task_monitoring.get('cpu',0))
+                            max_cpu = float(task_monitoring.get('cpu', 0))
                         if float(task_monitoring.get('memory', 0)) > max_memory:
                             max_memory = float(task_monitoring.get('memory', 0))
                     if max_cpu:
-                        rec_cpu = math.ceil(max_cpu*1.4)+2
+                        rec_cpu = math.ceil(max_cpu * 1.4) + 2
-                        if rec_cpu>150:
+                        if rec_cpu > 150:
-                            rec_cpu=150
+                            rec_cpu = 150
-                        if rec_cpu!=int(task.resource_cpu):
+                        if rec_cpu != int(task.resource_cpu):
-                            message += "task(%s) requested cpu: %s, max cpu used over last 10 runs: %s, new request: %s\n" % (task.label,task.resource_cpu, max_cpu, rec_cpu)
+                            message += "task(%s) requested cpu: %s, max cpu used over last 10 runs: %s, new request: %s\n" % (task.label, task.resource_cpu, max_cpu, rec_cpu)
                             task.resource_cpu = str(rec_cpu)
                     if max_memory:
-                        rec_memory = math.ceil(max_memory*1.4)+2
+                        rec_memory = math.ceil(max_memory * 1.4) + 2
-                        if rec_memory>350:
+                        if rec_memory > 350:
-                            rec_memory=350
+                            rec_memory = 350
-                        if rec_memory!=int(task.resource_memory.replace('G','').replace('M','')):
+                        if rec_memory != int(task.resource_memory.replace('G', '').replace('M', '')):
-                            message += "task(%s) requested mem: %s, max mem used over last 10 runs: %s(G), new request: %s\n" % (task.label,task.resource_memory, max_memory, str(rec_memory)+"G")
+                            message += "task(%s) requested mem: %s, max mem used over last 10 runs: %s(G), new request: %s\n" % (task.label, task.resource_memory, max_memory, str(rec_memory) + "G")
-                            task.resource_memory = str(rec_memory)+"G"
+                            task.resource_memory = str(rec_memory) + "G"
         dbsession.commit()
-        if message!=init_message:
+        if message != init_message:
             alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
             alert_user = [user.strip() for user in alert_user if user.strip()]
-            receivers = alert_user+[pipeline.created_by.username]
+            receivers = alert_user + [pipeline.created_by.username]
-            receivers=list(set(receivers))
+            receivers = list(set(receivers))
-            push_message(receivers,message)
+            push_message(receivers, message)
 # Push training-duration notifications
 # @pysnooper.snoop()
-def push_task_time(workflow,dbsession):
+def push_task_time(workflow, dbsession):
     if not workflow:
         return
-    nodes = json.loads(workflow.status_more).get('nodes',{})
+    nodes = json.loads(workflow.status_more).get('nodes', {})
     pods = {}
     for node_name in nodes:
         if nodes[node_name]['type'] == 'Pod' and nodes[node_name]['phase'] == 'Succeeded':
@@ -209,8 +212,8 @@ def push_task_time(workflow,dbsession):
     if pipeline_id and pods:
         pipeline = dbsession.query(Pipeline).filter_by(id=pipeline_id).first()
         if pipeline:
-            message='\n%s %s per-task durations; optimize as appropriate:\n'%(pipeline.describe,pipeline.created_by.username)
+            message = '\n%s %s per-task durations; optimize as appropriate:\n' % (pipeline.describe, pipeline.created_by.username)
-            task_pod_time={}
+            task_pod_time = {}
             for pod_name in pods:
                 # print(pods[pod_name])
                 task_name = pods[pod_name]['displayName']
@@ -222,65 +225,64 @@ def push_task_time(workflow,dbsession):
                 if startAt in task_pod_time and task_pod_time[startAt]:
                     task_pod_time[startAt].append(
                         {
-                            "task":task.label,
+                            "task": task.label,
-                            "run_time":str(run_time)
+                            "run_time": str(run_time)
                         }
                     )
                 else:
-                    task_pod_time[startAt]=[
+                    task_pod_time[startAt] = [
                         {
                             "task": task.label,
                             "run_time": str(run_time)
                         }
                     ]
-            task_pod_time_sorted = sorted(task_pod_time.items(),key=lambda item:item[0])
+            task_pod_time_sorted = sorted(task_pod_time.items(), key=lambda item: item[0])
             max_task_run_time = 0
             for task_pods in task_pod_time_sorted:
                 for task_pod in task_pods[1]:
-                    message+=task_pod['task']+":"+task_pod['run_time']+"(h)\n"
+                    message += task_pod['task'] + ":" + task_pod['run_time'] + "(h)\n"
                     try:
-                        if float(task_pod['run_time'])>max_task_run_time:
+                        if float(task_pod['run_time']) > max_task_run_time:
                             max_task_run_time = float(task_pod['run_time'])
                     except Exception as e:
                         print(e)
             # Record whether it was already pushed; repeated pushes are unwanted
             info_json = json.loads(workflow.info_json)
-            if info_json.get('push_task_time',''):
+            if info_json.get('push_task_time', ''):
                 pass
             else:
                 info_json['push_task_time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                 workflow.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
                 dbsession.commit()
-            message+="\n"
+            message += "\n"
             link = {
-                "Click to view resource usage": "http://%s/pipeline_modelview/web/monitoring/%s"%(conf.get('HOST'),pipeline_id)
+                "Click to view resource usage": "http://%s/pipeline_modelview/web/monitoring/%s" % (conf.get('HOST'), pipeline_id)
             }
             # Only notify when a single task ran longer than 4 hours
-            if max_task_run_time>4:
+            if max_task_run_time > 4:
-                push_message(conf.get('ADMIN_USER').split(','),message,link)
+                push_message(conf.get('ADMIN_USER').split(','), message, link)
                 alert_user = pipeline.alert_user.split(',') if pipeline.alert_user else []
                 alert_user = [user.strip() for user in alert_user if user.strip()]
                 receivers = alert_user + [workflow.username]
                 receivers = list(set(receivers))
-                push_message(receivers,message,link)
+                push_message(receivers, message, link)
 # @pysnooper.snoop()
-def save_monitoring(workflow,dbsession):
+def save_monitoring(workflow, dbsession):
     try:
-        if workflow.status=='Succeeded':
+        if workflow.status == 'Succeeded':
             # Collect all pods under this workflow
-            nodes = json.loads(workflow.status_more).get('nodes',{})
+            nodes = json.loads(workflow.status_more).get('nodes', {})
-            pods={}
+            pods = {}
             for node_name in nodes:
-                if nodes[node_name]['type']=='Pod' and nodes[node_name]['phase']=='Succeeded':
+                if nodes[node_name]['type'] == 'Pod' and nodes[node_name]['phase'] == 'Succeeded':
-                    pods[node_name]=nodes[node_name]
+                    pods[node_name] = nodes[node_name]
-            pipeline_id = json.loads(workflow.labels).get('pipeline-id','')
+            pipeline_id = json.loads(workflow.labels).get('pipeline-id', '')
             if pipeline_id and pods:
                 for pod_name in pods:
                     print(pods[pod_name])
@@ -290,13 +292,13 @@ def save_monitoring(workflow,dbsession):
                     task = dbsession.query(Task).filter(Task.pipeline_id == int(pipeline_id)).filter(Task.name == task_name).first()
                    metrics = prometheus.get_pod_resource_metric(pod_name, namespace='pipeline')
                     monitoring = json.loads(task.monitoring) if task and task.monitoring else {}
-                    task_monitoring = monitoring.get('task',[])
+                    task_monitoring = monitoring.get('task', [])
                     if metrics:
                         task_monitoring.append({
-                            "cpu":metrics.get('cpu', ''),
+                            "cpu": metrics.get('cpu', ''),
-                            "memory":metrics.get('memory', ''),
+                            "memory": metrics.get('memory', ''),
-                            "pod_name":pod_name,
+                            "pod_name": pod_name,
-                            "update_time":datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                            "update_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                         })
                     # Prune monitoring records
@@ -306,16 +308,16 @@ def save_monitoring(workflow,dbsession):
                         if float(metric.get('cpu',0))>0.1 and float(metric.get('memory',0))>0.1 and metric['update_time']>(datetime.datetime.now()-datetime.timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S'):
                             task_monitoring_new.append(metric)
-                    if len(task_monitoring_new)>10:
+                    if len(task_monitoring_new) > 10:
                         del task_monitoring_new[0]
-                    monitoring_new={}
+                    monitoring_new = {}
-                    monitoring_new['task']= task_monitoring_new
+                    monitoring_new['task'] = task_monitoring_new
-                    monitoring_new['tfjob'] = monitoring.get('tfjob',[])
+                    monitoring_new['tfjob'] = monitoring.get('tfjob', [])
                     print(monitoring_new)
                     if task:
-                        task.monitoring = json.dumps(monitoring_new,ensure_ascii=False,indent=4)
+                        task.monitoring = json.dumps(monitoring_new, ensure_ascii=False, indent=4)
                         dbsession.commit()
             push_task_time(workflow, dbsession)
@@ -326,9 +328,8 @@ def save_monitoring(workflow,dbsession):
         print(e)
 # @pysnooper.snoop()
-def save_history(workflow,dbsession):
+def save_history(workflow, dbsession):
     info_json = json.loads(workflow.info_json)
     if info_json['has_push']:
         if not workflow.status in info_json['has_push']:
@@ -338,17 +339,15 @@ def save_history(workflow,dbsession):
         workflow.info_json = json.dumps(info_json, indent=4, ensure_ascii=False)
         dbsession.commit()
 # @pysnooper.snoop()
-def check_crd_exist(group,version,namespace,plural,name):
+def check_crd_exist(group, version, namespace, plural, name):
-    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group,version,namespace,plural,name)
+    exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group, version, namespace, plural, name)
     return exist_crd
 # @pysnooper.snoop()
-def deal_event(event,workflow_info,namespace):
+def deal_event(event, workflow_info, namespace):
     with session_scope(nullpool=True) as dbsession:
         try:
             crd_object = event['object']
@@ -359,14 +358,13 @@ def deal_event(event,workflow_info,namespace):
             creat_time = crd_object['metadata']['creationTimestamp'].replace('T', ' ').replace('Z', '')
             creat_time = (datetime.datetime.strptime(creat_time,'%Y-%m-%d %H:%M:%S')+datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
             # The embedded status cannot be used directly
-            status=''
+            status = ''
             if 'status' in crd_object and 'nodes' in crd_object['status']:
                 keys = list(crd_object['status']['nodes'].keys())
                 status = crd_object['status']['nodes'][keys[-1]]['phase']
                 if status != 'Pending':
                     status = crd_object['status']['phase']
             back_object = {
                 "name": crd_object['metadata']['name'],
                 "namespace": crd_object['metadata']['namespace'] if 'namespace' in crd_object['metadata'] else '',
@@ -382,43 +380,43 @@ def deal_event(event,workflow_info,namespace):
                 back_object['username'] = back_object['labels']['run-rtx']
             elif 'upload-rtx' in back_object:
                 back_object['username'] = back_object['labels']['upload-rtx']
-            workflow = save_workflow(back_object,dbsession)
+            workflow = save_workflow(back_object, dbsession)
             if workflow:
-                has_push = check_has_push(back_object,dbsession)
+                has_push = check_has_push(back_object, dbsession)
                 if not has_push:
                     try:
-                        deliver_message(workflow,dbsession)
+                        deliver_message(workflow, dbsession)
                     except Exception as e1:
                         print('push fail:', e1)
-                        push_message(conf.get('ADMIN_USER').split(','),'push fail'+str(e1))
+                        push_message(conf.get('ADMIN_USER').split(','), 'push fail' + str(e1))
-                save_history(workflow,dbsession)
+                save_history(workflow, dbsession)
                 save_monitoring(workflow, dbsession)
         except Exception as e:
             print(e)
 # @pysnooper.snoop()
 def listen_workflow():
     workflow_info = conf.get('CRD_INFO')['workflow']
     namespace = conf.get('PIPELINE_NAMESPACE')  # not only this namespace
     w = watch.Watch()
     print('begin listen')
-    while(True):
+    while (True):
         try:
             for event in w.stream(client.CustomObjectsApi().list_namespaced_custom_object, group=workflow_info['group'],
                                   version=workflow_info["version"],
                                   namespace=namespace, plural=workflow_info["plural"]):  # label_selector=label,
-                if event['type']=='ADDED' or event['type']=='MODIFIED':  # ADDED MODIFIED DELETED
+                if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':  # ADDED MODIFIED DELETED
-                    deal_event(event,workflow_info,namespace)
+                    deal_event(event, workflow_info, namespace)
-                elif event['type']=='ERROR':
+                elif event['type'] == 'ERROR':
                     w = watch.Watch()
                     time.sleep(60)
         except Exception as ee:
             print(ee)
 # Cannot use async IO because stream() blocks
-if __name__=='__main__':
+if __name__ == '__main__':
     listen_workflow()
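The recommendation logic in push_resource_rec reduces to one capped formula applied to both resources: request 1.4 times the maximum usage observed over the last 10 runs, plus 2, capped at 150 (CPU cores) or 350 (memory in G). A worked sketch:

# Worked sketch of the formulas in push_resource_rec above.
import math

def recommend(max_used, cap):
    # 1.4x headroom over the 10-run maximum, plus a fixed margin of 2, capped.
    return min(math.ceil(max_used * 1.4) + 2, cap)

print(recommend(10.0, 150))   # cpu: ceil(14.0) + 2 = 16 cores
print(recommend(120.0, 150))  # cpu: ceil(168.0) + 2 = 170, capped to 150
print(recommend(40.0, 350))   # memory: ceil(56.0) + 2 = 58 (G)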