# coding:utf-8
"""Daily quality check of bid records: Mongo -> a2s quality service -> MySQL.

Documents created during the previous calendar day are read from the
``usermail`` collection, sent to the quality-evaluation service, and every
document whose result carries at least one non-empty ``*_qa`` entry is stored
in the ``bid_analysis_liantong`` MySQL table.
"""
import json
import re
import time
import traceback
from datetime import datetime, timedelta

from bson import ObjectId
from pymongo import MongoClient

from a2s.a2s_client import a2s_execute
from a2s.tools import json_serialize, json_deserialize
from docs.config import ReluMongodb
from util.mogodb_helper import MongoDBInterface
from util.mysql_tool import MysqlUtil

ReluClient = MongoDBInterface(ReluMongodb)

# Quality-evaluation service configuration
a2s_ip = "172.20.100.235:9090"
# topic = "quality_bid"
# Topic used for local testing
topic = "test_quality_bid"
timeout = 300

# Column order of the INSERT into bid_analysis_liantong. The placeholder list
# and the parameter tuple are both derived from this single definition so the
# three can never drift apart.
_INSERT_COLUMNS = (
    "mongoid", "toptype", "subtype", "site", "spidercode", "channel",
    "comeintime", "area", "city", "district", "score", "error_type",
    "create_time",
    "agency", "agencyperson", "agencytel", "bidamount", "bidendtime",
    "bidopentime", "bidstarttime", "bidway", "budget", "buyer", "buyerperson",
    "buyertel", "com_package", "docendtime",
    "docstarttime", "est_purchase_time", "href", "projectcode", "projectname",
    "publishtime", "s_winner", "title", "winnerorder", "winnerperson",
    "winnertel",
)

# Source fields holding epoch-second timestamps; stored as datetime or NULL.
_TIMESTAMP_FIELDS = frozenset({
    "comeintime", "bidendtime", "bidopentime", "bidstarttime",
    "docendtime", "docstarttime", "est_purchase_time", "publishtime",
})

# Source fields that are stored as JSON strings.
_JSON_FIELDS = frozenset({"com_package", "winnerorder"})

_INSERT_SQL = "INSERT IGNORE INTO bid_analysis_liantong ({}) VALUES ({})".format(
    ", ".join(_INSERT_COLUMNS),
    ", ".join(["%s"] * len(_INSERT_COLUMNS)),
)


def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    """Send one record to the quality service and return the decoded reply.

    Args:
        data: The record to evaluate.
        rules_id: Identifier of the rule set to apply.
        a2s_ip: Address passed through to ``a2s_execute``.
        topic: Topic passed through to ``a2s_execute``.
        timeout: Timeout passed through to ``a2s_execute``.
        retry: Maximum number of attempts.

    Returns:
        The deserialized response of the first attempt that yields a
        non-``None`` reply, or ``{}`` when every attempt returned ``None`` or
        raised.
    """
    # SSL is not used here, so the channel is insecure.
    row = {"data": data, "rules_id": rules_id}
    bytes_data = json_serialize(row)
    for _ in range(retry):
        print("topic",topic)
        try:
            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
            if resp_data is None:
                continue
            return json_deserialize(resp_data)
        except Exception as e:
            # Best effort: report the failure and move on to the next attempt.
            print(e)
    return {}


def get_rule(company, version):
    """Return the rule id that ``ReluClient`` finds for ``company``/``version``."""
    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
    return rule_id


def find_error_id(conn, cleaned_key, sub_value):
    """Look up the ``id`` of an ``error_dict`` row.

    Args:
        conn: MySQL connection handed to ``MysqlUtil.query_data``.
        cleaned_key: Value matched against the ``fields`` column.
        sub_value: Value matched against the ``error`` column.

    Returns:
        The first column of the first result row, or ``None`` (after printing
        a diagnostic) when the query result is empty.
    """
    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
    params = (cleaned_key, sub_value)
    result = MysqlUtil.query_data(conn, query, params)
    # Example result shape: [(10,)]
    if not result:
        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
        return None
    return result[0][0]


def insert_batch_data(conn, params):
    """Insert one row into ``bid_analysis_liantong`` using ``INSERT IGNORE``.

    Args:
        conn: MySQL connection handed to ``MysqlUtil.insert_data``.
        params: Values in ``_INSERT_COLUMNS`` order.
    """
    MysqlUtil.insert_data(conn, _INSERT_SQL, params)


