123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- # coding:utf-8
- import time
- from a2s.tools import json_serialize, json_deserialize
- from a2s.a2s_client import a2s_execute
- from docs.config import ReluMongodb
- from util.mogodb_helper import MongoDBInterface
- from pymongo import MongoClient
- from util.mysql_tool import MysqlUtil
- import json
- from datetime import datetime, timedelta
- from bson import ObjectId
# MongoDB helper built from the ReluMongodb settings; get_rule() uses it to look up rules.
ReluClient = MongoDBInterface(ReluMongodb)

# Evaluation service configuration.
a2s_ip = "172.20.100.235:9090"
# The topic below is the one used for local testing; the alternative that was
# left commented out here is kept for reference:
# topic = "quality_bid"
topic = "test_quality_bid"
timeout = 300
# Run one quality evaluation.
def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    """Send one document to the evaluation service and return its decoded reply.

    The request is ``{"data": data, "rules_id": rules_id}`` serialized with
    ``json_serialize`` and dispatched through ``a2s_execute``.  The call is
    attempted up to ``retry`` times; an attempt that yields ``None`` or raises
    is abandoned (the exception is printed) and the next attempt starts.

    Args:
        data: The document to evaluate.
        rules_id: Identifier of the rule set to apply.
        a2s_ip: Address passed through to ``a2s_execute``.
        topic: Topic passed through to ``a2s_execute``.
        timeout: Timeout passed through to ``a2s_execute``.
        retry: Maximum number of attempts.

    Returns:
        The deserialized response, or ``{}`` when every attempt failed.
    """
    # Per the original author's note: SSL is not used here, so the channel is insecure.
    request_bytes = json_serialize({"data": data, "rules_id": rules_id})
    for _attempt in range(retry):
        print("topic",topic)
        try:
            response = a2s_execute(a2s_ip, topic, timeout, request_bytes)
            if response is not None:
                return json_deserialize(response)
        except Exception as exc:
            print(exc)
    return {}
# Fetch the rule id.
def get_rule(company, version):
    """Return whatever ``ReluClient.find_rule_by_company`` yields for the pair.

    The lookup runs against the collection named by ``ReluMongodb["col"]``.

    Args:
        company: Company name the rules belong to.
        version: Rule version string.
    """
    return ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
def find_error_id(conn, cleaned_key, sub_value):
    """Look up the ``id`` of the ``error_dict`` row matching a field/error pair.

    Args:
        conn: MySQL connection handed to ``MysqlUtil.query_data``.
        cleaned_key: Value compared against the ``fields`` column.
        sub_value: Value compared against the ``error`` column.

    Returns:
        The first column of the first matching row, or ``None`` (after printing
        a diagnostic) when the query returns nothing.
    """
    rows = MysqlUtil.query_data(
        conn,
        """SELECT id FROM error_dict WHERE fields = %s AND error = %s""",
        (cleaned_key, sub_value),
    )
    # A hit looks like [(10,)], so the id is the first cell of the first row.
    if rows:
        return rows[0][0]
    print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
    return None
