#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author : liumiaomiao
"""Weekly ingestion-count monitoring helpers.

Each ``monitorTools`` method counts records in one data store
(Elasticsearch, MongoDB, MySQL or ClickHouse), prints the figure and returns
it.  ``save_to_mongo`` stores a figure in the ``data_quality.statistics``
collection.

NOTE(review): the original file header read "export data from the ES store
into the test-environment mongo store", which does not match what the code
below does -- presumably a leftover from a copied template; confirm.

NOTE(review): hosts and credentials are hard-coded in this module.  They are
kept unchanged to preserve behaviour, but should be moved to configuration.
"""
from datetime import datetime, timedelta

from lib.es_tools import esutil
from lib.mongo_tools import MongoUtil, Data_save, MongoSentence
from lib.mysql_tools import MysqlUtil
from lib.clickhouse_tools import ClickhouseUtil, logger

# One-week window expressed as Unix timestamps (seconds).
# These are evaluated once, at import time, so a long-running process keeps
# using the window that was current when the module was first imported.
end_date = int(datetime.now().timestamp())
start_date = int((datetime.now() - timedelta(days=7)).timestamp())
print(f"开始时间:{start_date}--结束时间{end_date}")

# Values of ``tag_topinformation`` that are monitored individually.
_FRAGMENT_TAGS = (
    "情报_法务",
    "情报_财务审计",
    "情报_招标代理",
    "情报_管理咨询",
    "情报_保险",
    "情报_工程设计咨询",
    "情报_安防",
    "情报_印务商机",
    "情报_环境采购",
    "情报_家具招投标",
)


def _es_config(index):
    """Return the keyword arguments passed to ``esutil.get_es_count`` for *index*."""
    # Test environment alternative (http://192.168.3.149:9201):
    # es_host '192.168.3.149', es_port 9201, no es_http_auth, same timeout.
    return {
        'es_host': '127.0.0.1',
        'es_port': 19800,
        'es_http_auth': ('jianyuGr', 'we3g8glKfe#'),  # to be re-requested
        'timeout': 10000,
        'index': index,
    }


