monitor_tools_online.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : liumiaomiao
  4. #从es库中导出数据到测试环境mongo库
  5. from lib.es_tools import esutil
  6. from datetime import datetime, timedelta
  7. from lib.mongo_tools import MongoUtil,Data_save,MongoSentence
  8. from lib.mysql_tools import MysqlUtil
  9. from lib.clickhouse_tools import ClickhouseUtil, logger
  10. class monitorTools:
  11. # 定义一周的时间范围,转换为Unix时间戳格式
  12. end_date = int(datetime.now().timestamp())
  13. start_date = int((datetime.now() - timedelta(days=7)).timestamp())
  14. print(f"开始时间:{start_date}--结束时间{end_date}")
  15. #标准库bidding-es 每周统计入库数量
  16. def es_bidding(self):
  17. """
  18. es链接
  19. """
  20. db_config = {
  21. # es
  22. 'es_host': '172.17.4.184',
  23. 'es_port': 19908,
  24. 'es_http_auth': ('qyfw_es_2','Khfdals33#'), # 重新申请
  25. 'timeout': 10000,
  26. 'index': "bidding"
  27. }
  28. query = {"query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{self.start_date}", "to": f"{self.end_date}"}}}]}}}
  29. # 传入查询语句query 以及配置信息
  30. # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
  31. counts=esutil.get_es_count(query,**db_config)
  32. count = counts['count']
  33. print("标准库es-bidding每周入库数据量:",count)
  34. return count
  35. # 标准库bidding-es 碎片化数据每周统计入库数量
  36. def es_bidding_fragment(self):
  37. #正式环境
  38. db_config = {
  39. # es
  40. 'es_host': '172.17.4.184',
  41. 'es_port': 19908,
  42. 'es_http_auth': ('qyfw_es_2', 'Khfdals33#'), # 重新申请
  43. 'timeout': 10000,
  44. 'index': "bidding"
  45. }
  46. # #测试环境http://192.168.3.149:9201
  47. # db_config = {
  48. # # es
  49. # 'es_host': '192.168.3.149',
  50. # 'es_port': 9201,
  51. # # 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
  52. # 'timeout': 10000,
  53. # 'index': "bidding"
  54. # }
  55. # 定义要监控的字段值
  56. tags = [
  57. "情报_法务",
  58. "情报_财务审计",
  59. "情报_招标代理",
  60. "情报_管理咨询",
  61. "情报_保险",
  62. "情报_工程设计咨询",
  63. "情报_安防",
  64. "情报_印务商机",
  65. "情报_环境采购",
  66. "情报_家具招投标"
  67. ]
  68. # 初始化字典,将所有标签的计数设置为0
  69. data = {}
  70. for tag in tags:
  71. query = {
  72. "query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{self.start_date}", "to": f"{self.end_date}"}}},
  73. {"term": {"tag_topinformation": tag}}]}}}
  74. count = esutil.get_es_count(query, **db_config)
  75. print(f"标准库es-bidding{tag}每周入库数据量:", count['count'])
  76. data[tag]=count['count']
  77. # 检查数据字典以确保所有标签都被更新
  78. print("数据字典内容:", data) # 打印整个数据字典
  79. return data
  80. #拟在建es数据 每周统计入库数量
  81. def es_nzj(self):
  82. """
  83. es链接
  84. """
  85. db_config = {
  86. # es
  87. 'es_host': '172.17.4.184',
  88. 'es_port': 19908,
  89. 'es_http_auth': ('qyfw_es_2', 'Khfdals33#'), # 重新申请
  90. 'timeout': 10000,
  91. 'index': "proposed_v1"
  92. }
  93. query = {
  94. "query": {"match_all": {}}}
  95. # 传入查询语句query 以及配置信息
  96. # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
  97. counts = esutil.get_es_count(query, **db_config)
  98. count=counts['count']
  99. print("拟在建es入库数据总量:", count)
  100. return count
  101. #医械通es,每周统计入库数量
  102. def medical_es(self):
  103. """
  104. es链接
  105. """
  106. db_config = {
  107. # es
  108. 'es_host': '172.17.4.184',
  109. 'es_port': 19908,
  110. 'es_http_auth': ('qyfw_es_2', 'Khfdals33#'), # 重新申请
  111. 'timeout': 10000,
  112. 'index': "bidding"
  113. }
  114. query = {
  115. "query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{self.start_date}", "to": f"{self.end_date}"}}},{"term": {"bid_field": "0101"}}]}}}
  116. # 传入查询语句query 以及配置信息
  117. # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
  118. counts = esutil.get_es_count(query, **db_config)
  119. count = counts['count']
  120. print("医械通es每周入库数据量:", count)
  121. return count
  122. #标准库bidding-mongo 每周统计入库数量
  123. def bidding(self):
  124. collection = MongoUtil.get_coon(host='172.31.31.202:27081', database='qfw',collection='bidding',authuser='dataFx',authpass='data@fenxi')
  125. query = { "comeintime": {"$gte": self.start_date, "$lt": self.end_date}}
  126. count=MongoSentence.count(collection,query)
  127. print("标准库bidding-mongo 每周统计入库数量",count)
  128. return count
  129. #标准库bidding-mongo碎片化数据 每周统计入库数量
  130. def bidding_fragment(self):
  131. collection = MongoUtil.get_coon(host='172.31.31.202:27081', database='qfw',collection='bidding',authuser='dataFx',authpass='data@fenxi')
  132. # 定义要监控的字段值
  133. tags = [
  134. "情报_法务",
  135. "情报_财务审计",
  136. "情报_招标代理",
  137. "情报_管理咨询",
  138. "情报_保险",
  139. "情报_工程设计咨询",
  140. "情报_安防",
  141. "情报_印务商机",
  142. "情报_环境采购",
  143. "情报_家具招投标"
  144. ]
  145. # tags = [
  146. # "情报_环境采购",
  147. # "情报_家具招投标"
  148. # ]
  149. data={}
  150. for tag in tags:
  151. query = {"comeintime": {"$gte": self.start_date, "$lt": self.end_date},"tag_topinformation":tag}
  152. count=MongoSentence.count(collection,query)
  153. print(f"标准库bidding-mongo{tag}每周统计入库数量",count)
  154. data[tag]=count
  155. return data
  156. #拟在建baseinfo-mysql 每周统计入库数量
  157. def nzj(self):
  158. # MySQL 数据库连接配置
  159. # mysql_db_config = {
  160. # 'host': '192.168.3.149',
  161. # 'port': 4000,
  162. # 'user': 'datagroup',
  163. # 'password': 'Dgrpdb#2024@36',
  164. # 'database': 'jianyu_subjectdb',
  165. # 'charset': 'utf8mb4'
  166. # }
  167. now = datetime.now()
  168. end_date = now.strftime("%Y-%m-%d %H:%M:%S")
  169. start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
  170. # SQL 查询
  171. mysql_query = "SELECT COUNT(*) FROM jianyu_subjectdb.dwd_f_nzj_baseinfo WHERE createtime >= %s AND createtime <= %s"
  172. params = (start_date, end_date)
  173. conn=MysqlUtil.connect_to_mysql(host='172.17.162.27',port='14000',user='jydev',password='JSuytest#s211',database='jianyu_subjectdb')
  174. count=MysqlUtil.execute_sql(conn,mysql_query,params)
  175. print("拟在建baseinfo-mysql每周统计入库数量", count)
  176. return count
  177. #人脉数据,每周统计入库数量
  178. def connections(self):
  179. client = None
  180. try:
  181. query = f"SELECT COUNT(*) FROM information.transaction_info_all WHERE create_time >={self.start_date} AND create_time <={self.end_date}"
  182. # conn=ClickhouseUtil.connect_to_clickhouse(host='192.168.3.207',port='19000',user='jytop',password='pwdTopJy123',database='information')
  183. client=ClickhouseUtil.connect_to_clickhouse(host='cc-2ze9tv451wov14w9e.clickhouse.ads.aliyuncs.com',port=9000,user='jydev',password='ClTe0331kho2025',database='information')
  184. count=ClickhouseUtil.execute_sql(client,query)
  185. result=count[0][0]
  186. print("人脉数据每周统计入库数量", result)
  187. return result
  188. except Exception as e:
  189. logger.error("An error occurred: %s", e)
  190. raise
  191. finally:
  192. if client:
  193. client.disconnect() # 释放连接
  194. #医械通,每周统计入库数量
  195. def medical(self):
  196. collection = MongoUtil.get_coon(host='172.31.31.202:27081', database='qfw',collection='bidding',authuser='dataFx',authpass='data@fenxi')
  197. query = {"comeintime": {"$gte": self.start_date, "$lt": self.end_date},"bid_field":"0101"}
  198. count = MongoSentence.count(collection, query)
  199. print("医械通每周统计入库数量", count)
  200. return count
  201. #统计结果入库
  202. def save_to_mongo(self,title,count):
  203. collection=Data_save.save_con(host='172.20.45.129',port=27002,database='data_quality',collection='statistics')
  204. now = datetime.now()
  205. timestamp = int(now.timestamp())
  206. document = {
  207. title: {
  208. "timestamp": timestamp,
  209. "count": count
  210. }
  211. }
  212. Data_save.insert_one(collection,document)
  213. monitor=monitorTools()