cube-studio/myapp/tools/watch_service.py

86 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time,logging,os,sys
import asyncio
from kubernetes import client
from kubernetes import watch
from os import path
import json
import requests
import math
from sqlalchemy.exc import InvalidRequestError,OperationalError
import pysnooper
import copy
import myapp
from myapp.utils.py.py_k8s import check_status_time,K8s
from myapp.utils.py.py_prometheus import Prometheus
from myapp.project import push_admin,push_message
from myapp import app, db, security_manager
from myapp.models.model_job import (
Pipeline,
Workflow,
Task
)
from myapp.utils.celery import session_scope
# Application configuration object; read by the rest of this module.
conf = app.config

# The cluster this watcher serves is selected by the ENVIRONMENT variable
# (compared case-insensitively by lower-casing it).
cluster = os.getenv('ENVIRONMENT', '').lower()
if not cluster:
    print('no cluster %s' % cluster)
    # sys.exit rather than the site-provided exit(): the latter is an
    # interactive-shell helper and is missing when `site` is not loaded.
    sys.exit(1)

# Mapping of cluster name -> per-cluster settings (must contain this cluster).
clusters = conf.get('CLUSTERS', {})
if not clusters or cluster not in clusters:
    print('no kubeconfig in cluster %s' % cluster)
    sys.exit(1)

kubeconfig = clusters[cluster].get('KUBECONFIG', '')
# NOTE(review): the instance is discarded, so K8s(...) is presumably called
# for its side effect of configuring the kubernetes client -- confirm in py_k8s.
K8s(kubeconfig)
from myapp.models.model_serving import InferenceService
from datetime import datetime, timezone, timedelta
# @pysnooper.snoop()
def listen_service():
    """Watch service pods forever and notify users when a container terminates.

    Streams pod events from the namespace configured as ``SERVICE_NAMESPACE``.
    When a ``MODIFIED`` event shows that the pod's first container terminated
    less than five seconds ago, and the pod's ``app`` label names an
    ``InferenceService`` row, a message is pushed to the service creator and
    to every user listed in the comma-separated ``ADMIN_USER`` setting.

    The function never returns. Each watch request lasts at most 60 seconds
    and is then re-opened; if the watch (or opening the DB session) raises,
    the error is logged and the loop backs off for five seconds before
    reconnecting. Errors while handling a single event are logged and do not
    interrupt the stream.
    """
    namespace = conf.get('SERVICE_NAMESPACE')
    # A missing ADMIN_USER setting must not stop the owner from being
    # notified, and empty entries are not valid recipients.
    admin_users = [name for name in (conf.get('ADMIN_USER') or '').split(',') if name]
    fresh_seconds = 5  # only events this recent count as "just happened"
    retry_seconds = 5
    watcher = watch.Watch()
    # No label_selector is passed: every pod in the namespace is watched.
    while True:
        try:
            print('begin listen')
            stream = watcher.stream(
                client.CoreV1Api().list_namespaced_pod,
                namespace=namespace,
                timeout_seconds=60,
            )
            for event in stream:
                hit = None
                try:
                    hit = _recently_terminated_service(event, fresh_seconds)
                except Exception:
                    logging.exception('failed to inspect pod event')
                if hit is None:
                    continue
                pod_name, service_name = hit
                # The DB session is opened only for events that can actually
                # produce a notification, not for every pod event.
                with session_scope(nullpool=True) as dbsession:
                    try:
                        _notify_owner(dbsession, pod_name, service_name, admin_users)
                    except Exception:
                        logging.exception('failed to notify about pod %s', pod_name)
        except Exception:
            # Back off only after a failure; a normal stream timeout
            # reconnects immediately so no events are skipped while sleeping.
            logging.exception('pod watch failed, retrying in %s seconds', retry_seconds)
            time.sleep(retry_seconds)


def _recently_terminated_service(event, fresh_seconds):
    """Return ``(pod_name, service_name)`` for a just-terminated pod, else ``None``.

    ``None`` is returned for any event that cannot lead to a notification:
    non-``MODIFIED`` events, pods without container statuses, containers that
    are not terminated (or terminated ``fresh_seconds`` or more ago), and
    pods without an ``app`` label.
    """
    if event['type'] != 'MODIFIED':  # a container restart surfaces as MODIFIED
        return None
    pod = event['object']
    statuses = pod.status.container_statuses if pod.status else None
    if not statuses:
        return None
    # Only the first container's state is inspected.
    state = statuses[0].state
    terminated = state.terminated if state else None
    if not (terminated and terminated.finished_at):
        return None
    # Compare against when the termination happened. Epoch seconds do not
    # depend on the timezone, so no conversion of finished_at is needed.
    finished_at = int(terminated.finished_at.timestamp())
    if time.time() - finished_at >= fresh_seconds:
        return None
    # Pods without any labels report labels as None rather than {}.
    service_name = (pod.metadata.labels or {}).get('app', '')
    if not service_name:
        return None
    return pod.metadata.name, service_name


def _notify_owner(dbsession, pod_name, service_name, admin_users):
    """Push a 'terminated' message to the service creator and the admins.

    Does nothing when no ``InferenceService`` row is named ``service_name``.
    """
    service = dbsession.query(InferenceService).filter_by(name=service_name).first()
    if not service:
        return
    owner = service.created_by.username
    message = "pod: %s, user: %s, status: %s" % (pod_name, owner, 'terminated')
    # Notifications for containers that restarted and are running again are
    # currently not sent; only terminations are reported.
    push_message([owner] + admin_users, message)


# Asynchronous IO cannot be used here because the watch stream blocks.
if __name__ == "__main__":
    listen_service()