# coding:utf-8
import time
import json
from datetime import datetime

from a2s.tools import json_serialize, json_deserialize
from a2s.a2s_client import a2s_execute
from docs.config import ReluMongodb
from util.mogodb_helper import MongoDBInterface
from util.mysql_tool import MysqlUtil
from elasticsearch import Elasticsearch

ReluClient = MongoDBInterface(ReluMongodb)

# Evaluation service configuration
a2s_ip = "172.20.100.235:9090"
topic = "quality_bid"
# topic = "test_quality_bid"  # topic for local testing
timeout = 180

# # ES connection settings (local testing)
# es_host = "http://127.0.0.1:19800"
# es_username = "jianyuGr"
# es_password = "we3g8glKfe#"

# Production ES
es_host = "http://172.17.4.184:19908"
es_username = "qyfw_es_2"
es_password = "Khfdals33#"

# Initialize the Elasticsearch client (basic auth)
es_client = Elasticsearch(es_host, http_auth=(es_username, es_password), retry_on_timeout=True)


def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    """Run one document through the quality-evaluation service, retrying on failure."""
    # SSL is not used here, so the channel is insecure.
    row = {"data": data, "rules_id": rules_id}
    bytes_data = json_serialize(row)
    for _ in range(retry):
        print("topic", topic)
        try:
            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
            if resp_data is None:
                continue
            result = json_deserialize(resp_data)
            return result
        except Exception as e:
            print(e)
    return {}


def get_rule(company, version):
    """Look up the rule ID by company name (user ID) and rule version."""
    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
    return rule_id


def find_error_id(conn, cleaned_key, sub_value):
    """Look up the id in error_dict for the given field/error pair."""
    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
    params = (cleaned_key, sub_value)
    result = MysqlUtil.query_data(conn, query, params)  # e.g. [(10,)]
    # Return None (or a default, as needed) when no matching record exists.
    if not result:
        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
        return None
    record = result[0][0]
    return record


def insert_batch_data(conn, params):
    """Insert one analysis row; INSERT IGNORE keeps reruns from raising duplicate-key errors."""
    query = """INSERT IGNORE INTO bid_analysis
               (mongoid, toptype, subtype, site, spidercode, channel, comeintime,
                area, city, district, score, error_type, create_time)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    MysqlUtil.insert_data(conn, query, params)


def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
    """Write error_ids into the matching <cleaned_key>_error column."""
    # The column name is interpolated into the SQL, so cleaned_key must come from
    # trusted code, never from user input.
    query = f"""
        UPDATE bid_analysis
        SET {cleaned_key}_error = %s
        WHERE mongoid = %s
    """
    # Join multiple error ids into one comma-separated string.
    error_ids_str = ','.join(map(str, error_ids))
    params = (error_ids_str, mongoid)
    MysqlUtil.update_data(conn, query, params)


def get_last_processed_id():
    """Read the checkpoint (last processed ID) from a file; a database or Redis would also work."""
    try:
        with open('docs/last_processed_id_mysql.txt', 'r') as f:
            last_id = f.read().strip()
            return last_id if last_id else None
    except FileNotFoundError:
        return None


def save_last_processed_id(last_id):
    """Persist the ID of the last processed document so an interrupted run can resume."""
    with open('docs/last_processed_id_mysql.txt', 'w') as f:
        f.write(str(last_id))


def clear_last_processed_id():
    """Truncate the checkpoint file once a full run completes."""
    open('docs/last_processed_id_mysql.txt', 'w').close()


def batch_load_data():
    """Batch data-quality check over today's 08:00-12:00 bidding documents."""
    today_date = datetime.now().strftime("%Y-%m-%d")
    # Timestamps for today 08:00:00 and 12:00:00.
    start_date = int(datetime.strptime(f"{today_date} 08:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
    end_date = int(datetime.strptime(f"{today_date} 12:00:00", "%Y-%m-%d %H:%M:%S").timestamp())

    # Look up the rule set by the required keys: company name (user ID) and version.
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
    print(rules_id)

    # Initialize the MySQL connection.
    conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root',
                                      password='=PDT49#80Z!RVv52_z', database='quality')

    # Resume from the last processed ID; if there is none, start from the beginning.
    last_processed_id = get_last_processed_id()
    print(f"Last processed ID: {last_processed_id}")

    # ES query: today's time window, sorted by _id so search_after pagination is stable.
    es_query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "comeintime": {
                                "gte": start_date,
                                "lt": end_date
                            }
                        }
                    }
                ]
            }
        },
        "sort": [
            {"_id": {"order": "asc"}}
        ],
        "size": 100  # documents per batch
    }

    # If a checkpoint exists, resume pagination with search_after
    # (the value must match the sort field type, i.e. a string _id).
    if last_processed_id:
        es_query["search_after"] = [last_processed_id]

    try:
        # Fetch data in batches via search_after pagination.
        response = es_client.search(index="bidding", body=es_query)
        hits = response['hits']['hits']

        while hits:
            print("---- batch start ----")
            max_id = None
            for hit in hits:
                item = hit["_source"]
                print("------ document start ------")
                max_id = hit["_id"]
                print(f"Processing document: {max_id}")
                item["_id"] = str(hit["_id"])

                # Run the quality check.
                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)
                code = result.get("code")
                if code != 200:
                    # Evaluation failed for this document; skip it.
                    continue
                data = result.get("data", {})

                # Collect fields and insert the row into MySQL.
                toptype = item.get("toptype", "")
                subtype = item.get("subtype", "")
                site = item.get("site", "")
                spidercode = item.get("spidercode", "")
                channel = item.get("channel", "")
                comeintime = item.get("comeintime")
                # Guard against a missing timestamp instead of crashing on fromtimestamp("").
                comeintime = datetime.fromtimestamp(comeintime) if comeintime else None
                area = item.get("area", "")
                city = item.get("city", "")
                district = item.get("district", "")
                score = item.get("score", "")
                error_type_data = json.dumps(data)
                create_time = today_date
                params = (item["_id"], toptype, subtype, site, spidercode, channel, comeintime,
                          area, city, district, score, error_type_data, create_time)
                insert_batch_data(conn, params)
                print("------ document end ------")

            # Save the largest ID of this batch as the new checkpoint.
            if max_id:
                save_last_processed_id(max_id)
                print(f"Saved last processed ID: {max_id}")

            print("---- batch done ----")

            # Use the last _id of this batch as the cursor for the next batch.
            search_after = hits[-1]["_id"]
            es_query["search_after"] = [search_after]  # keep the _id type consistent
            response = es_client.search(index="bidding", body=es_query)
            hits = response['hits']['hits']

            # No more data: clear the checkpoint and stop.
            if not hits:
                print("No more data; batch processing finished")
                clear_last_processed_id()
                break

        print("Data processing complete")
    except Exception as e:
        print(f"Error: {e}")
        time.sleep(10)
    finally:
        if conn and conn.is_connected():
            conn.close()  # make sure the connection is released
            print("MySQL connection closed")


if __name__ == '__main__':
    batch_load_data()
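
# ---------------------------------------------------------------------------
# Usage sketch (comments only, not executed): how find_error_id and
# insert_dynamic_error_field are meant to work together. batch_load_data does
# not call them directly; the field name "title", the error labels, and the
# mongoid below are hypothetical examples, not values produced by this script.
#
#   conn = MysqlUtil.connect_to_mysql(host=..., port=..., user=..., password=..., database='quality')
#   error_ids = []
#   for err in ["missing", "too_short"]:            # hypothetical error labels
#       eid = find_error_id(conn, "title", err)     # map (field, error) -> error_dict id
#       if eid is not None:
#           error_ids.append(eid)
#   # Writes e.g. "10,12" into bid_analysis.title_error for the given row:
#   insert_dynamic_error_field(conn, "title", error_ids, "some_mongoid")
# ---------------------------------------------------------------------------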