add cube pushgateway

This commit is contained in:
pengluan 2022-12-06 10:48:09 +08:00
parent 51b31448d6
commit d48a83c4f9
15 changed files with 808 additions and 68 deletions

View File

@ -0,0 +1,31 @@
# docker build -t ccr.ccs.tencentyun.com/cube-studio/prometheus:pushgateway .
FROM ubuntu:18.04
RUN apt update && \
apt install -y python3.6-dev python3-pip curl iputils-ping net-tools python3-setuptools && \
apt install -y --force-yes --no-install-recommends locales ttf-wqy-microhei ttf-wqy-zenhei xfonts-wqy && \
locale-gen zh_CN && locale-gen zh_CN.utf8 && \
ln -s /usr/bin/python3 /usr/bin/python && \
ln -s /usr/bin/pip3 /usr/bin/pip && \
pip install simplejson requests uvloop asyncio paramiko prometheus_client aiohttp webhooks opsdroid aiohttp_cors && \
rm -rf /root/.cache && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* /usr/share/man /usr/share/doc /usr/share/doc-base
ENV LANG zh_CN.UTF-8
ENV LC_ALL zh_CN.UTF-8
ENV LANGUAGE zh_CN.UTF-8
COPY util /app/pushgateway/util
COPY test /app/pushgateway/test
COPY server.py /app/pushgateway/
EXPOSE 80
USER root
WORKDIR /app/pushgateway
CMD ["python","server.py"]

View File

@ -0,0 +1,72 @@
# pushgateway usage guide
## 1. Prometheus metric collection
POST body format:
```
type: 'prometheus'
metrics:
{
    'metric_name1': {
        'labels': ['method', 'clientip'],
        'describe': 'this is test',
        'exist_not_update_type': 'clear',
        'exist_update_type': 'update',
        'not_exist_update_type': 'add',
        'pull_finish_deal_type': 'clear',
        'data': [
            [['get', '192.168.11.127'], 4],
            [['get', '192.168.12.49'], 3],
            [['post', '192.168.11.127'], 5]
        ]
    },
    'metric_name2': {
    }
}
```
where:
exist_not_update_type (entries that already exist but are absent from this update), required
exist_update_type (entries that already exist and appear in this update), required
not_exist_update_type (entries that do not exist yet but appear in this update), required
pull_finish_deal_type (what to do with the data after it has been scraped), required
Each of these accepts one of the following values:
```
# update  overwrite the existing value
# clear   delete the entry
# keep    leave the entry unchanged
# add     add the posted value to the existing value
# reset   set the value to 0
```
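
As an illustration (not part of the server code), here is a minimal sketch of how the three `*_update_type` rules combine for one metric, assuming the stored data is the tuple-keyed dict shown in the next section:
```python
stored = {('get', '192.168.11.127'): 4, ('post', '192.168.11.127'): 5}  # current server state
posted = {('get', '192.168.11.127'): 2, ('get', '192.168.12.49'): 3}    # incoming update

# exist_update_type='add': posted values are added onto existing entries
stored[('get', '192.168.11.127')] += posted[('get', '192.168.11.127')]  # 4 -> 6
# not_exist_update_type='update': new label combinations are inserted as-is
stored[('get', '192.168.12.49')] = posted[('get', '192.168.12.49')]     # -> 3
# exist_not_update_type='clear': stored entries absent from the update are dropped
del stored[('post', '192.168.11.127')]
```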
Internal storage structure of the Python server:
```
{
    'metric_name1': {
        'labels': ['method', 'clientip'],
        'describe': 'this is test',
        'exist_not_update_type': 'clear',
        'exist_update_type': 'update',
        'not_exist_update_type': 'add',
        'pull_finish_deal_type': 'clear',
        'data': {
            ('get', '192.168.11.127'): 4,
            ('get', '192.168.12.49'): 3,
            ('post', '192.168.11.127'): 5
        }
    },
    'metric_name2': {
    }
}
```
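The server converts each posted `data` entry from `[label_values, value]` into this tuple-keyed form. A simplified excerpt of the logic in `util/prometheus_util.py`, where `metric_post` is one metric's posted dict:
```python
data_tuple = {}
for label_values, value in metric_post['data']:
    # a list of label values becomes a tuple so it can serve as a dict key
    data_tuple[tuple(label_values)] = value
```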
## 2. Alert push proxy
POST to the endpoint `/{client}/webhook`
Parameters:
sender_type: string. Push channel; `wechat` and `rtx_group` are currently supported
sender: string. The sender: TME_DataInfra, or the key of an enterprise WeChat group bot
username: string. Receiving users (comma-separated for multiple users); the RTX account for WeChat push, empty for enterprise WeChat group push
message: the string to push. If a `message` field is present, only it is pushed; otherwise all fields other than the ones above are JSON-serialized and pushed as the message
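
For example, to push a plain-text alert to a personal WeChat account through the proxy (the host and user below are placeholders):
```
curl -H 'Content-Type: application/json' -X POST \
     -d '{"message":"disk usage over 90%"}' \
     'http://<pushgateway-host>/myapp/webhook?username=someuser&sender_type=wechat'
```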

View File

@ -0,0 +1,18 @@
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    k8s-app: cloud-pushgateway # must be k8s-app, otherwise Prometheus cannot discover this resource
  name: cloud-pushgateway
  namespace: monitoring
spec:
  endpoints:
  - interval: 30s
    port: http # must be a string: the port name defined in the pod
  jobLabel: k8s-app
  namespaceSelector:
    matchNames:
    - monitoring
  selector:
    matchLabels:
      app: cloud-pushgateway-service # matches the Service

View File

@ -0,0 +1,74 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cloud-pushgateway-deployment
  namespace: monitoring
  labels:
    app: cloud-pushgateway-deployment
spec: # containers, storage, volumes and other parameters Kubernetes needs, e.g. whether to restart containers on failure; see the Kubernetes API for the full list of Pod properties
  selector:
    matchLabels:
      app: cloud-pushgateway-pod
  replicas: 1 # desired number of replicas; other properties, such as the selector for the Pods this Deployment manages, also live here
  template: # the template is the Pod definition
    metadata:
      labels:
        app: cloud-pushgateway-pod
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: monitoring
                operator: In
                values:
                - "true"
      volumes:
      - name: tz-config
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
      imagePullSecrets:
      - name: hubsecret
      containers:
      - name: cloud-pushgateway-container # container name
        image: ccr.ccs.tencentyun.com/cube-studio/prometheus:pushgateway
        command: ['python','server.py']
        # command: ['sleep','300000']
        workingDir: /app/pushgateway
        imagePullPolicy: Always
        env:
        - name: sender
          value: TME_DataInfra # development
        # - name: receiver
        #   value: pengluan
        ports: # ports the container listens on
        - containerPort: 80
        resources:
          limits:
            cpu: 1000m
            memory: 2000Mi
          requests:
            cpu: 10m
            memory: 100Mi
        readinessProbe:
          httpGet:
            path: "/"
            port: 80
          initialDelaySeconds: 10
          timeoutSeconds: 5
          periodSeconds: 30
        livenessProbe:
          httpGet:
            path: "/"
            port: 80
          initialDelaySeconds: 100
          timeoutSeconds: 5
          periodSeconds: 30
        volumeMounts:
        - name: tz-config
          mountPath: /etc/localtime

# curl -H 'Content-type: application/json' -X POST -d '{"message":"'"${MY_POD_NAME}"' juno restart"}' 'http://9.87.159.253/juno/webhook?username=pengluan&sender_type=wechat'

View File

@ -1,41 +0,0 @@
apiVersion: extensions/v1
kind: Deployment
metadata:
  labels:
    k8s-app: prometheus-pushgateway
  name: prometheus-pushgateway
  namespace: monitoring
spec:
  replicas: 1
  template:
    metadata:
      labels:
        k8s-app: prometheus-pushgateway
        component: "prometheus-pushgateway"
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: monitoring
                operator: In
                values:
                - "true"
      serviceAccountName: prometheus-pushgateway
      containers:
      - name: prometheus-pushgateway
        image: "prom/pushgateway:v0.5.2"
        imagePullPolicy: "IfNotPresent"
        args:
        ports:
        - containerPort: 9091
        readinessProbe:
          httpGet:
            path: /#/status
            port: 9091
          initialDelaySeconds: 10
          timeoutSeconds: 10
        resources:
          {}

View File

@ -1,20 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/probe: pushgateway
  labels:
    k8s-app: prometheus-pushgateway
  name: prometheus-pushgateway
  namespace: monitoring
spec:
  ports:
  - name: http
    port: 9091
    protocol: TCP
    targetPort: 9091
    # nodePort: 31014
  selector:
    k8s-app: prometheus-pushgateway
    component: "prometheus-pushgateway"
  # type: LoadBalancer # NodePort

View File

@ -1,7 +0,0 @@
apiVersion: v1
kind: ServiceAccount
metadata:
  labels:
    k8s-app: prometheus-pushgateway
  name: prometheus-pushgateway
  namespace: monitoring

View File

@ -0,0 +1,238 @@
# pip install webhooks
import sys
import os

dir_common = os.path.split(os.path.realpath(__file__))[0]
sys.path.append(dir_common)  # add this directory to sys.path so the util package can be imported

import logging
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
import copy
import asyncio
import datetime
import json
import requests
from aiohttp import web
import aiohttp_cors  # cross-origin request support
import prometheus_client
from prometheus_client import Counter, Gauge
from prometheus_client.core import CollectorRegistry
import socket
from webhooks import webhook
from webhooks.senders import targeted

from util.prometheus_util import *
from util.config import *

loop = asyncio.get_event_loop()  # global event loop
hostName = socket.gethostname()
routes = web.RouteTableDef()
promethus = Promethus()

Sender = os.getenv('sender', 'TME_DataInfra')
Receiver = os.getenv('receiver', '')
Sender_type = os.getenv('Sender_type', 'wechat')


def push_message(sender_type, **args):
    if sender_type == 'wechat':
        push_wechat(args['message'], args['sender'], args['receiver'])
    if sender_type == 'rtx_group':
        push_rtx_group(args['message'], args['sender'])


# WeChat official-account alert, pushed to individual users
def push_wechat(message, sender, receiver):
    if not sender or not receiver or not message:
        logging.info('missing sender %s, receiver %s, or message %s' % (sender, receiver, message))
        return
    if type(receiver) == str:
        receiver = receiver.split(",")
    receiver = [str(x).strip() for x in receiver if x]
    data = {
        "Sender": sender,
        "Rcptto": receiver,
        "isText": message
    }
    jsondata = json.dumps(data)
    logging.info('begin to send wechat %s' % jsondata)
    import urllib
    import urllib.parse
    import urllib.request
    values = urllib.parse.urlencode({"data": jsondata})
    resp = urllib.request.urlopen('http://api.weixin.oa.com/itilalarmcgi/sendmsg?%s' % values, timeout=10)
    logging.info('receive resp from wechat: %s' % resp.read().decode("unicode_escape"))


# enterprise WeChat group push: sender is the group bot key, message is the text to send
def push_rtx_group(message, sender):
    data = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    url = 'http://in.qyapi.weixin.qq.com/cgi-bin/webhook/send?key=%s' % sender
    logging.info('begin to send rtx group %s' % url)
    resp = requests.post(url, timeout=10, json=data)
    logging.info('receive resp from rtx: %s' % resp.content)


# receive pushed metric data
@routes.post('/metrics')
async def post_data(request):  # async handler, triggered as soon as a request arrives
    try:
        data = await request.json()  # wait until the POST body is fully received; data['key'] reads a field
    except Exception as e:
        logging.error("request body too large or cannot convert to json")
        return web.json_response(write_response(ERROR_FILE_LARGE, "request body too large or cannot convert to json", {}))
    logging.info('receive metrics data %s' % datetime.datetime.now())
    status = await promethus.label_data(data['metrics'])  # merge the posted data into the stored metrics
    logging.info('save metrics data finish %s, %s' % (datetime.datetime.now(), str(status)))
    header = {"Access-Control-Allow-Origin": "*", 'Access-Control-Allow-Methods': 'GET,POST'}
    if status:
        return web.json_response(write_response(0, "success", {}), headers=header)
    else:
        return web.json_response(write_response(1, "error", {}), headers=header)


# alert webhook proxy
@routes.post('/{client}/webhook')
async def client_webhook(request):
    logging.info('=================begin a webhook')
    global Receiver, Sender, Sender_type
    client = request.match_info['client']
    try:
        data = await request.json()  # data may be a dict or a list
        args = dict(request.query)
        logging.info('src data:%s' % data)
        logging.info('src args:%s' % args)
        if type(data) == list:
            data = data[0]
        if args:
            for key in args:
                data[key] = args[key]
        for key in list(data.keys()):
            if key.lower() == 'username':
                Receiver = data[key]
                del data[key]
            if key.lower() == 'sender_type':
                Sender_type = data[key]
                del data[key]
            if key.lower() == 'sender':
                Sender = data[key]
                del data[key]
        if 'message' in data:
            message = data['message']
        else:
            message = json.dumps(data, indent=4, ensure_ascii=False).encode('utf8').decode('utf8')
        try:
            if client == 'grafana':
                data = {
                    'title': data['title'],
                    'message': data['message'],
                    'state': data['state'],
                }
                message = json.dumps(data, indent=4, ensure_ascii=False).encode('utf8').decode('utf8')
            if client == 'rancher':
                message = json.dumps(data['labels'], indent=4)
            if client == 'alertmanager':
                data_label = copy.deepcopy(data['alerts'][0]['labels'])
                # drop the noisy infrastructure labels before pushing
                for label in ('job', 'service', 'prometheus', 'endpoint', 'pod', 'instance'):
                    if label in data_label:
                        del data_label[label]
                data_push = {
                    'labels': data_label,
                    # 'annotations': data['alerts'][0]['annotations'],
                    'status': data['alerts'][0]['status'],
                }
                message = json.dumps(data_push, indent=4, ensure_ascii=False).encode('utf8').decode('utf8')
            if client == 'superset':
                message = data['message']
        except Exception as e:
            logging.error(e)
        logging.info('%s %s %s %s' % (Sender_type, Sender, Receiver, message))
        push_message(sender_type=Sender_type, message=message + "\n from %s" % client, sender=Sender, receiver=Receiver)
    except Exception as e:
        logging.info(e)
        return web.json_response(write_response(ERROR_FILE_LARGE, "can not access json", {}))
    logging.info('finish deal webhook data %s' % datetime.datetime.now())
    return web.json_response(write_response(0, "success", {}))


# custom webhook for juno pod-stop events; the route has no {client} placeholder, so the client name is fixed
@routes.post('/junopodstop/customwebhook')
async def junopodstop_webhook(request):
    logging.info('=================begin a customwebhook')
    global Receiver, Sender, Sender_type
    client = 'junopodstop'
    try:
        data = await request.json()  # data may be a dict or a list
        args = dict(request.query)
        logging.info('src data:%s' % data)
        logging.info('src args:%s' % args)
        username = data['username']
        message = data.get('message', '')
        logging.info('%s %s %s %s' % (Sender_type, Sender, username, message))
        push_message(sender_type=Sender_type, message=message + "\n from %s" % client, sender=Sender, receiver=username)
    except Exception as e:
        logging.info(e)
        return web.json_response(write_response(ERROR_FILE_LARGE, "can not access json", {}))
    logging.info('finish deal webhook data %s' % datetime.datetime.now())
    return web.json_response(write_response(0, "success", {}))


# health check
@routes.get('/')
async def default(request):
    return web.Response(text="OK")


# expose metrics for Prometheus to scrape
@routes.get('/metrics')
async def get_data(request):
    data = await promethus.get_metrics_prometheus()
    return web.Response(body=data, content_type="text/plain")  # return the current metric values


if __name__ == '__main__':
    # init_logger(module_name="face_det")  # file-based logging config
    init_console_logger()
    app = web.Application(client_max_size=int(LOCAL_SERVER_SIZE) * 1024 ** 2, debug=True)  # create the app; max request body size is LOCAL_SERVER_SIZE MB
    app.add_routes(routes)  # register the route table
    # enable cross-origin requests on every route
    cors = aiohttp_cors.setup(app, defaults={
        '*': aiohttp_cors.ResourceOptions(
            allow_methods='*',
            allow_credentials=True,
            allow_headers='*',
            expose_headers='*'
        )
    })
    for route in list(app.router.routes()):
        cors.add(route)
    logging.info('server start, port is %s, datetime is %s' % (str(LOCAL_SERVER_PORT), str(datetime.datetime.now())))
    web.run_app(app, host=LOCAL_SERVER_IP, port=LOCAL_SERVER_PORT)  # start the app
    logging.info('server close %s' % datetime.datetime.now())

View File

@ -0,0 +1,18 @@
apiVersion: v1
kind: Service # resource kinds include Pod, Deployment, Job, Ingress, Service
metadata: # application metadata such as name, namespace, and labels
  name: cloud-pushgateway-service # the name may only contain lowercase letters and '-'
  namespace: monitoring
  labels:
    app: cloud-pushgateway-service
spec:
  # type: LoadBalancer # NodePort: every node listens on the same port and routes to the pod; LoadBalancer provides an external IP that proxies the pods below, which may run on different nodes
  ports:
  - port: 80
    targetPort: 80 # container port
    protocol: TCP
    name: http
  selector:
    app: cloud-pushgateway-pod
  # externalIPs:
  # - 9.66.37.181

View File

@ -0,0 +1,8 @@
# bytes_data = open('feature',mode='rb').read()
# list_data= list(bytearray(bytes_data))
# print(len(list_data),max(list_data))
# print(list_data)

View File

@ -0,0 +1,54 @@
# coding=utf-8
import datetime
import json
import random
import time

import requests

if __name__ == "__main__":
    # url = "http://192.168.11.127:30165/metrics"
    url = "http://127.0.0.1:8000/metrics"
    print(datetime.datetime.now())
    print('==============')
    for i in range(1):
        data = {
            'metrics': {
                'request_total': {
                    'labels': ['method', 'client_ip'],
                    'describe': 'test',
                    'exist_not_update_type': 'clear',
                    'exist_update_type': 'add',
                    'not_exist_update_type': "update",
                    'pull_finish_deal_type': "clear",
                    'data': [
                        [['get', '192.168.11.127'], random.randint(1, 12)],
                        [['post', '192.168.11.11'], random.randint(1, 12)],
                        [['get', '192.168.11.23'], random.randint(1, 12)]
                    ]
                }
            }
        }
        begin_time = time.time()
        r = requests.post(url, json=data)
        result = json.loads(r.text)
        print(result)
        end_time = time.time()
        print(end_time - begin_time)
        print('=============')

View File

@ -0,0 +1,108 @@
import logging
import logging.handlers
import os

# error codes
ERROR_FILE_LARGE = 1

LOCAL_SERVER_IP = "0.0.0.0"  # server bind address
LOCAL_SERVER_PORT = 80  # server port
LOCAL_SERVER_KEY = '123456'  # constant secret key sent by Huawei
LOCAL_SERVER_SIZE = 5  # request body size limit, 5 MB


# build the response dict from the error code, message, and result
def write_response(error, message, result):
    response = {
        'error': error,  # error number, 0 on success
        'message': message,  # error or success description, a string
        'result': result  # result on success, a dict
    }
    return response


def init_logger(parent_path='/intellif/log/', sub_path='', module_name=''):
    """
    usage: output to logfile and console simultaneously
    """
    print('init_logger')
    path = parent_path + sub_path
    if not os.path.exists(path):
        try:
            os.makedirs(path)
        except Exception as e:
            print('please execute with root privilege, makedirs error %s' % e)
    # create a logger
    logger = logging.getLogger(module_name)
    logger.setLevel(logging.INFO)  # minimum level that will be emitted
    # create log file name
    if module_name == '':
        log_file = path + 'server.log'
    else:
        log_file = path + module_name + '.log'
    try:
        # define log format
        # formatter = logging.Formatter(
        #     '%(asctime)s %(filename)s[line:%(lineno)d] ' +
        #     '%(levelname)s %(message)s', '%Y-%m-%d %H:%M:%S')
        formatter = logging.Formatter(
            '%(asctime)s %(levelname)s %(message)s',
            '%H:%M:%S')
        # a handler that rotates the log file every hour:
        # filename is the prefix of the log file name
        # backupCount is the number of rotated files to keep; the default 0 never deletes; with 10, the oldest file is removed once more than 10 exist
        # interval is how many 'when' units to wait before rotating; the new file is named filename + suffix, and a name collision overwrites the old file, so the suffix must not repeat within one rotation period
        hdlr = logging.handlers.TimedRotatingFileHandler(filename=log_file, when='H', interval=1, backupCount=48)
        hdlr.setLevel(logging.INFO)  # level for this handler
        hdlr.setFormatter(formatter)
        # the suffix uses strftime format
        hdlr.suffix = "%Y-%m-%d_%H-%M-%S.log"
        logger.addHandler(hdlr)
        # create a streamhandler to output to console
        # ch = logging.StreamHandler()
        # ch.setLevel(logging.INFO)
        # ch.setFormatter(formatter)
        # logger.addHandler(ch)
        # create a filehandler to record log to file
        # fh = logging.FileHandler(log_file)
        # fh.setLevel(logging.INFO)
        # fh.setFormatter(formatter)
        # logger.addHandler(fh)
        logger.info(logger.name)
    except Exception as e:
        print(e)
        logging.info(
            'please execute with root privilege, init logger error %s' % e)


def init_console_logger():
    """
    usage: only output to console
    """
    # create a logger
    print('init_console_logger')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)  # minimum level that will be emitted
    # create a streamhandler to output to console
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)  # level for this handler
    # define log format
    # formatter = logging.Formatter(
    #     '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    #     '%Y-%m-%d %H:%M:%S')
    formatter = logging.Formatter(
        '%(asctime)s %(levelname)s %(message)s',
        '%H:%M:%S')
    ch.setFormatter(formatter)
    # add handler
    logger.addHandler(ch)

View File

@ -0,0 +1,187 @@
# coding=utf-8
import base64
import asyncio
import json, datetime, time
import logging
import os
import io
import prometheus_client
from prometheus_client import Counter, Gauge
from prometheus_client.core import CollectorRegistry


class Promethus():
    def __init__(self):
        self.loop = asyncio.get_event_loop()  # global event loop
        self.registry = CollectorRegistry()  # container for all metrics, maintained as Name-MetricKey-Value
        self.all_metric = {}

    # the payload may contain several metrics, passed as a dict
    async def label_data(self, json_data):
        try:
            for metric_name in json_data:
                metric_post = json_data[metric_name]
                labels = metric_post.get('labels', [])
                describe = metric_post.get('describe', '')
                # update  overwrite the existing value
                # clear   delete the entry
                # keep    leave the entry unchanged
                # add     add the posted value to the existing value
                # reset   set the value to 0
                exist_not_update_type = metric_post.get('exist_not_update_type', None)
                exist_update_type = metric_post.get('exist_update_type', None)
                not_exist_update_type = metric_post.get('not_exist_update_type', None)
                pull_finish_deal_type = metric_post.get('pull_finish_deal_type', None)  # what to do after the data has been scraped
                # create the metric
                if metric_name not in self.all_metric:
                    if labels:
                        await self.create_metric(metric_name, labels, describe)  # the labels are fixed; only whether each label combination keeps its metric_value can change
                    else:
                        continue
                # update the metric attributes
                if exist_not_update_type:
                    self.all_metric[metric_name]['exist_not_update_type'] = exist_not_update_type
                if exist_update_type:
                    self.all_metric[metric_name]['exist_update_type'] = exist_update_type
                if not_exist_update_type:
                    self.all_metric[metric_name]['not_exist_update_type'] = not_exist_update_type
                if pull_finish_deal_type:
                    self.all_metric[metric_name]['pull_finish_deal_type'] = pull_finish_deal_type
                exist_not_update_type = self.all_metric[metric_name].get('exist_not_update_type')
                exist_update_type = self.all_metric[metric_name].get('exist_update_type')
                not_exist_update_type = self.all_metric[metric_name].get('not_exist_update_type')
                # reshape the posted data: [label_values, value] pairs -> tuple-keyed dict
                data_tuple = {}
                if 'data' in metric_post:
                    for one_data in metric_post['data']:
                        attr, value = one_data
                        attr = tuple(attr)
                        data_tuple[attr] = value
                # apply the update rules
                # entries that exist but are not in this update; the default is to leave them unchanged
                if exist_not_update_type == 'clear':
                    for attr in list(self.all_metric[metric_name]['data'].keys()):
                        if attr not in data_tuple:
                            del self.all_metric[metric_name]['data'][attr]
                elif exist_not_update_type == 'reset':
                    for attr in self.all_metric[metric_name]['data']:
                        if attr not in data_tuple:
                            self.all_metric[metric_name]['data'][attr] = 0
                # entries that exist and are in this update; the default is to leave them unchanged
                if exist_update_type == 'update':
                    for attr in data_tuple:
                        if attr in self.all_metric[metric_name]['data']:
                            self.all_metric[metric_name]['data'][attr] = data_tuple[attr]
                elif exist_update_type == 'add':
                    for attr in data_tuple:
                        if attr in self.all_metric[metric_name]['data']:
                            self.all_metric[metric_name]['data'][attr] += data_tuple[attr]
                elif exist_update_type == 'clear':
                    for attr in data_tuple:
                        if attr in self.all_metric[metric_name]['data']:
                            del self.all_metric[metric_name]['data'][attr]
                elif exist_update_type == 'reset':
                    for attr in data_tuple:
                        if attr in self.all_metric[metric_name]['data']:
                            self.all_metric[metric_name]['data'][attr] = 0
                # entries that do not exist yet but are in this update; the default is to ignore them
                if not_exist_update_type == 'reset':
                    for attr in data_tuple:
                        if attr not in self.all_metric[metric_name]['data']:
                            self.all_metric[metric_name]['data'][attr] = 0
                elif not_exist_update_type == 'add' or not_exist_update_type == 'update':
                    for attr in data_tuple:
                        if attr not in self.all_metric[metric_name]['data']:
                            self.all_metric[metric_name]['data'][attr] = data_tuple[attr]
            return True
        except Exception as e:
            logging.error(e)
            return False

    # delete a metric
    async def delete(self, metric_name):
        del self.all_metric[metric_name]

    # render one metric in Prometheus exposition format
    async def get_metric_prometheus(self, metric_name):
        if metric_name in self.all_metric:
            # build Prometheus-format data from the stored values; use a fresh registry so repeated calls do not collide in the global one
            registry = CollectorRegistry()
            metric = Gauge(metric_name, self.all_metric[metric_name]['describe'], self.all_metric[metric_name]['labels'], registry=registry)
            for attr in self.all_metric[metric_name]['data']:
                metric.labels(*attr).set(self.all_metric[metric_name]['data'][attr])
            prometheus_data = prometheus_client.generate_latest(registry)
            # post-scrape handling
            if self.all_metric[metric_name].get('pull_finish_deal_type') == 'clear':
                self.all_metric[metric_name]['data'] = {}
            elif self.all_metric[metric_name].get('pull_finish_deal_type') == 'reset':
                for attr in self.all_metric[metric_name]['data']:
                    self.all_metric[metric_name]['data'][attr] = 0
            return prometheus_data
        return None

    # render all metrics in Prometheus exposition format
    async def get_metrics_prometheus(self, onlyread=False):
        self.registry = CollectorRegistry()
        for metric_name in self.all_metric:
            # a Gauge registers in the global registry by default, and duplicate metric names are not allowed there, so register in a fresh registry
            metric = Gauge(name=metric_name, documentation=self.all_metric[metric_name]['describe'], labelnames=self.all_metric[metric_name]['labels'], registry=self.registry)
            for attr in self.all_metric[metric_name]['data']:
                metric.labels(*attr).set(self.all_metric[metric_name]['data'][attr])
            if not onlyread:
                # post-scrape handling
                if self.all_metric[metric_name].get('pull_finish_deal_type') == 'clear':
                    self.all_metric[metric_name]['data'] = {}
                elif self.all_metric[metric_name].get('pull_finish_deal_type') == 'reset':
                    for attr in self.all_metric[metric_name]['data']:
                        self.all_metric[metric_name]['data'][attr] = 0
        return prometheus_client.generate_latest(self.registry)

    # read one metric's info and data
    async def get_metric(self, metric_name):
        return self.all_metric[metric_name]

    # get all metric info
    async def get_metrics(self):
        return self.all_metric

    # labels are the label names that can be attached to this metric's data
    async def create_metric(self, metric_name, labels, describe=''):
        logging.info('create metric %s' % metric_name)
        if metric_name in self.all_metric:
            del self.all_metric[metric_name]
        metric = {
            'labels': labels,
            'describe': describe,
            'data': {},
        }
        self.all_metric[metric_name] = metric