#!/usr/bin/env python # -*- coding:utf-8 -*- # author : liumiaomiao #从es库中导出数据到测试环境mongo库 from lib.es_tools import esutil from datetime import datetime, timedelta from lib.mongo_tools import Data_save # 定义一周的时间范围,转换为Unix时间戳格式 end_date = int(datetime.now().timestamp()) start_date = int((datetime.now() - timedelta(days=7)).timestamp()) class EScount: #标准库bidding-es 每周统计入库数量 def es_bidding(self): """ es链接 """ db_config = { # es 'es_host': '127.0.0.1', 'es_port': 19800, 'es_http_auth': ('jianyuGr','we3g8glKfe#'), # 重新申请 'timeout': 10000, 'index': "bidding" } query = {"query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}]}}} # 传入查询语句query 以及配置信息 # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"]) counts=esutil.get_es_count(query,**db_config) count = counts['count'] print("标准库es-bidding每周入库数据量:",count) return count # 高质量库bidding-ai-es 每周统计入库数量 def es_bidding_ai(self): """ es链接 """ db_config = { # es 'es_host': '127.0.0.1', 'es_port': 19800, 'es_http_auth': ('jianyuGr','we3g8glKfe#'), # 重新申请 'timeout': 10000, 'index': "bidding_ai" } query = {"query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}]}}} # 传入查询语句query 以及配置信息 # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"]) counts=esutil.get_es_count(query,**db_config) count = counts['count'] print("高质量库es-bidding每周入库数据量:",count) return count # 标准库bidding-es 碎片化数据每周统计入库数量 def es_bidding_fragment(self): #正式环境 db_config = { # es 'es_host': '127.0.0.1', 'es_port': 19800, 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请 'timeout': 10000, 'index': "bidding" } # #测试环境http://192.168.3.149:9201 # db_config = { # # es # 'es_host': '192.168.3.149', # 'es_port': 9201, # # 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请 # 'timeout': 10000, # 'index': "bidding" # } # 定义要监控的字段值 tags = [ "情报_法务", "情报_财务审计", "情报_招标代理", "情报_管理咨询", "情报_保险", "情报_工程设计咨询", "情报_安防", "情报_印务商机", "情报_环境采购", "情报_家具招投标" ] # 初始化字典,将所有标签的计数设置为0 data = {} for tag in tags: query = { "query": {"bool": {"must": [{"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}}, {"term": {"tag_topinformation": tag}}]}}} count = esutil.get_es_count(query, **db_config) print(f"标准库es-bidding{tag}每周入库数据量:", count['count']) data[tag]=count['count'] # 检查数据字典以确保所有标签都被更新 print("数据字典内容:", data) # 打印整个数据字典 return data #拟在建es数据 每周统计入库数量 def es_nzj(self): """ es链接 """ db_config = { # es 'es_host': '127.0.0.1', 'es_port': 19800, 'es_http_auth': ('jianyuGr', 'we3g8glKfe#'), # 重新申请 'timeout': 10000, 'index': "proposed_v1" } query = { "query": {"match_all": {}}} # 传入查询语句query 以及配置信息 # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"]) counts = esutil.get_es_count(query, **db_config) count=counts['count'] print("拟在建es入库数据总量:", count) return count def save_to_mongo(self,title,count): collection=Data_save.save_con(host='192.168.3.149',port=27180,database='data_quality',collection='statistics') now = datetime.now() timestamp = int(now.timestamp()) document = { title: { "timestamp": timestamp, "count": count } } Data_save.insert_one(collection,document) escount=EScount()