def _weekly_range_clause():
    """Return the ES ``range`` clause restricting ``comeintime`` to the module window."""
    return {"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}


def _mongo_bidding_collection(database):
    """Return the ``bidding`` collection handle of *database* on the monitored Mongo host."""
    return MongoUtil.get_coon(
        host='127.0.0.1:27088',
        database=database,
        collection='bidding',
        authuser='viewdata',
        authpass='viewdata',
    )


class monitorTools:
    """Collection of per-store counting routines; the methods hold no instance state."""

    # Standard store, ES index "bidding": weekly ingestion count.
    def es_bidding(self):
        """Count documents in ES index ``bidding`` whose ``comeintime`` is in the window.

        Returns:
            The ``'count'`` entry of the ``esutil.get_es_count`` result.
        """
        query = {"query": {"bool": {"must": [_weekly_range_clause()]}}}
        counts = esutil.get_es_count(query, **_es_config("bidding"))
        count = counts['count']
        print("标准库es-bidding每周入库数据量:", count)
        return count

    # High-quality store, ES index "bidding_ai": weekly ingestion count.
    def es_bidding_ai(self):
        """Count documents in ES index ``bidding_ai`` whose ``comeintime`` is in the window.

        Returns:
            The ``'count'`` entry of the ``esutil.get_es_count`` result.
        """
        query = {"query": {"bool": {"must": [_weekly_range_clause()]}}}
        counts = esutil.get_es_count(query, **_es_config("bidding_ai"))
        count = counts['count']
        print("高质量库es-bidding每周入库数据量:", count)
        return count

    # Standard store, ES index "bidding": weekly count per fragment tag.
    def es_bidding_fragment(self):
        """Count windowed ``bidding`` documents separately for every monitored tag.

        Returns:
            dict mapping each tag to the ``'count'`` reported by ES for it.
        """
        db_config = _es_config("bidding")
        data = {}
        for tag in _FRAGMENT_TAGS:
            query = {
                "query": {"bool": {"must": [
                    _weekly_range_clause(),
                    {"term": {"tag_topinformation": tag}},
                ]}}}
            count = esutil.get_es_count(query, **db_config)
            print(f"标准库es-bidding{tag}每周入库数据量:", count['count'])
            data[tag] = count['count']
        # Dump the whole mapping so every tag can be checked at a glance.
        print("数据字典内容:", data)
        return data

    # Proposed/under-construction projects, ES index "proposed_v1".
    def es_nzj(self):
        """Count all documents in ES index ``proposed_v1``.

        The query is ``match_all``: this is the index total, not a figure
        restricted to the weekly window.

        Returns:
            The ``'count'`` entry of the ``esutil.get_es_count`` result.
        """
        query = {"query": {"match_all": {}}}
        counts = esutil.get_es_count(query, **_es_config("proposed_v1"))
        count = counts['count']
        print("拟在建es入库数据总量:", count)
        return count

    # Standard store, Mongo qfw.bidding: weekly ingestion count.
    def bidding(self):
        """Count ``qfw.bidding`` documents with ``start_date <= comeintime < end_date``.

        Returns:
            Whatever ``MongoSentence.count`` returns for the query.
        """
        collection = _mongo_bidding_collection('qfw')
        query = {"comeintime": {"$gte": start_date, "$lt": end_date}}
        count = MongoSentence.count(collection, query)
        print("标准库bidding-mongo 每周统计入库数量", count)
        return count

    # High-quality store, Mongo qfw_ai.bidding: weekly ingestion count.
    def bidding_ai(self):
        """Count ``qfw_ai.bidding`` documents with ``start_date <= comeintime < end_date``.

        Returns:
            Whatever ``MongoSentence.count`` returns for the query.
        """
        collection = _mongo_bidding_collection('qfw_ai')
        query = {"comeintime": {"$gte": start_date, "$lt": end_date}}
        count = MongoSentence.count(collection, query)
        print("高质量库bidding-mongo 每周统计入库数量", count)
        return count

    # Standard store, Mongo qfw.bidding: weekly count per fragment tag.
    def bidding_fragment(self):
        """Count windowed ``qfw.bidding`` documents separately for every monitored tag.

        Returns:
            dict mapping each tag to the ``MongoSentence.count`` result for it.
        """
        collection = _mongo_bidding_collection('qfw')
        data = {}
        for tag in _FRAGMENT_TAGS:
            query = {
                "comeintime": {"$gte": start_date, "$lt": end_date},
                "tag_topinformation": tag,
            }
            count = MongoSentence.count(collection, query)
            print(f"标准库bidding-mongo{tag}每周统计入库数量", count)
            data[tag] = count
        return data

    # Proposed/under-construction base info in MySQL: weekly ingestion count.
    def nzj(self):
        """Count ``dwd_f_nzj_baseinfo`` rows whose ``createtime`` is in the last 7 days.

        The window is computed at call time and formatted as
        ``YYYY-MM-DD HH:MM:SS`` strings; it is independent of the module-level
        integer ``start_date``/``end_date``.

        Returns:
            Whatever ``MysqlUtil.execute_sql`` returns for the COUNT query.
        """
        # Derive both bounds from a single "now" so the window is exactly
        # seven days, and use distinct names so the module-level integer
        # timestamps are not shadowed by strings.
        now = datetime.now()
        window_end = now.strftime("%Y-%m-%d %H:%M:%S")
        window_start = (now - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")

        mysql_query = "SELECT COUNT(*) FROM jianyu_subjectdb.dwd_f_nzj_baseinfo WHERE createtime >= %s AND createtime <= %s"
        params = (window_start, window_end)
        conn = MysqlUtil.connect_to_mysql(
            host='192.168.3.149',
            port='4000',
            user='datagroup',
            password='Dgrpdb#2024@36',
            database='jianyu_subjectdb',
        )
        count = MysqlUtil.execute_sql(conn, mysql_query, params)
        print("拟在建baseinfo-mysql每周统计入库数量", count)
        return count

    # Connections ("renmai") data in ClickHouse: weekly ingestion count.
    def connections(self):
        """Count ``information.transaction_info_all`` rows created inside the window.

        Returns:
            The first column of the first row returned by the COUNT query.

        Raises:
            Exception: any error raised while connecting or querying is logged
                through ``logger.error`` and then re-raised unchanged.
        """
        try:
            # The interpolated values are the module-level integer timestamps,
            # not external input.
            query = f"SELECT COUNT(*) FROM information.transaction_info_all WHERE create_time >={start_date} AND create_time <={end_date}"
            # Alternative connection kept from the original source:
            # conn=ClickhouseUtil.connect_to_clickhouse(host='192.168.3.207',port='19000',user='jytop',password='pwdTopJy123',database='information')
            conn = ClickhouseUtil.connect_to_clickhouse(
                host='127.0.0.1',
                port='9000',
                user='biservice',
                password='Bi_top95215#',
                database='information',
            )
            count = ClickhouseUtil.execute_sql(conn, query)
            result = count[0][0]
            print("人脉数据每周统计入库数量", result)
            return result
        except Exception as e:
            logger.error("An error occurred: %s", e)
            raise

    # Persist a statistic.
    def save_to_mongo(self, title, count):
        """Insert one statistic document into ``data_quality.statistics``.

        Args:
            title: Used as the single top-level key of the stored document.
            count: Value stored under ``"count"``.

        The stored document is
        ``{title: {"timestamp": <current Unix seconds>, "count": count}}``.
        """
        collection = Data_save.save_con(
            host='192.168.3.149',
            port=27180,
            database='data_quality',
            collection='statistics',
        )
        timestamp = int(datetime.now().timestamp())
        document = {
            title: {
                "timestamp": timestamp,
                "count": count,
            }
        }
        Data_save.insert_one(collection, document)


monitor = monitorTools()