def insert_batch_data(conn, params):
    """Run the ``INSERT IGNORE`` into ``bid_analysis_liantong``.

    Args:
        conn: MySQL connection handed to ``MysqlUtil.insert_data``.
        params: Values for the 38 placeholders, in the column order of the
            statement below.
    """
    insert_sql = """INSERT IGNORE INTO bid_analysis_liantong (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, create_time,
    agency,agencyperson,agencytel,bidamount,bidendtime,bidopentime,bidstarttime,bidway,budget,buyer,buyerperson,buyertel,com_package,docendtime,
    docstarttime,est_purchase_time,href,projectcode,projectname,publishtime,s_winner,title,winnerorder,winnerperson,winnertel)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    MysqlUtil.insert_data(conn, insert_sql, params)
def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
    """Store a comma-joined list of error ids in the ``<cleaned_key>_error`` column.

    The target column name is derived from ``cleaned_key``.  Column names
    cannot be bound as SQL parameters, so the name is interpolated into the
    statement; to keep that safe it must be a plain identifier (ASCII letters,
    digits and underscores, not starting with a digit).

    Args:
        conn: MySQL connection handed to ``MysqlUtil.update_data``.
        cleaned_key: Field name; ``_error`` is appended to form the column.
        error_ids: Iterable of ids, each converted with ``str`` and joined by ``,``.
        mongoid: Value matched against the ``mongoid`` column.

    Raises:
        ValueError: If ``cleaned_key`` is not a plain identifier.
    """
    import re

    # Reject anything that could smuggle SQL through the column name.
    if not isinstance(cleaned_key, str) or not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", cleaned_key):
        raise ValueError(f"Invalid field name for error column: {cleaned_key!r}")

    # Only the (validated) column name is formatted in; values stay parameterized.
    query = f"""
    UPDATE bid_analysis_liantong
    SET {cleaned_key}_error = %s
    WHERE mongoid = %s
    """
    # Several error ids are stored in one column, separated by commas.
    error_ids_str = ','.join(map(str, error_ids))
    params = (error_ids_str, mongoid)
    MysqlUtil.update_data(conn, query, params)
def has_non_empty_qa(data):
    """Report whether ``data["data"]`` contains a non-empty ``*_qa`` entry.

    Args:
        data: A result mapping whose ``"data"`` entry is expected to be a dict.

    Returns:
        ``True`` when at least one key of ``data["data"]`` ends with ``"_qa"``
        and its value is truthy (not ``None``, not an empty dict/list/string);
        ``False`` otherwise, including when ``data`` or its ``"data"`` entry is
        missing, ``None`` or not a dict.
    """
    payload = data.get('data') if isinstance(data, dict) else None
    # A present-but-null "data" entry would otherwise crash on .items().
    if not isinstance(payload, dict):
        return False
    return any(
        isinstance(key, str) and key.endswith('_qa') and bool(value)
        for key, value in payload.items()
    )
def parse_timestamp(timestamp):
    """Convert an epoch value to a local naive ``datetime``, or ``None``.

    Args:
        timestamp: Anything ``int()`` accepts (int, numeric string, ...).

    Returns:
        ``datetime.fromtimestamp(int(timestamp))``, or ``None`` when the value
        is falsy (``None``, ``""``, ``0``), cannot be converted to ``int``, or
        lies outside the range the platform can represent.
    """
    if not timestamp:
        return None
    try:
        return datetime.fromtimestamp(int(timestamp))
    except (ValueError, TypeError, OverflowError, OSError):
        # OverflowError / OSError are what fromtimestamp raises for values the
        # platform's time functions cannot handle; treat them as unparseable.
        return None
def _build_insert_params(item, data, create_time):
    """Assemble the 38 values for insert_batch_data from one document and its result.

    The order follows the column list of the INSERT statement in
    insert_batch_data.  Time fields go through parse_timestamp (missing or
    unparseable values become None); ``com_package`` and ``winnerorder`` are
    stored as JSON text.
    """
    def text(key):
        return item.get(key, "")

    def moment(key):
        return parse_timestamp(item.get(key, ""))

    def as_json(key):
        return json.dumps(item.get(key, ""))

    return (
        item["_id"], text("toptype"), text("subtype"), text("site"), text("spidercode"),
        text("channel"), moment("comeintime"), text("area"), text("city"), text("district"),
        data.get("score", ""), json.dumps(data), create_time,
        text("agency"), text("agencyperson"), text("agencytel"), text("bidamount"),
        moment("bidendtime"), moment("bidopentime"), moment("bidstarttime"),
        text("bidway"), text("budget"), text("buyer"), text("buyerperson"), text("buyertel"),
        as_json("com_package"), moment("docendtime"),
        moment("docstarttime"), moment("est_purchase_time"), text("href"),
        text("projectcode"), text("projectname"), moment("publishtime"),
        text("s_winner"), text("title"), as_json("winnerorder"),
        text("winnerperson"), text("winnertel"),
    )


def batch_load_data():
    """Quality-check yesterday's ``usermail`` documents and store the flagged ones.

    Documents of the configured ``appid`` whose ``createtime`` falls between
    yesterday 00:00:00 and today 00:00:00 (local time, both bounds inclusive)
    are read in ascending ``_id`` order and sent to ``start_quality``.  A
    document is written to MySQL through ``insert_batch_data`` only when the
    service answers with code 200 and ``has_non_empty_qa`` finds at least one
    non-empty ``*_qa`` entry in the result.

    If anything raises while reading or processing, the error and traceback
    are printed, the function waits 10 seconds and then resumes from the last
    document it had fetched (that document is processed again) instead of
    starting over from the first one.
    """
    import traceback

    # Sample the clock once so both window bounds and create_time agree.
    today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    today_date = today_midnight.strftime("%Y-%m-%d")
    # Epoch seconds for yesterday 00:00:00 and today 00:00:00.
    start_date = int((today_midnight - timedelta(days=1)).timestamp())
    end_date = int(today_midnight.timestamp())

    # Rule lookup, keyed by company name (user id) and version.
    rules_id = get_rule("中国联通", "v1.3")
    print(rules_id)

    # NOTE(review): connection credentials are hard-coded here; consider moving
    # them to configuration that is not committed with the source.
    conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z', database='quality')

    # Smallest possible ObjectId, i.e. start from the very first document.
    max_id = ObjectId("0" * 24)
    query = {
        "_id": {"$gte": max_id},
        "appid": "jyGQ1XQQsEAwNeSENOFR9D",
        "createtime": {"$gte": start_date, "$lte": end_date},
    }

    while True:
        # Local address; the production address left commented out in the
        # original was 'mongodb://172.20.17.61:27080/'.
        mongo = MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore", directConnection=True)
        try:
            coll_user = mongo.jyqyfw["usermail"]
            for item in coll_user.find(query).sort("_id", 1):
                print("------数据处理开始--------")
                max_id = item["_id"]
                item["_id"] = str(item["_id"])
                print(f"正在处理数据: {max_id}")

                # Quality-check call; start_quality returns {} when it gave up.
                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)
                if result.get("code") != 200:
                    # The evaluation failed for this document; skip it.
                    continue

                # Only documents with at least one reported problem are stored.
                if has_non_empty_qa(result):
                    data = result.get("data", {})
                    insert_batch_data(conn, _build_insert_params(item, data, today_date))
                    print("---- 数据处理完成 ----")
            break
        except Exception as e:
            print(f"错误: {e}")
            traceback.print_exc()  # full stack trace
            # Resume from the last document seen rather than from the beginning.
            query["_id"] = {"$gte": max_id}
            time.sleep(10)
        finally:
            # One client is opened per attempt; release it every time.
            mongo.close()
# Script entry point: run the batch quality check when executed directly.
if __name__ == "__main__":
    batch_load_data()
|