monitor_tools.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : liumiaomiao
  4. #从es库中导出数据到测试环境mongo库
  5. from lib.es_tools import esutil
  6. from datetime import datetime, timedelta
  7. from lib.mongo_tools import MongoUtil,Data_save,MongoSentence
  8. from lib.mysql_tools import MysqlUtil
  9. from lib.clickhouse_tools import ClickhouseUtil, logger
  10. # 定义一周的时间范围,转换为Unix时间戳格式
  11. end_date = int(datetime.now().timestamp())
  12. start_date = int((datetime.now() - timedelta(days=7)).timestamp())
  13. class monitorTools:
  14. #标准库bidding-es 每周统计入库数量
  15. def es_bidding(self):
  16. """
  17. es链接
  18. """
  19. db_config = {
  20. # es
  21. 'es_host': '127.0.0.1',
  22. 'es_port': 19800,
  23. 'es_http_auth': ('jianyuGr','we3g8glKfe#'), # 重新申请
  24. 'timeout': 10000,
  25. 'index': "bidding"
  26. }
  27. query = {"query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}]}}}
  28. # 传入查询语句query 以及配置信息
  29. # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
  30. counts=esutil.get_es_count(query,**db_config)
  31. count = counts['count']
  32. print("标准库es-bidding每周入库数据量:",count)
  33. return count
  34. # 高质量库bidding-ai-es 每周统计入库数量
  35. def es_bidding_ai(self):
  36. """
  37. es链接
  38. """
  39. db_config = {
  40. # es
  41. 'es_host': '127.0.0.1',
  42. 'es_port': 19800,
  43. 'es_http_auth': ('jianyuGr','we3g8glKfe#'), # 重新申请
  44. 'timeout': 10000,
  45. 'index': "bidding_ai"
  46. }
  47. query = {"query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}]}}}
  48. # 传入查询语句query 以及配置信息
  49. # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
  50. counts=esutil.get_es_count(query,**db_config)
  51. count = counts['count']
  52. print("高质量库es-bidding每周入库数据量:",count)
  53. return count
  54. # 标准库bidding-es 碎片化数据每周统计入库数量
  55. def es_bidding_fragment(self):
  56. #正式环境
  57. db_config = {
  58. # es
  59. 'es_host': '127.0.0.1',
  60. 'es_port': 19800,
  61. 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
  62. 'timeout': 10000,
  63. 'index': "bidding"
  64. }
  65. # #测试环境http://192.168.3.149:9201
  66. # db_config = {
  67. # # es
  68. # 'es_host': '192.168.3.149',
  69. # 'es_port': 9201,
  70. # # 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
  71. # 'timeout': 10000,
  72. # 'index': "bidding"
  73. # }
  74. # 定义要监控的字段值
  75. tags = [
  76. "情报_法务",
  77. "情报_财务审计",
  78. "情报_招标代理",
  79. "情报_管理咨询",
  80. "情报_保险",
  81. "情报_工程设计咨询",
  82. "情报_安防",
  83. "情报_印务商机",
  84. "情报_环境采购",
  85. "情报_家具招投标"
  86. ]
  87. # 初始化字典,将所有标签的计数设置为0
  88. data = {}
  89. for tag in tags:
  90. query = {
  91. "query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}},
  92. {"term": {"tag_topinformation": tag}}]}}}
  93. count = esutil.get_es_count(query, **db_config)
  94. print(f"标准库es-bidding{tag}每周入库数据量:", count['count'])
  95. data[tag]=count['count']
  96. # 检查数据字典以确保所有标签都被更新
  97. print("数据字典内容:", data) # 打印整个数据字典
  98. return data
  99. #拟在建es数据 每周统计入库数量
  100. def es_nzj(self):
  101. """
  102. es链接
  103. """
  104. db_config = {
  105. # es
  106. 'es_host': '127.0.0.1',
  107. 'es_port': 19800,
  108. 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
  109. 'timeout': 10000,
  110. 'index': "proposed_v1"
  111. }
  112. query = {
  113. "query": {"match_all": {}}}
  114. # 传入查询语句query 以及配置信息
  115. # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
  116. counts = esutil.get_es_count(query, **db_config)
  117. count=counts['count']
  118. print("拟在建es入库数据总量:", count)
  119. return count
  120. #标准库bidding-mongo 每周统计入库数量
  121. def bidding(self):
  122. collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw',collection='bidding',authuser='viewdata',authpass='viewdata')
  123. query = { "comeintime": {"$gte": start_date, "$lt": end_date}}
  124. count=MongoSentence.count(collection,query)
  125. print("标准库bidding-mongo 每周统计入库数量",count)
  126. return count
  127. #高质量库bidding-mongo 每周统计入库数量
  128. def bidding_ai(self):
  129. collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw_ai',collection='bidding',authuser='viewdata',authpass='viewdata')
  130. query = { "comeintime": {"$gte": start_date, "$lt": end_date}}
  131. count=MongoSentence.count(collection,query)
  132. print("高质量库bidding-mongo 每周统计入库数量",count)
  133. return count
  134. #标准库bidding-mongo碎片化数据 每周统计入库数量
  135. def bidding_fragment(self):
  136. collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw',collection='bidding',authuser='viewdata',authpass='viewdata')
  137. # 定义要监控的字段值
  138. tags = [
  139. "情报_法务",
  140. "情报_财务审计",
  141. "情报_招标代理",
  142. "情报_管理咨询",
  143. "情报_保险",
  144. "情报_工程设计咨询",
  145. "情报_安防",
  146. "情报_印务商机",
  147. "情报_环境采购",
  148. "情报_家具招投标"
  149. ]
  150. data={}
  151. for tag in tags:
  152. query = {"comeintime": {"$gte": start_date, "$lt": end_date},"tag_topinformation":tag}
  153. count=MongoSentence.count(collection,query)
  154. print(f"标准库bidding-mongo{tag}每周统计入库数量",count)
  155. data[tag]=count
  156. return data
  157. #拟在建baseinfo-mysql 每周统计入库数量
  158. def nzj(self):
  159. # MySQL 数据库连接配置
  160. # mysql_db_config = {
  161. # 'host': '192.168.3.149',
  162. # 'port': 4000,
  163. # 'user': 'datagroup',
  164. # 'password': 'Dgrpdb#2024@36',
  165. # 'database': 'jianyu_subjectdb',
  166. # 'charset': 'utf8mb4'
  167. # }
  168. now = datetime.now()
  169. end_date = now.strftime("%Y-%m-%d %H:%M:%S")
  170. start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
  171. # SQL 查询
  172. mysql_query = "SELECT COUNT(*) FROM dwd_f_nzj_baseinfo WHERE createtime >= %s AND createtime <= %s"
  173. params = (start_date, end_date)
  174. conn=MysqlUtil.connect_to_mysql(host='192.168.3.149',port='4000',user='datagroup',password='Dgrpdb#2024@36',database='jianyu_subjectdb')
  175. count=MysqlUtil.execute_sql(conn,mysql_query,params)
  176. print("拟在建baseinfo-mysql每周统计入库数量", count)
  177. return count
  178. #人脉数据,每周统计入库数量
  179. def connections(self):
  180. try:
  181. query = f"SELECT COUNT(*) FROM information.transaction_info_all WHERE create_time >={start_date} AND create_time <={end_date}"
  182. # conn=ClickhouseUtil.connect_to_clickhouse(host='192.168.3.207',port='19000',user='jytop',password='pwdTopJy123',database='information')
  183. conn=ClickhouseUtil.connect_to_clickhouse(host='127.0.0.1',port='9000',user='biservice',password='Bi_top95215#',database='information')
  184. count=ClickhouseUtil.execute_sql(conn,query)
  185. result=count[0][0]
  186. print("人脉数据每周统计入库数量", result)
  187. return result
  188. except Exception as e:
  189. logger.error("An error occurred: %s", e)
  190. raise
  191. #统计结果入库
  192. def save_to_mongo(self,title,count):
  193. collection=Data_save.save_con(host='192.168.3.149',port=27180,database='data_quality',collection='statistics')
  194. now = datetime.now()
  195. timestamp = int(now.timestamp())
  196. document = {
  197. title: {
  198. "timestamp": timestamp,
  199. "count": count
  200. }
  201. }
  202. Data_save.insert_one(collection,document)
  203. monitor=monitorTools()