支持国际化

This commit is contained in:
data-infra 2023-12-11 13:17:26 +08:00
parent f9579fb855
commit cb95b9520d
5 changed files with 23 additions and 15 deletions

View File

@ -1,7 +1,7 @@
import sys
import os
import sys, os
import sys, os
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
dir_common = os.path.split(os.path.realpath(__file__))[0] + '/../'
print(dir_common)
sys.path.append(dir_common) # 将根目录添加到系统目录,才能正常引用common文件夹
@ -33,13 +33,13 @@ def check_push():
unscheduld_num = r.llen('celery')
print(unscheduld_num)
if unscheduld_num > 100:
push_admin('超过100个任务堆积未被调度')
push_admin(__('超过100个任务堆积未被调度'))
return
if r.exists('unacked'):
unacked_num = r.hlen('unacked')
print(unacked_num)
if unacked_num > 500:
push_admin("超过500个调度未完成")
push_admin(__("超过500个调度未完成"))
except Exception as e:
print(e)

View File

@ -1,4 +1,6 @@
import time, datetime, os
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
from kubernetes import client
from kubernetes import watch
import json
@ -221,13 +223,13 @@ def save_history(pytorchjob, dbsession):
dbsession.commit()
@pysnooper.snoop()
# @pysnooper.snoop()
def check_crd_exist(group, version, namespace, plural, name):
exist_crd = client.CustomObjectsApi().get_namespaced_custom_object(group, version, namespace, plural, name)
return exist_crd
@pysnooper.snoop()
# @pysnooper.snoop()
def deal_event(event, crd_info, namespace):
with session_scope(nullpool=True) as dbsession:
try:
@ -277,7 +279,7 @@ def deal_event(event, crd_info, namespace):
print(e)
@pysnooper.snoop()
# @pysnooper.snoop()
def listen_crd():
crd_info = conf.get('CRD_INFO')['pytorchjob']
namespace = conf.get('PIPELINE_NAMESPACE')

View File

@ -1,4 +1,6 @@
import time, os
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
from kubernetes import client
from kubernetes import watch
from myapp.utils.py.py_k8s import K8s

View File

@ -1,4 +1,6 @@
import time, datetime, os
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
from kubernetes import client
from kubernetes import watch
import json
@ -227,7 +229,7 @@ def check_crd_exist(group, version, namespace, plural, name):
return exist_crd
@pysnooper.snoop()
# @pysnooper.snoop()
def deal_event(event, crd_info, namespace):
with session_scope(nullpool=True) as dbsession:
try:
@ -277,7 +279,7 @@ def deal_event(event, crd_info, namespace):
print(e)
@pysnooper.snoop()
# @pysnooper.snoop()
def listen_crd():
crd_info = conf.get('CRD_INFO')['tfjob']
namespace = conf.get('PIPELINE_NAMESPACE')

View File

@ -1,5 +1,7 @@
import pysnooper
import time, datetime, os
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
from kubernetes import client
from kubernetes import watch
import json
@ -76,7 +78,7 @@ def deliver_message(workflow, dbsession):
message = "workflow: %s \npipeline: %s(%s) \nnamespace: %s\nstatus: % s \nstart_time: %s\nfinish_time: %s\n" % (workflow.name,info_json.get('pipeline_name',''),info_json.get('describe',''),workflow.namespace,workflow.status,start_time,finish_time)
message+='\n'
link={
"pod详情":help_url
__("pod详情"):help_url
}
if message:
push_message(receivers, message, link)
@ -159,7 +161,7 @@ def push_resource_rec(workflow, dbsession):
pipeline_id = json.loads(workflow.labels).get('pipeline-id', '')
pipeline = dbsession.query(Pipeline).filter_by(id=int(pipeline_id)).first()
if pipeline:
init_message = 'pipeline(%s)根据近10次的任务训练资源使用情况系统做如下调整:\n' % pipeline.describe
init_message = __('pipeline(%s)根据近10次的任务训练资源使用情况系统做如下调整:\n') % pipeline.describe
message = init_message
tasks = dbsession.query(Task).filter(Task.pipeline_id == int(pipeline_id)).all()
for task in tasks:
@ -178,14 +180,14 @@ def push_resource_rec(workflow, dbsession):
if rec_cpu > 150:
rec_cpu = 150
if rec_cpu != int(task.resource_cpu):
message += "task(%s)原申请cpu:%s近10次最大使用cpu:%s,新申请值:%s\n" % (task.label, task.resource_cpu, max_cpu, rec_cpu)
message += __("task(%s)原申请cpu:%s近10次最大使用cpu:%s,新申请值:%s\n") % (task.label, task.resource_cpu, max_cpu, rec_cpu)
task.resource_cpu = str(rec_cpu)
if max_memory:
rec_memory = math.ceil(max_memory * 1.4) + 2
if rec_memory > 350:
rec_memory = 350
if rec_memory != int(task.resource_memory.replace('G', '').replace('M', '')):
message += "task(%s)原申请mem:%s近10次最大使用mem:%s(G),新申请值:%s\n" % (task.label, task.resource_memory, max_memory, str(rec_memory) + "G")
message += __("task(%s)原申请mem:%s近10次最大使用mem:%s(G),新申请值:%s\n") % (task.label, task.resource_memory, max_memory, str(rec_memory) + "G")
task.resource_memory = str(rec_memory) + "G"
dbsession.commit()
if message != init_message:
@ -212,7 +214,7 @@ def push_task_time(workflow, dbsession):
if pipeline_id and pods:
pipeline = dbsession.query(Pipeline).filter_by(id=pipeline_id).first()
if pipeline:
message = '\n%s %s各task耗时酌情优化:\n' % (pipeline.describe, pipeline.created_by.username)
message = __('\n%s %s各task耗时酌情优化:\n') % (pipeline.describe, pipeline.created_by.username)
task_pod_time = {}
for pod_name in pods:
# print(pods[pod_name])