def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
    """Store a comma-joined list of error ids in ``<cleaned_key>_error``.

    Args:
        conn: MySQL connection handed to ``MysqlUtil.update_data``.
        cleaned_key: Field name; the target column is ``<cleaned_key>_error``.
        error_ids: Iterable of ids; each one is converted with ``str``.
        mongoid: Value matched against the ``mongoid`` column.

    Raises:
        ValueError: If ``cleaned_key`` is not made solely of ASCII letters,
            digits and underscores. A column name cannot be bound as a query
            parameter, so it is validated before being interpolated.
    """
    if not isinstance(cleaned_key, str) or not re.fullmatch(r"[A-Za-z0-9_]+", cleaned_key):
        raise ValueError(f"invalid column name prefix: {cleaned_key!r}")
    query = f"""
        UPDATE bid_analysis_liantong
        SET {cleaned_key}_error = %s
        WHERE mongoid = %s
    """
    # Join the ids with a separator so they fit in a single column.
    error_ids_str = ','.join(map(str, error_ids))
    params = (error_ids_str, mongoid)
    MysqlUtil.update_data(conn, query, params)


def has_non_empty_qa(data):
    """Return True if ``data['data']`` has a truthy value under any ``*_qa`` key.

    A missing or ``None`` ``data['data']`` is treated as an empty mapping.
    """
    data_dict = data.get('data') or {}
    return any(
        key.endswith('_qa') and value  # value is not None / {} / [] / ""
        for key, value in data_dict.items()
    )


def parse_timestamp(timestamp):
    """Convert an epoch-seconds value to a local ``datetime``.

    Returns:
        ``None`` when ``timestamp`` is falsy, cannot be converted with
        ``int``, or is outside the range ``datetime.fromtimestamp`` accepts.
    """
    if not timestamp:
        return None
    try:
        return datetime.fromtimestamp(int(timestamp))
    except (ValueError, TypeError, OverflowError, OSError):
        # OverflowError / OSError cover out-of-range values.
        return None


def _build_insert_params(item, data, create_time):
    """Build the INSERT parameter tuple for one document, in column order."""
    computed = {
        "mongoid": item["_id"],
        "score": data.get("score", ""),
        "error_type": json.dumps(data),
        "create_time": create_time,
    }
    row = []
    for column in _INSERT_COLUMNS:
        if column in computed:
            row.append(computed[column])
            continue
        value = item.get(column, "")
        if column in _TIMESTAMP_FIELDS:
            # A missing or malformed timestamp becomes None instead of
            # aborting the whole batch.
            value = parse_timestamp(value)
        elif column in _JSON_FIELDS:
            value = json.dumps(value)
        row.append(value)
    return tuple(row)


def batch_load_data():
    """Run the quality check over yesterday's documents and store failures.

    The scan is ordered by ``_id``. If an exception interrupts it, the error
    and traceback are printed, and after a 10 second pause the scan resumes
    at the last document that was being processed rather than from the start
    of the window. ``INSERT IGNORE`` is used for the stored rows.
    """
    # Today's and yesterday's dates as strings
    today_date = datetime.now().strftime("%Y-%m-%d")
    yesterday_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    # Epoch seconds of yesterday 00:00:00
    start_date = int(datetime.strptime(f"{yesterday_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
    # Epoch seconds of today 00:00:00
    end_date = int(datetime.strptime(f"{today_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())

    # Rule lookup by company name (user id) and version
    rules_id = get_rule("中国联通", "v1.3")
    print(rules_id)

    # Initialise MySQL.
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to configuration or the environment.
    conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')

    max_id = ObjectId("0" * 24)
    # max_id = ObjectId("688363ebf0c6ad8b095e2245")
    # Filter: _id >= max_id, matching appid, createtime within
    # [start_date, end_date]
    query = {
        "_id": {"$gte": max_id},
        # "_id": max_id,
        "appid": "jyGQ1XQQsEAwNeSENOFR9D",
        "createtime": {"$gte": start_date, "$lte": end_date}
    }

    while True:
        # Change to your own connection address
        mongo_client = MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore",directConnection=True)
        # Production environment:
        # mongo_client = MongoClient('mongodb://172.20.17.61:27080/', unicode_decode_error_handler="ignore",directConnection=True)
        coll_user = mongo_client.jyqyfw["usermail"]
        try:
            for item in coll_user.find(query).sort("_id", 1):
                print("------数据处理开始--------")
                max_id = item["_id"]
                item["_id"] = str(item["_id"])
                print(f"正在处理数据: {max_id}")

                # Quality check
                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)

                if result.get("code") != 200:
                    # The evaluation failed for this record; skip it.
                    continue

                # Only records with at least one reported problem are stored.
                if has_non_empty_qa(result):
                    data = result.get("data") or {}
                    params = _build_insert_params(item, data, today_date)
                    insert_batch_data(conn, params)

            print("---- 数据处理完成 ----")
            break
        except Exception as e:
            print(f"错误: {e}")
            traceback.print_exc()  # print the full stack trace
            # Resume at the document that was in flight instead of rescanning
            # the whole window on the next attempt.
            query["_id"] = {"$gte": max_id}
        finally:
            # Release the connection before retrying or leaving the loop.
            mongo_client.close()
        # Only reached after a failure: the success path leaves via `break`.
        time.sleep(10)


if __name__ == '__main__':
    batch_load_data()