- import concurrent.futures
- import json
- import os
- import queue
- import random
- import socket
- import threading
- import time
- from collections import Counter
- from typing import Any
- from influxdb import InfluxDBClient
- from feapder import setting
- from feapder.utils.log import log
- from feapder.utils.tools import aio_wrap, ensure_float, ensure_int
- _inited_pid = None
- # this thread should stop running in the forked process
- _executor = concurrent.futures.ThreadPoolExecutor(
- max_workers=1, thread_name_prefix="metrics"
- )
- class MetricsEmitter:
- def __init__(
- self,
- influxdb,
- *,
- batch_size=10,
- max_timer_seq=0,
- emit_interval=10,
- retention_policy=None,
- ratio=1.0,
- debug=False,
- add_hostname=False,
- max_points=10240,
- default_tags=None,
- time_precision="s",
- ):
- """
- Args:
- influxdb: influxdb instance
- batch_size: batch size for each write to InfluxDB
- max_timer_seq: maximum number of timer-type points collected per interval; 0 means unlimited
- emit_interval: maximum time to wait before pending points must be flushed
- retention_policy: retention policy to write to
- ratio: sampling ratio for store and timer points, e.g. 0.1 keeps only 10% of the points
- debug: whether to log debug output
- add_hostname: whether to add the hostname as a tag
- max_points: maximum number of points buffered locally
- default_tags: tags added to every point
- time_precision: write precision, defaults to "s" (seconds)
- """
- self.pending_points = queue.Queue()
- self.batch_size = batch_size
- self.influxdb: InfluxDBClient = influxdb
- self.tagkv = {}
- self.max_timer_seq = max_timer_seq
- self.lock = threading.Lock()
- self.hostname = socket.gethostname()
- self.last_emit_ts = time.time()  # timestamp of the last flush
- self.emit_interval = emit_interval  # flush interval
- self.max_points = max_points
- self.retention_policy = retention_policy  # custom retention policy supported
- self.debug = debug
- self.add_hostname = add_hostname
- self.ratio = ratio
- self.default_tags = default_tags or {}
- self.time_precision = time_precision
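- # Usage sketch (comments only; illustration, not part of the original module): wiring an emitter
- # directly to an InfluxDBClient. Host and database names below are placeholders.
- #
- #   client = InfluxDBClient(host="localhost", port=8086, database="metrics_db")
- #   emitter = MetricsEmitter(client, emit_interval=30, add_hostname=True)
- #   emitter.emit_counter("spider", key="requests", count=1)
- #   emitter.flush()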
- def define_tagkv(self, tagk, tagvs):
- self.tagkv[tagk] = set(tagvs)
- def _point_tagset(self, p):
- return f"{p['measurement']}-{sorted(p['tags'].items())}-{p['time']}"
- def _accumulate_points(self, points):
- """
- 对于处于同一个 key 的点做聚合
- - 对于 counter 类型,同一个 key 的值(_count)可以累加
- - 对于 store 类型,不做任何操作,influxdb 会自行覆盖
- - 对于 timer 类型,通过添加一个 _seq 值来区分每个不同的点
- """
- counters = {} # 临时保留 counter 类型的值
- timer_seqs = Counter() # 记录不同 key 的 timer 序列号
- new_points = []
- for point in points:
- point_type = point["tags"].get("_type", None)
- tagset = self._point_tagset(point)
- # counter points are always aggregated, never dropped
- if point_type == "counter":
- if tagset not in counters:
- counters[tagset] = point
- else:
- counters[tagset]["fields"]["_count"] += point["fields"]["_count"]
- elif point_type == "timer":
- if self.max_timer_seq and timer_seqs[tagset] > self.max_timer_seq:
- continue
- # roll the dice: keep the point only if it is lucky enough (sampling)
- if self.ratio < 1.0 and random.random() > self.ratio:
- continue
- # add a _seq tag so that otherwise identical points stay distinct
- point["tags"]["_seq"] = timer_seqs[tagset]
- timer_seqs[tagset] += 1
- new_points.append(point)
- else:
- if self.ratio < 1.0 and random.random() > self.ratio:
- continue
- new_points.append(point)
- # append the aggregated counter points
- new_points.extend(counters.values())
- return new_points
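- # Worked example (comments only; illustration, not part of the original module). Passing None as
- # the influxdb client is fine here because _accumulate_points never touches it:
- #
- #   p = lambda n: {"measurement": "m", "tags": {"_type": "counter"}, "fields": {"_count": n}, "time": 1}
- #   MetricsEmitter(None)._accumulate_points([p(1), p(2), p(4)])
- #   # -> a single point whose fields are {"_count": 7}; timer points with the same tagset would
- #   #    instead be kept separate via the added "_seq" tag (0, 1, 2, ...)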
- def _get_ready_emit(self, force=False):
- """
- 把当前 pending 的值做聚合并返回
- """
- if self.debug:
- log.info("got %s raw points", self.pending_points.qsize())
- # read points from the pending queue, with an upper bound so that a steady stream of new points cannot keep this loop fetching forever
- points = []
- while len(points) < self.max_points or force:
- try:
- points.append(self.pending_points.get_nowait())
- except queue.Empty:
- break
- # aggregate the points
- points = self._accumulate_points(points)
- if self.debug:
- log.info("got %s point", len(points))
- log.info(json.dumps(points, indent=4))
- return points
- def emit(self, point=None, force=False):
- """
- 1. 添加新点到 pending
- 2. 如果符合条件,尝试聚合并打点
- 3. 更新打点时间
- :param point:
- :param force: 强制提交所有点 默认False
- :return:
- """
- if point:
- self.pending_points.put(point)
- # decide whether a flush is needed: 1. queue size  2. elapsed interval  3. forced flush
- if not (
- force
- or self.pending_points.qsize() >= self.max_points # noqa: W503
- or time.time() - self.last_emit_ts > self.emit_interval # noqa: W503
- ):
- return
- # a flush is needed: read the points that are ready, ensuring only one thread compacts them
- with self.lock:
- points = self._get_ready_emit(force=force)
- if not points:
- return
- try:
- self.influxdb.write_points(
- points,
- batch_size=self.batch_size,
- time_precision=self.time_precision,
- retention_policy=self.retention_policy,
- )
- except Exception:
- log.exception("error writing points")
- self.last_emit_ts = time.time()
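- # Flush-trigger sketch (comments only; not part of the original module). Given an emitter built
- # with max_points=3 and a long emit_interval, emit() only writes once a condition above is met:
- #
- #   emitter.emit(emitter.get_counter_point("m", key="a"))  # buffered
- #   emitter.emit(emitter.get_counter_point("m", key="a"))  # buffered
- #   emitter.emit(emitter.get_counter_point("m", key="a"))  # qsize >= max_points -> written
- #   emitter.emit(force=True)                               # force-flush anything still pending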
- def flush(self):
- if self.debug:
- log.info("start draining points %s", self.pending_points.qsize())
- self.emit(force=True)
- def close(self):
- self.flush()
- try:
- self.influxdb.close()
- except Exception as e:
- log.exception(e)
- def make_point(self, measurement, tags: dict, fields: dict, timestamp=None):
- """
- 默认的时间戳是"秒"级别的
- """
- assert measurement, "measurement can't be null"
- tags = tags.copy() if tags else {}
- tags.update(self.default_tags)
- fields = fields.copy() if fields else {}
- if timestamp is None:
- timestamp = int(time.time())
- # honor a custom hostname: only fill in the local hostname when the tag is not already set
- if self.add_hostname and "hostname" not in tags:
- tags["hostname"] = self.hostname
- point = dict(measurement=measurement, tags=tags, fields=fields, time=timestamp)
- if self.tagkv:
- for tagk, tagv in tags.items():
- # skip tags never registered via define_tagkv; reject registered tags with unexpected values
- if tagk in self.tagkv and tagv not in self.tagkv[tagk]:
- raise ValueError(f"tag value = {tagv} not in {self.tagkv[tagk]}")
- return point
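- # Tag-validation sketch (comments only; not part of the original module). define_tagkv()
- # restricts the allowed values of a tag, and make_point() rejects anything outside that set:
- #
- #   emitter.define_tagkv("spider", ["news", "blog"])
- #   emitter.make_point("requests", {"spider": "news"}, {"_value": 1})   # accepted
- #   emitter.make_point("requests", {"spider": "forum"}, {"_value": 1})  # raises ValueError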
- def get_counter_point(
- self,
- measurement: str,
- key: str = None,
- count: int = 1,
- tags: dict = None,
- timestamp: int = None,
- ):
- """
- counter 不能被覆盖
- """
- tags = tags.copy() if tags else {}
- if key is not None:
- tags["_key"] = key
- tags["_type"] = "counter"
- count = ensure_int(count)
- fields = dict(_count=count)
- point = self.make_point(measurement, tags, fields, timestamp=timestamp)
- return point
- def get_store_point(
- self,
- measurement: str,
- key: str = None,
- value: Any = 0,
- tags: dict = None,
- timestamp=None,
- ):
- tags = tags.copy() if tags else {}
- if key is not None:
- tags["_key"] = key
- tags["_type"] = "store"
- fields = dict(_value=value)
- point = self.make_point(measurement, tags, fields, timestamp=timestamp)
- return point
- def get_timer_point(
- self,
- measurement: str,
- key: str = None,
- duration: float = 0,
- tags: dict = None,
- timestamp=None,
- ):
- tags = tags.copy() if tags else {}
- if key is not None:
- tags["_key"] = key
- tags["_type"] = "timer"
- fields = dict(_duration=ensure_float(duration))
- point = self.make_point(measurement, tags, fields, timestamp=timestamp)
- return point
- def emit_any(self, *args, **kwargs):
- point = self.make_point(*args, **kwargs)
- self.emit(point)
- def emit_counter(self, *args, **kwargs):
- point = self.get_counter_point(*args, **kwargs)
- self.emit(point)
- def emit_store(self, *args, **kwargs):
- point = self.get_store_point(*args, **kwargs)
- self.emit(point)
- def emit_timer(self, *args, **kwargs):
- point = self.get_timer_point(*args, **kwargs)
- self.emit(point)
- _emitter: MetricsEmitter = None
- _measurement: str = None
- def init(
- *,
- influxdb_host=None,
- influxdb_port=None,
- influxdb_udp_port=None,
- influxdb_database=None,
- influxdb_user=None,
- influxdb_password=None,
- influxdb_measurement=None,
- retention_policy=None,
- retention_policy_duration="180d",
- emit_interval=60,
- batch_size=10,
- debug=False,
- use_udp=False,
- timeout=10,
- time_precision="s",
- **kwargs,
- ):
- """
- 打点监控初始化
- Args:
- influxdb_host:
- influxdb_port:
- influxdb_udp_port:
- influxdb_database:
- influxdb_user:
- influxdb_password:
- influxdb_measurement: 存储的表,也可以在打点的时候指定
- retention_policy: 保留策略
- retention_policy_duration: 保留策略过期时间
- emit_interval: 打点最大间隔
- batch_size: 打点的批次大小
- debug: 是否开启调试
- use_udp: 是否使用udp协议打点
- timeout: 与influxdb建立连接时的超时时间
- time_precision: 打点精度 默认秒
- **kwargs: 可传递MetricsEmitter类的参数
- Returns:
- """
- global _inited_pid, _emitter, _measurement
- if _inited_pid == os.getpid():
- return
- influxdb_host = influxdb_host or setting.INFLUXDB_HOST
- influxdb_port = influxdb_port or setting.INFLUXDB_PORT
- influxdb_udp_port = influxdb_udp_port or setting.INFLUXDB_UDP_PORT
- influxdb_database = influxdb_database or setting.INFLUXDB_DATABASE
- influxdb_user = influxdb_user or setting.INFLUXDB_USER
- influxdb_password = influxdb_password or setting.INFLUXDB_PASSWORD
- _measurement = influxdb_measurement or setting.INFLUXDB_MEASUREMENT
- retention_policy = (
- retention_policy or f"{influxdb_database}_{retention_policy_duration}"
- )
- if not all(
- [
- influxdb_host,
- influxdb_port,
- influxdb_udp_port,
- influxdb_database,
- influxdb_user,
- influxdb_password,
- ]
- ):
- return
- influxdb_client = InfluxDBClient(
- host=influxdb_host,
- port=influxdb_port,
- udp_port=influxdb_udp_port,
- database=influxdb_database,
- use_udp=use_udp,
- timeout=timeout,
- username=influxdb_user,
- password=influxdb_password,
- )
- # create the database and retention policy
- if influxdb_database:
- try:
- influxdb_client.create_database(influxdb_database)
- influxdb_client.create_retention_policy(
- retention_policy,
- retention_policy_duration,
- replication="1",
- default=True,
- )
- except Exception as e:
- log.error("metrics init falied: {}".format(e))
- return
- _emitter = MetricsEmitter(
- influxdb_client,
- debug=debug,
- batch_size=batch_size,
- time_precision=time_precision,
- retention_policy=retention_policy,
- emit_interval=emit_interval,
- **kwargs,
- )
- _inited_pid = os.getpid()
- log.info("metrics init successfully")
- def emit_any(
- tags: dict,
- fields: dict,
- *,
- classify: str = "",
- measurement: str = None,
- timestamp=None,
- ):
- """
- 原生的打点,不进行额外的处理
- Args:
- tags: influxdb的tag的字段和值
- fields: influxdb的field的字段和值
- classify: 点的类别
- measurement: 存储的表
- timestamp: 点的时间搓,默认为当前时间
- Returns:
- """
- if not _emitter:
- return
- tags = tags or {}
- tags["_classify"] = classify
- measurement = measurement or _measurement
- _emitter.emit_any(measurement, tags, fields, timestamp)
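- # Example (comments only): writing an arbitrary point once init() has succeeded, continuing the
- # `metrics` import assumed in the sketch under init(); tag/field names are placeholders.
- #
- #   metrics.emit_any({"region": "cn"}, {"qps": 3.2}, classify="gateway", measurement="runtime")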
- def emit_counter(
- key: str = None,
- count: int = 1,
- *,
- classify: str = "",
- tags: dict = None,
- measurement: str = None,
- timestamp: int = None,
- ):
- """
- 聚合打点,即会将一段时间内的点求和,然后打一个点数和
- Args:
- key: 与点绑定的key值
- count: 点数
- classify: 点的类别
- tags: influxdb的tag的字段和值
- measurement: 存储的表
- timestamp: 点的时间搓,默认为当前时间
- Returns:
- """
- if not _emitter:
- return
- tags = tags or {}
- tags["_classify"] = classify
- measurement = measurement or _measurement
- _emitter.emit_counter(measurement, key, count, tags, timestamp)
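- # Example (comments only): counting successful requests; counter points emitted within one
- # emit_interval are summed into a single total before being written. Names are placeholders.
- #
- #   metrics.emit_counter("request_ok", count=1, classify="spider", tags={"site": "example"})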
- def emit_timer(
- key: str = None,
- duration: float = 0,
- *,
- classify: str = "",
- tags: dict = None,
- measurement: str = None,
- timestamp=None,
- ):
- """
- 时间打点,用于监控程序的运行时长等,每个duration一个点,不会被覆盖
- Args:
- key: 与点绑定的key值
- duration: 时长
- classify: 点的类别
- tags: influxdb的tag的字段和值
- measurement: 存储的表
- timestamp: 点的时间搓,默认为当前时间
- Returns:
- """
- if not _emitter:
- return
- tags = tags or {}
- tags["_classify"] = classify
- measurement = measurement or _measurement
- _emitter.emit_timer(measurement, key, duration, tags, timestamp)
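- # Example (comments only): timing a unit of work and reporting the duration in seconds;
- # do_work() is a placeholder for whatever is being measured.
- #
- #   start = time.time()
- #   do_work()
- #   metrics.emit_timer("parse_page", duration=time.time() - start, classify="spider")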
- def emit_store(
- key: str = None,
- value: Any = 0,
- *,
- classify: str = "",
- tags: dict = None,
- measurement: str = None,
- timestamp=None,
- ):
- """
- 直接打点,不进行额外的处理
- Args:
- key: 与点绑定的key值
- value: 点的值
- classify: 点的类别
- tags: influxdb的tag的字段和值
- measurement: 存储的表
- timestamp: 点的时间搓,默认为当前时间
- Returns:
- """
- if not _emitter:
- return
- tags = tags or {}
- tags["_classify"] = classify
- measurement = measurement or _measurement
- _emitter.emit_store(measurement, key, value, tags, timestamp)
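- # Example (comments only): storing a gauge-style value such as a queue size; a later point with
- # the same key simply overwrites the earlier one in InfluxDB. Names are placeholders.
- #
- #   metrics.emit_store("pending_tasks", value=42, classify="scheduler")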
- def flush():
- """
- 强刷点到influxdb
- Returns:
- """
- if not _emitter:
- return
- _emitter.flush()
- def close():
- """
- 关闭
- Returns:
- """
- if not _emitter:
- return
- _emitter.close()
- # coroutine (async) versions of the emit helpers
- aemit_counter = aio_wrap(executor=_executor)(emit_counter)
- aemit_store = aio_wrap(executor=_executor)(emit_store)
- aemit_timer = aio_wrap(executor=_executor)(emit_timer)
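- # Async usage sketch (comments only; not part of the original module): the aemit_* wrappers run
- # the blocking emit calls on the shared single-thread executor so they can be awaited.
- #
- #   async def report():
- #       await aemit_counter("request_ok", count=1, classify="spider")
- #       await aemit_timer("parse_page", duration=0.12)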