# coding:utf-8
import time
from datetime import datetime, timedelta

from elasticsearch import Elasticsearch
from pymongo import MongoClient

from a2s.tools import json_serialize, json_deserialize
from a2s.a2s_client import a2s_execute
from docs.config import ReluMongodb
from util.mogodb_helper import MongoDBInterface

# Score the ES data directly and store the results in MongoDB; no longer pull the data first and score it afterwards.
ReluClient = MongoDBInterface(ReluMongodb)

# Evaluation service (a2s) configuration
a2s_ip = "192.168.3.240:9090"
# a2s_ip = "172.17.0.11:9090"
topic = "quality_bid"
# topic = "test_quality_bid"  # topic used for local testing
timeout = 180

# Current time
now = datetime.now()
# current_datetime = int(now.timestamp())

# Yesterday at 08:00, converted to a seconds-level timestamp
yesterday = now - timedelta(days=1)
yesterday_8_am = datetime(yesterday.year, yesterday.month, yesterday.day, 8, 0, 0)
current_datetime = int(yesterday_8_am.timestamp())

# Time window to process
start_date = int(datetime(2025, 1, 23, 8, 0, 0).timestamp())  # 2025-01-23 08:00:00
end_date = int(datetime(2025, 1, 23, 12, 0, 0).timestamp())   # 2025-01-23 12:00:00

# ES connection configuration
es_host = "http://127.0.0.1:19800"
es_username = "jianyuGr"
es_password = "we3g8glKfe#"

# Initialize the Elasticsearch client (basic authentication)
es_client = Elasticsearch(es_host, http_auth=(es_username, es_password), retry_on_timeout=True)


# Start an evaluation
def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    # SSL is not used here, so the channel is insecure
    row = {"data": data, "rules_id": rules_id}
    bytes_data = json_serialize(row)
    for t in range(retry):
        print("topic", topic)
        try:
            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
            if resp_data is None:
                continue
            result = json_deserialize(resp_data)
            return result
        except Exception as e:
            print(e)
    return {}


# Get the rule ID
def get_rule(company, version):
    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
    return rule_id


def insert_batch_data_mongo(collection, params):
    """
    Assemble one result tuple into a document and insert it into MongoDB
    """
    # Map the positional parameters onto the document fields
    documents = ["mongoid", "toptype", "subtype", "site", "spidercode", "channel", "comeintime",
                 "area", "city", "district", "error_type", "spider_modified_time",
                 "spider_important", "site_important", "create_time"]
    doc = {}
    for indx, param in enumerate(params):
        doc[documents[indx]] = param
    print(f"document: {doc}")
    # Insert the document
    try:
        collection.insert_one(doc)
        print(f"document successfully inserted into MongoDB: {doc}")
    except Exception as e:
        print("error while inserting data:", e)


def get_last_processed_id():
    """
    Read the last processed ID (e.g. from a database or a file)
    """
    # Here the checkpoint ID is read from a file; it could also come from a database, Redis, etc.
    try:
        with open('docs/last_processed_id.txt', 'r') as f:
            last_id = f.read().strip()
            if last_id:
                return last_id
            else:
                return None
    except FileNotFoundError:
        return None


def save_last_processed_id(last_id):
    """
    Save the largest processed ID so the run can be resumed
    """
    with open('docs/last_processed_id.txt', 'w') as f:
        f.write(str(last_id))


def batch_load_data():
    """
    Batch data-quality check
    """
    # Rule lookup by the required keys: company name (user ID) and version
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
    print(rules_id)

    # Spider code collection
    collection_spider = MongoClient('mongodb://127.0.0.1:27089/',
                                    unicode_decode_error_handler="ignore",
                                    directConnection=True)["editor"]["lua_logs_auditor"]
    # Spider config collection
    collection_config = MongoClient('mongodb://127.0.0.1:27089/',
                                    unicode_decode_error_handler="ignore",
                                    directConnection=True)["editor"]["luaconfig"]
    # Site collection
    collection_site = MongoClient('mongodb://127.0.0.1:27089/',
                                  unicode_decode_error_handler="ignore",
                                  directConnection=True)["editor"]["site"]

    # Result MongoDB
    conn = MongoClient('192.168.3.149', 27180, unicode_decode_error_handler="ignore").data_quality
    coll_user = conn["bid_analysis"]

    # Last processed ID; if there is none, start from the beginning
    last_processed_id = get_last_processed_id()
    print(f"last processed ID: {last_processed_id}")

    # ES query
    es_query = {
        "query": {
            "bool": {
                "filter": [
                    {"range": {"comeintime": {"gte": start_date, "lt": end_date}}}
                ]
            }
        },
        "sort": [
            {"_id": {"order": "asc"}}  # sort by _id so that search_after pagination is stable
        ],
        "size": 100  # number of documents per batch
    }

    # If there is a checkpoint ID, resume pagination with `search_after`
    if last_processed_id:
        es_query["search_after"] = [last_processed_id]  # must be the string form of _id

    try:
        # Fetch the data in batches with search_after pagination
        response = es_client.search(index="bidding", body=es_query)
        hits = response['hits']['hits']
        while hits:
            print("---- batch start ----")
            max_id = None
            for hit in hits:
                item = hit["_source"]
                print("------ record start --------")
                max_id = hit["_id"]
                print(f"processing record: {max_id}")
                item["_id"] = str(hit["_id"])

                # Quality check
                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)
                code = result.get("code")
                if code != 200:
                    # The record failed, skip it
                    continue
                data = result.get("data", {})

                # Assemble the fields and insert them into MongoDB
                toptype = item.get("toptype", "")
                subtype = item.get("subtype", "")
                site = item.get("site", "")
                spidercode = item.get("spidercode", "")
                channel = item.get("channel", "")
                comeintime = item.get("comeintime", "")
                area = item.get("area", "")
                city = item.get("city", "")
                district = item.get("district", "")
                spider_modified_time = current_datetime
                spider_important = False
                site_important = 0
                create_time = current_datetime

                info = collection_spider.find_one({"code": spidercode})
                if info:
                    spider_modified_time = info.get("modifytime", "")
                info_config = collection_config.find_one({"code": spidercode})
                if info_config:
                    spider_important = info_config.get("spiderimportant", "")
                info_site = collection_site.find_one({"site": site})
                if info_site:
                    site_important = info_site.get("important", "")

                params = (item["_id"], toptype, subtype, site, spidercode, channel, comeintime,
                          area, city, district, data, spider_modified_time, spider_important,
                          site_important, create_time)
                insert_batch_data_mongo(coll_user, params)
                print("------ record end ------")

            # Save the largest ID processed in this batch
            if max_id:
                save_last_processed_id(max_id)
                print(f"saved checkpoint ID: {max_id}")

            print("---- current batch finished ----")

            # Fetch the next batch: the _id of the last record in this batch is the next cursor
            search_after = hits[-1]["_id"]
            es_query["search_after"] = [search_after]  # keep the _id type consistent
            response = es_client.search(index="bidding", body=es_query)
            hits = response['hits']['hits']

            # No more data, stop
            if not hits:
                print("no more data, batch processing finished")
                break

        print("data processing finished")

    except Exception as e:
        print(f"error: {e}")
        time.sleep(10)
    finally:
        conn.client.close()  # make sure the MongoDB connection is closed
        print("MongoDB connection closed")


if __name__ == '__main__':
    batch_load_data()