123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # author : liumiaomiao
- #从es库中导出数据到测试环境mongo库
- from lib.es_tools import esutil
- from datetime import datetime, timedelta
- from lib.mongo_tools import MongoUtil,Data_save,MongoSentence
- from lib.mysql_tools import MysqlUtil
- from lib.clickhouse_tools import ClickhouseUtil, logger
- # 定义一周的时间范围,转换为Unix时间戳格式
- end_date = int(datetime.now().timestamp())
- start_date = int((datetime.now() - timedelta(days=7)).timestamp())
- print(f"开始时间:{start_date}--结束时间{end_date}")
- class monitorTools:
- #标准库bidding-es 每周统计入库数量
- def es_bidding(self):
- """
- es链接
- """
- db_config = {
- # es
- 'es_host': '127.0.0.1',
- 'es_port': 19800,
- 'es_http_auth': ('jianyuGr','we3g8glKfe#'), # 重新申请
- 'timeout': 10000,
- 'index': "bidding"
- }
- query = {"query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}]}}}
- # 传入查询语句query 以及配置信息
- # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
- counts=esutil.get_es_count(query,**db_config)
- count = counts['count']
- print("标准库es-bidding每周入库数据量:",count)
- return count
- # 高质量库bidding-ai-es 每周统计入库数量
- def es_bidding_ai(self):
- """
- es链接
- """
- db_config = {
- # es
- 'es_host': '127.0.0.1',
- 'es_port': 19800,
- 'es_http_auth': ('jianyuGr','we3g8glKfe#'), # 重新申请
- 'timeout': 10000,
- 'index': "bidding_ai"
- }
- query = {"query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}]}}}
- # 传入查询语句query 以及配置信息
- # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
- counts=esutil.get_es_count(query,**db_config)
- count = counts['count']
- print("高质量库es-bidding每周入库数据量:",count)
- return count
- # 标准库bidding-es 碎片化数据每周统计入库数量
- def es_bidding_fragment(self):
- #正式环境
- db_config = {
- # es
- 'es_host': '127.0.0.1',
- 'es_port': 19800,
- 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
- 'timeout': 10000,
- 'index': "bidding"
- }
- # #测试环境http://192.168.3.149:9201
- # db_config = {
- # # es
- # 'es_host': '192.168.3.149',
- # 'es_port': 9201,
- # # 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
- # 'timeout': 10000,
- # 'index': "bidding"
- # }
- # 定义要监控的字段值
- tags = [
- "情报_法务",
- "情报_财务审计",
- "情报_招标代理",
- "情报_管理咨询",
- "情报_保险",
- "情报_工程设计咨询",
- "情报_安防",
- "情报_印务商机",
- "情报_环境采购",
- "情报_家具招投标"
- ]
- # 初始化字典,将所有标签的计数设置为0
- data = {}
- for tag in tags:
- query = {
- "query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}},
- {"term": {"tag_topinformation": tag}}]}}}
- count = esutil.get_es_count(query, **db_config)
- print(f"标准库es-bidding{tag}每周入库数据量:", count['count'])
- data[tag]=count['count']
- # 检查数据字典以确保所有标签都被更新
- print("数据字典内容:", data) # 打印整个数据字典
- return data
- #拟在建es数据 每周统计入库数量
- def es_nzj(self):
- """
- es链接
- """
- db_config = {
- # es
- 'es_host': '127.0.0.1',
- 'es_port': 19800,
- 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
- 'timeout': 10000,
- 'index': "proposed_v1"
- }
- query = {
- "query": {"match_all": {}}}
- # 传入查询语句query 以及配置信息
- # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
- counts = esutil.get_es_count(query, **db_config)
- count=counts['count']
- print("拟在建es入库数据总量:", count)
- return count
- #医械通es,每周统计入库数量
- def medical_es(self):
- """
- es链接
- """
- db_config = {
- # es
- 'es_host': '127.0.0.1',
- 'es_port': 19800,
- 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请
- 'timeout': 10000,
- 'index': "bidding"
- }
- query = {
- "query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}},{"term": {"bid_field": "0101"}}]}}}
- # 传入查询语句query 以及配置信息
- # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
- counts = esutil.get_es_count(query, **db_config)
- count = counts['count']
- print("医械通es每周入库数据量:", count)
- return count
- #标准库bidding-mongo 每周统计入库数量
- def bidding(self):
- collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw',collection='bidding',authuser='viewdata',authpass='viewdata')
- query = { "comeintime": {"$gte": start_date, "$lt": end_date}}
- count=MongoSentence.count(collection,query)
- print("标准库bidding-mongo 每周统计入库数量",count)
- return count
- #高质量库bidding-mongo 每周统计入库数量
- def bidding_ai(self):
- collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw_ai',collection='bidding',authuser='viewdata',authpass='viewdata')
- query = { "comeintime": {"$gte": start_date, "$lt": end_date}}
- count=MongoSentence.count(collection,query)
- print("高质量库bidding-mongo 每周统计入库数量",count)
- return count
- #标准库bidding-mongo碎片化数据 每周统计入库数量
- def bidding_fragment(self):
- collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw',collection='bidding',authuser='viewdata',authpass='viewdata')
- # 定义要监控的字段值
- tags = [
- "情报_法务",
- "情报_财务审计",
- "情报_招标代理",
- "情报_管理咨询",
- "情报_保险",
- "情报_工程设计咨询",
- "情报_安防",
- "情报_印务商机",
- "情报_环境采购",
- "情报_家具招投标"
- ]
- # tags = [
- # "情报_环境采购",
- # "情报_家具招投标"
- # ]
- data={}
- for tag in tags:
- query = {"comeintime": {"$gte": start_date, "$lt": end_date},"tag_topinformation":tag}
- count=MongoSentence.count(collection,query)
- print(f"标准库bidding-mongo{tag}每周统计入库数量",count)
- data[tag]=count
- return data
- #拟在建baseinfo-mysql 每周统计入库数量
- def nzj(self):
- # MySQL 数据库连接配置
- # mysql_db_config = {
- # 'host': '192.168.3.149',
- # 'port': 4000,
- # 'user': 'datagroup',
- # 'password': 'Dgrpdb#2024@36',
- # 'database': 'jianyu_subjectdb',
- # 'charset': 'utf8mb4'
- # }
- now = datetime.now()
- end_date = now.strftime("%Y-%m-%d %H:%M:%S")
- start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
- # SQL 查询
- mysql_query = "SELECT COUNT(*) FROM jianyu_subjectdb.dwd_f_nzj_baseinfo WHERE createtime >= %s AND createtime <= %s"
- params = (start_date, end_date)
- conn=MysqlUtil.connect_to_mysql(host='127.0.0.1',port='4000',user='jydev',password='JSuytest#s211',database='jianyu_subjectdb')
- count=MysqlUtil.execute_sql(conn,mysql_query,params)
- print("拟在建baseinfo-mysql每周统计入库数量", count)
- return count
- #人脉数据,每周统计入库数量
- def connections(self):
- try:
- query = f"SELECT COUNT(*) FROM information.transaction_info_all WHERE create_time >={start_date} AND create_time <={end_date}"
- # conn=ClickhouseUtil.connect_to_clickhouse(host='192.168.3.207',port='19000',user='jytop',password='pwdTopJy123',database='information')
- conn=ClickhouseUtil.connect_to_clickhouse(host='127.0.0.1',port='9000',user='biservice',password='Bi_top95215#',database='information')
- count=ClickhouseUtil.execute_sql(conn,query)
- result=count[0][0]
- print("人脉数据每周统计入库数量", result)
- return result
- except Exception as e:
- logger.error("An error occurred: %s", e)
- raise
- #医械通,每周统计入库数量
- def medical(self):
- collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw', collection='bidding',
- authuser='viewdata', authpass='viewdata')
- query = {"comeintime": {"$gte": start_date, "$lt": end_date},"bid_field":"0101"}
- count = MongoSentence.count(collection, query)
- print("医械通每周统计入库数量", count)
- return count
- #统计结果入库
- def save_to_mongo(self,title,count):
- collection=Data_save.save_con(host='172.20.45.129',port=27002,database='data_quality',collection='statistics')
- now = datetime.now()
- timestamp = int(now.timestamp())
- document = {
- title: {
- "timestamp": timestamp,
- "count": count
- }
- }
- Data_save.insert_one(collection,document)
- monitor=monitorTools()
|