# metrics.py
import concurrent.futures
import json
import os
import queue
import random
import socket
import threading
import time
from collections import Counter
from typing import Any

from influxdb import InfluxDBClient

from feapder import setting
from feapder.utils.log import log
from feapder.utils.tools import aio_wrap, ensure_float, ensure_int

_inited_pid = None
# this thread should stop running in the forked process
_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="metrics"
)


class MetricsEmitter:
    def __init__(
        self,
        influxdb,
        *,
        batch_size=10,
        max_timer_seq=0,
        emit_interval=10,
        retention_policy=None,
        ratio=1.0,
        debug=False,
        add_hostname=False,
        max_points=10240,
        default_tags=None,
        time_precision="s",
    ):
        """
        Args:
            influxdb: influxdb instance
            batch_size: batch size for writing points
            max_timer_seq: maximum number of timer points collected per interval; 0 means unlimited
            emit_interval: maximum time to wait before pending points must be written
            retention_policy: retention policy to write points under
            ratio: sampling ratio for store and timer points, e.g. 0.1 keeps only 10% of them
            debug: whether to log debug output
            add_hostname: whether to add the hostname as a tag
            max_points: maximum number of points buffered locally
            time_precision: time precision for writes, defaults to "s" (seconds)
        """
        self.pending_points = queue.Queue()
        self.batch_size = batch_size
        self.influxdb: InfluxDBClient = influxdb
        self.tagkv = {}
        self.max_timer_seq = max_timer_seq
        self.lock = threading.Lock()
        self.hostname = socket.gethostname()
        self.last_emit_ts = time.time()  # time of the last write
        self.emit_interval = emit_interval  # write interval
        self.max_points = max_points
        self.retention_policy = retention_policy  # custom retention policy
        self.debug = debug
        self.add_hostname = add_hostname
        self.ratio = ratio
        self.default_tags = default_tags or {}
        self.time_precision = time_precision
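
    # Usage sketch (illustrative only, not part of the public API): the emitter can
    # be driven directly with any influxdb.InfluxDBClient, although it is normally
    # created through the module-level init() below.
    #
    #   client = InfluxDBClient(host="127.0.0.1", port=8086, database="metrics")
    #   emitter = MetricsEmitter(client, batch_size=50, emit_interval=5, debug=True)
    #   emitter.emit_counter("requests", key="spider_a", count=1)
    #   emitter.flush()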

    def define_tagkv(self, tagk, tagvs):
        self.tagkv[tagk] = set(tagvs)

    def _point_tagset(self, p):
        return f"{p['measurement']}-{sorted(p['tags'].items())}-{p['time']}"

    def _accumulate_points(self, points):
        """
        Aggregate points that share the same key:
        - counter points: values (_count) under the same key are summed
        - store points: left untouched, influxdb overwrites them on its own
        - timer points: an extra _seq value is added so each point stays distinct
        """
        counters = {}  # temporarily holds the counter points
        timer_seqs = Counter()  # timer sequence numbers per key
        new_points = []
        for point in points:
            point_type = point["tags"].get("_type", None)
            tagset = self._point_tagset(point)
            # counter points are all aggregated, none are dropped
            if point_type == "counter":
                if tagset not in counters:
                    counters[tagset] = point
                else:
                    counters[tagset]["fields"]["_count"] += point["fields"]["_count"]
            elif point_type == "timer":
                if self.max_timer_seq and timer_seqs[tagset] > self.max_timer_seq:
                    continue
                # roll the dice: keep the point only if we are lucky enough
                if self.ratio < 1.0 and random.random() > self.ratio:
                    continue
                # add a _seq tag so different points can be told apart
                point["tags"]["_seq"] = timer_seqs[tagset]
                timer_seqs[tagset] += 1
                new_points.append(point)
            else:
                if self.ratio < 1.0 and random.random() > self.ratio:
                    continue
                new_points.append(point)
        # append the accumulated counter points
        new_points.extend(counters.values())
        return new_points
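
    # Aggregation example (illustrative): given two counter points with the same
    # measurement, tags and time,
    #   {"fields": {"_count": 1}, ...} and {"fields": {"_count": 2}, ...}
    # the result is a single point with {"_count": 3}, while two timer points with
    # the same tagset are both kept and tagged _seq=0 and _seq=1 respectively.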

    def _get_ready_emit(self, force=False):
        """
        Aggregate the currently pending points and return them.
        """
        if self.debug:
            log.info("got %s raw points", self.pending_points.qsize())
        # read points from pending, capped so a steady stream of new points
        # cannot keep this loop running forever
        points = []
        while len(points) < self.max_points or force:
            try:
                points.append(self.pending_points.get_nowait())
            except queue.Empty:
                break
        # aggregate the points
        points = self._accumulate_points(points)
        if self.debug:
            log.info("got %s point", len(points))
            log.info(json.dumps(points, indent=4))
        return points

    def emit(self, point=None, force=False):
        """
        1. Add the new point to pending
        2. If the conditions are met, aggregate the points and write them
        3. Update the last emit timestamp

        :param point:
        :param force: force-write all pending points, defaults to False
        :return:
        """
        if point:
            self.pending_points.put(point)
        # decide whether to write: 1. pending size 2. elapsed interval 3. forced
        if not (
            force
            or self.pending_points.qsize() >= self.max_points  # noqa: W503
            or time.time() - self.last_emit_ts > self.emit_interval  # noqa: W503
        ):
            return
        # time to write: read the points that are ready, making sure only one
        # thread aggregates them at a time
        with self.lock:
            points = self._get_ready_emit(force=force)
            if not points:
                return
        try:
            self.influxdb.write_points(
                points,
                batch_size=self.batch_size,
                time_precision=self.time_precision,
                retention_policy=self.retention_policy,
            )
        except Exception:
            log.exception("error writing points")
        self.last_emit_ts = time.time()

    def flush(self):
        if self.debug:
            log.info("start draining points %s", self.pending_points.qsize())
        self.emit(force=True)

    def close(self):
        self.flush()
        try:
            self.influxdb.close()
        except Exception as e:
            log.exception(e)

    def make_point(self, measurement, tags: dict, fields: dict, timestamp=None):
        """
        The default timestamp has second precision.
        """
        assert measurement, "measurement can't be null"
        tags = tags.copy() if tags else {}
        tags.update(self.default_tags)
        fields = fields.copy() if fields else {}
        if timestamp is None:
            timestamp = int(time.time())
        # support a custom hostname
        if self.add_hostname and "hostname" not in tags:
            tags["hostname"] = self.hostname
        point = dict(measurement=measurement, tags=tags, fields=fields, time=timestamp)
        if self.tagkv:
            for tagk, tagv in tags.items():
                # only validate tag keys that were registered via define_tagkv
                if tagk in self.tagkv and tagv not in self.tagkv[tagk]:
                    raise ValueError(
                        "tag value = %s not in %s" % (tagv, self.tagkv[tagk])
                    )
        return point
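
    # Validation sketch (illustrative): once a tag key has been registered via
    # define_tagkv, make_point rejects unknown values for that key.
    #
    #   emitter.define_tagkv("spider", ["spider_a", "spider_b"])
    #   emitter.make_point("requests", {"spider": "spider_a"}, {"_count": 1})  # ok
    #   emitter.make_point("requests", {"spider": "other"}, {"_count": 1})     # ValueError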

    def get_counter_point(
        self,
        measurement: str,
        key: str = None,
        count: int = 1,
        tags: dict = None,
        timestamp: int = None,
    ):
        """
        Counter points must not overwrite each other.
        """
        tags = tags.copy() if tags else {}
        if key is not None:
            tags["_key"] = key
        tags["_type"] = "counter"
        count = ensure_int(count)
        fields = dict(_count=count)
        point = self.make_point(measurement, tags, fields, timestamp=timestamp)
        return point

    def get_store_point(
        self,
        measurement: str,
        key: str = None,
        value: Any = 0,
        tags: dict = None,
        timestamp=None,
    ):
        tags = tags.copy() if tags else {}
        if key is not None:
            tags["_key"] = key
        tags["_type"] = "store"
        fields = dict(_value=value)
        point = self.make_point(measurement, tags, fields, timestamp=timestamp)
        return point

    def get_timer_point(
        self,
        measurement: str,
        key: str = None,
        duration: float = 0,
        tags: dict = None,
        timestamp=None,
    ):
        tags = tags.copy() if tags else {}
        if key is not None:
            tags["_key"] = key
        tags["_type"] = "timer"
        fields = dict(_duration=ensure_float(duration))
        point = self.make_point(measurement, tags, fields, timestamp=timestamp)
        return point

    def emit_any(self, *args, **kwargs):
        point = self.make_point(*args, **kwargs)
        self.emit(point)

    def emit_counter(self, *args, **kwargs):
        point = self.get_counter_point(*args, **kwargs)
        self.emit(point)

    def emit_store(self, *args, **kwargs):
        point = self.get_store_point(*args, **kwargs)
        self.emit(point)

    def emit_timer(self, *args, **kwargs):
        point = self.get_timer_point(*args, **kwargs)
        self.emit(point)


_emitter: MetricsEmitter = None
_measurement: str = None


def init(
    *,
    influxdb_host=None,
    influxdb_port=None,
    influxdb_udp_port=None,
    influxdb_database=None,
    influxdb_user=None,
    influxdb_password=None,
    influxdb_measurement=None,
    retention_policy=None,
    retention_policy_duration="180d",
    emit_interval=60,
    batch_size=10,
    debug=False,
    use_udp=False,
    timeout=10,
    time_precision="s",
    **kwargs,
):
    """
    Initialize metrics reporting.

    Args:
        influxdb_host:
        influxdb_port:
        influxdb_udp_port:
        influxdb_database:
        influxdb_user:
        influxdb_password:
        influxdb_measurement: measurement to write to; can also be given per point
        retention_policy: retention policy
        retention_policy_duration: how long the retention policy keeps data
        emit_interval: maximum interval between writes
        batch_size: batch size for writing points
        debug: whether to enable debug logging
        use_udp: whether to write points over UDP
        timeout: timeout for connecting to influxdb
        time_precision: time precision, defaults to seconds
        **kwargs: extra arguments passed through to MetricsEmitter

    Returns:

    """
    global _inited_pid, _emitter, _measurement
    if _inited_pid == os.getpid():
        return

    influxdb_host = influxdb_host or setting.INFLUXDB_HOST
    influxdb_port = influxdb_port or setting.INFLUXDB_PORT
    influxdb_udp_port = influxdb_udp_port or setting.INFLUXDB_UDP_PORT
    influxdb_database = influxdb_database or setting.INFLUXDB_DATABASE
    influxdb_user = influxdb_user or setting.INFLUXDB_USER
    influxdb_password = influxdb_password or setting.INFLUXDB_PASSWORD
    _measurement = influxdb_measurement or setting.INFLUXDB_MEASUREMENT
    retention_policy = (
        retention_policy or f"{influxdb_database}_{retention_policy_duration}"
    )

    if not all(
        [
            influxdb_host,
            influxdb_port,
            influxdb_udp_port,
            influxdb_database,
            influxdb_user,
            influxdb_password,
        ]
    ):
        return

    influxdb_client = InfluxDBClient(
        host=influxdb_host,
        port=influxdb_port,
        udp_port=influxdb_udp_port,
        database=influxdb_database,
        use_udp=use_udp,
        timeout=timeout,
        username=influxdb_user,
        password=influxdb_password,
    )
    # create the database and retention policy
    if influxdb_database:
        try:
            influxdb_client.create_database(influxdb_database)
            influxdb_client.create_retention_policy(
                retention_policy,
                retention_policy_duration,
                replication="1",
                default=True,
            )
        except Exception as e:
            log.error("metrics init failed: {}".format(e))
            return

    _emitter = MetricsEmitter(
        influxdb_client,
        debug=debug,
        batch_size=batch_size,
        time_precision=time_precision,
        retention_policy=retention_policy,
        emit_interval=emit_interval,
        **kwargs,
    )
    _inited_pid = os.getpid()
    log.info("metrics init successfully")
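

# Configuration sketch (illustrative): assuming this module is importable as
# feapder.utils.metrics and the INFLUXDB_* values are defined in feapder's setting
# module, init() can be called with no arguments:
#
#   from feapder.utils import metrics
#   metrics.init()  # falls back to setting.INFLUXDB_HOST, setting.INFLUXDB_PORT, ...
#   metrics.emit_counter("parsed", count=10)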


def emit_any(
    tags: dict,
    fields: dict,
    *,
    classify: str = "",
    measurement: str = None,
    timestamp=None,
):
    """
    Emit a raw point without any extra processing.

    Args:
        tags: influxdb tag keys and values
        fields: influxdb field keys and values
        classify: category of the point
        measurement: measurement to write to
        timestamp: timestamp of the point, defaults to now

    Returns:

    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_any(measurement, tags, fields, timestamp)


def emit_counter(
    key: str = None,
    count: int = 1,
    *,
    classify: str = "",
    tags: dict = None,
    measurement: str = None,
    timestamp: int = None,
):
    """
    Emit an aggregated point: counts emitted within the same interval are summed
    and written as a single point.

    Args:
        key: key bound to the point
        count: count value
        classify: category of the point
        tags: influxdb tag keys and values
        measurement: measurement to write to
        timestamp: timestamp of the point, defaults to now

    Returns:

    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_counter(measurement, key, count, tags, timestamp)


def emit_timer(
    key: str = None,
    duration: float = 0,
    *,
    classify: str = "",
    tags: dict = None,
    measurement: str = None,
    timestamp=None,
):
    """
    Emit a timer point, e.g. to monitor how long something ran; every duration
    becomes its own point and is never overwritten.

    Args:
        key: key bound to the point
        duration: duration value
        classify: category of the point
        tags: influxdb tag keys and values
        measurement: measurement to write to
        timestamp: timestamp of the point, defaults to now

    Returns:

    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_timer(measurement, key, duration, tags, timestamp)


def emit_store(
    key: str = None,
    value: Any = 0,
    *,
    classify: str = "",
    tags: dict = None,
    measurement: str = None,
    timestamp=None,
):
    """
    Emit a store point directly, without any extra processing.

    Args:
        key: key bound to the point
        value: value of the point
        classify: category of the point
        tags: influxdb tag keys and values
        measurement: measurement to write to
        timestamp: timestamp of the point, defaults to now

    Returns:

    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_store(measurement, key, value, tags, timestamp)


def flush():
    """
    Force-flush all pending points to influxdb.

    Returns:

    """
    if not _emitter:
        return
    _emitter.flush()


def close():
    """
    Flush pending points and close the influxdb connection.

    Returns:

    """
    if not _emitter:
        return
    _emitter.close()


# coroutine-friendly wrappers for emitting points from async code
aemit_counter = aio_wrap(executor=_executor)(emit_counter)
aemit_store = aio_wrap(executor=_executor)(emit_store)
aemit_timer = aio_wrap(executor=_executor)(emit_timer)
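

# Usage sketch (illustrative only): this assumes a reachable InfluxDB server and is
# guarded so importing the module has no side effects. Inside async code the
# aemit_* wrappers above can be awaited instead of calling emit_* directly.
if __name__ == "__main__":
    init(
        influxdb_host="127.0.0.1",  # assumed local test instance
        influxdb_port=8086,
        influxdb_udp_port=8089,
        influxdb_database="feapder_metrics",  # hypothetical database name
        influxdb_user="root",
        influxdb_password="root",
        influxdb_measurement="demo",
        debug=True,
    )
    emit_counter("requests", count=3, classify="demo")
    emit_timer("download", duration=0.25, classify="demo")
    emit_store("queue_size", value=42, classify="demo")
    flush()
    close()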