@@ -0,0 +1,263 @@
+# coding:utf-8
+import time
+from a2s.tools import json_serialize, json_deserialize
+from a2s.a2s_client import a2s_execute
+from docs.config import ReluMongodb
+from util.mogodb_helper import MongoDBInterface
+from pymongo import MongoClient
+from util.mysql_tool import MysqlUtil
+import json
+from datetime import datetime, timedelta
+from elasticsearch import Elasticsearch
+
+
+ReluClient = MongoDBInterface(ReluMongodb)
+
+# Quality-evaluation service configuration
+a2s_ip = "192.168.3.240:9090"
+# a2s_ip = "172.17.0.11:9090"
+topic = "quality_bid"
+# Topic used for local testing
+# topic = "test_quality_bid"
+timeout = 180
+
+# Current time
+now = datetime.now()
+current_datetime = now.strftime("%Y-%m-%d %H:%M:%S")
+# Today's date
+today = datetime.today()
+# Yesterday's date
+yesterday = today - timedelta(days=1)
+# Yesterday at midnight (00:00)
+yesterday_midnight = datetime(yesterday.year, yesterday.month, yesterday.day)
+# Today at midnight (00:00)
+today_midnight = datetime(today.year, today.month, today.day)
+# Convert to Unix timestamps: the ES query filters comeintime in [yesterday 00:00, today 00:00)
+start_date = int(yesterday_midnight.timestamp())
+end_date = int(today_midnight.timestamp())
+
+# ES connection configuration
+es_host = "http://127.0.0.1:19800"
+es_username = "jianyuGr"
+es_password = "we3g8glKfe#"
+
+# Initialize the Elasticsearch client with basic authentication
+es_client = Elasticsearch(es_host, http_auth=(es_username, es_password), retry_on_timeout=True)
+
+# Run the quality evaluation for a single document
+def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
+    # SSL is not used for this call, so the channel is insecure
+    row = {"data": data, "rules_id": rules_id}
+    bytes_data = json_serialize(row)
+    for t in range(retry):
+        print("topic", topic)
+        try:
+            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
+            if resp_data is None:
+                continue
+            result = json_deserialize(resp_data)
+            return result
+        except Exception as e:
+            print(e)
+    # All retries failed: return an empty result
+    return {}
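+
+# Minimal usage sketch (illustrative only): the batch loop below expects the
+# service to reply with a dict shaped like {"code": 200, "data": {...}}, where
+# each "<field>_qa" entry in "data" maps to a dict of error descriptions for
+# that field.
+#   result = start_quality(doc, rules_id, a2s_ip, topic, timeout)
+#   if result.get("code") == 200:
+#       field_errors = result.get("data", {})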
+
+# Look up the rule ID by company (user ID) and version
+def get_rule(company, version):
+    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
+    return rule_id
+
+def find_error_id(conn, cleaned_key, sub_value):
+    """
+    Look up the id in error_dict for a given field and error description
+    """
+    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
+    params = (cleaned_key, sub_value)
+    result = MysqlUtil.query_data(conn, query, params)
+    # result looks like [(10,)]
+    # Check whether the query returned anything
+    if not result:
+        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
+        return None  # or return a default value, depending on requirements
+
+    record = result[0][0]
+    return record
+
+def insert_batch_data(conn, params):
+    """
+    Insert one analysis row (INSERT IGNORE skips duplicates on the unique key)
+    """
+    query = """INSERT IGNORE INTO bid_analysis (mongoid, site, spidercode, comeintime, area, city, district, score, error_type, spider_modified_time)
+               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
+    MysqlUtil.insert_data(conn, query, params)
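+
+# Illustrative target schema (an assumption for reference only; the real
+# bid_analysis DDL lives in the quality database and may differ):
+#   CREATE TABLE bid_analysis (
+#       mongoid VARCHAR(64) PRIMARY KEY,
+#       site VARCHAR(255), spidercode VARCHAR(255), comeintime DATETIME,
+#       area VARCHAR(64), city VARCHAR(64), district VARCHAR(64),
+#       score VARCHAR(16), error_type TEXT, spider_modified_time DATETIME,
+#       title_error VARCHAR(255), ...  -- one "<field>_error" column per checked field
+#   );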
+
+def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
+    """
+    Write the collected error_ids into the matching <cleaned_key>_error column
+    """
+    # Build the UPDATE statement dynamically for the <cleaned_key>_error column.
+    # Note: cleaned_key is interpolated into the SQL text, so it must come from
+    # the known quality-check field names, never from untrusted input.
+    query = f"""
+    UPDATE bid_analysis
+    SET {cleaned_key}_error = %s
+    WHERE mongoid = %s
+    """
+    # Join the error ids into a comma-separated string
+    error_ids_str = ','.join(map(str, error_ids))
+    params = (error_ids_str, mongoid)
+
+    MysqlUtil.update_data(conn, query, params)
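+
+# For example, with a hypothetical cleaned_key="title", error_ids=[3, 10] and
+# mongoid="abc123", this issues (parameter values shown inline for clarity):
+#   UPDATE bid_analysis SET title_error = '3,10' WHERE mongoid = 'abc123'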
+
+def get_last_processed_id():
+    """
+    Read the last processed ID (e.g. from a database or a file) so a run can resume
+    """
+    # Here the checkpoint ID is read from a file; it could also come from a
+    # database, Redis, or similar storage.
+    try:
+        with open('docs/last_processed_id.txt', 'r') as f:
+            last_id = f.read().strip()
+            if last_id:
+                return last_id
+            else:
+                return None
+    except FileNotFoundError:
+        return None
+
+def save_last_processed_id(last_id):
+    """
+    Persist the largest ID processed so far, used to resume after an interruption
+    """
+    with open('docs/last_processed_id.txt', 'w') as f:
+        f.write(str(last_id))
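+
+# The checkpoint stored here is the Elasticsearch _id of the last document in
+# the most recently completed batch; batch_load_data feeds it back through
+# "search_after" to resume pagination. Deleting docs/last_processed_id.txt
+# restarts processing from the beginning of the time window.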
+
+def batch_load_data():
+    """
+    Batch data-quality check over yesterday's bidding documents
+    """
+    # Look up the rule set by the required keys: company name (user ID) and version
+    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
+    print(rules_id)
+
+    # Initialize MySQL
+    # conn = MysqlUtil.connect_to_mysql(host='192.168.3.14', port='4000', user='DataScBi', password='DT#Sc20221123Ht',database='quality')
+    conn = MysqlUtil.connect_to_mysql(host='192.168.3.217', port='4000', user='root', password='=PDT49#80Z!RVv52_z', database='quality')
+    # Initialize the spider code library (crawler audit log collection)
+    collection = MongoClient('mongodb://127.0.0.1:27089/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
+
+    # Read the last processed ID; if there is none, start from the beginning
+    last_processed_id = get_last_processed_id()
+    print(f"Last processed ID: {last_processed_id}")
+
+    # Query ES for yesterday's documents
+    es_query = {
+        "query": {
+            "bool": {
+                "filter": [
+                    {
+                        "range": {
+                            "comeintime": {
+                                "gte": start_date,
+                                "lt": end_date
+                            }
+                        }
+                    }
+                ]
+            }
+        },
+        "sort": [
+            {"_id": {"order": "asc"}}  # sort by _id so search_after pagination is deterministic
+        ],
+        "size": 100  # number of documents per batch
+    }
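+
+    # How the pagination works: each page is sorted by _id, and the sort value
+    # of the last hit is fed back via "search_after" to request the next page.
+    # For a hypothetical page ending with _id "64f0...9a", the next request
+    # carries es_query["search_after"] = ["64f0...9a"] and returns the 100
+    # documents that follow it.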
+
+    # If there is a checkpoint ID, resume from it with search_after
+    if last_processed_id:
+        es_query["search_after"] = [last_processed_id]  # must match the _id sort value type (string)
+
+    try:
+        # Fetch the first batch; pagination uses search_after, not the scroll API
+        response = es_client.search(index="bidding", body=es_query)
+        hits = response['hits']['hits']
+
+        while hits:
+            print("---- batch start ----")
+            max_id = None
+            for hit in hits:
+                item = hit["_source"]
+                print("------ document start ------")
+                max_id = hit["_id"]
+                print(f"Processing document: {max_id}")
+                item["_id"] = str(hit["_id"])
+
+                # Run the quality check
+                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
+                print(result)
+
+                code = result.get("code")
+                if code != 200:
+                    # The document failed evaluation; skip it
+                    continue
+
+                data = result.get("data", {})
+
+                # Collect the fields to insert into MySQL
+                site = item.get("site", "")
+                spidercode = item.get("spidercode", "")
+                comeintime = item.get("comeintime", 0)
+                comeintime = datetime.fromtimestamp(comeintime) if comeintime else None
+                area = item.get("area", "")
+                city = item.get("city", "")
+                district = item.get("district", "")
+                score = item.get("score", "")
+                error_type_data = json.dumps(data)
+                spider_modified_time = current_datetime
+
+                # Look up when the spider code was last modified
+                info = collection.find_one({"code": spidercode})
+                if info and info.get("modifytime"):
+                    spider_modified_time = datetime.fromtimestamp(info["modifytime"])
+
+                params = (item["_id"], site, spidercode, comeintime, area, city, district, score, error_type_data, spider_modified_time)
+                insert_batch_data(conn, params)
+
+                # Walk the error dictionary and collect ids for every non-empty field entry
+                for key, value in data.items():
+                    error_ids = []
+                    if isinstance(value, dict) and value:
+                        cleaned_key = key[:-3] if key.endswith('_qa') else key  # strip the '_qa' suffix
+                        for sub_key, sub_value in value.items():
+                            error_id = find_error_id(conn, cleaned_key, sub_value)
+                            if error_id:
+                                error_ids.append(error_id)
+                            print(f" {sub_key}: {sub_value}")
+
+                        # Write the error ids into the <cleaned_key>_error column
+                        insert_dynamic_error_field(conn, cleaned_key, error_ids, item["_id"])
+                print("------ document end ------")
+            # Save the largest ID processed in this batch
+            if max_id:
+                save_last_processed_id(max_id)
+                print(f"Saved last processed ID: {max_id}")
+            # End-of-batch message
+            print("---- current batch processed ----")
+
+            # Fetch the next batch
+            search_after = hits[-1]["_id"]  # the last _id of this batch is the starting point for the next
+            es_query["search_after"] = [search_after]  # keep the _id type consistent
+            response = es_client.search(index="bidding", body=es_query)
+            hits = response['hits']['hits']
+
+            # No more data: stop batch processing
+            if not hits:
+                print("No more data, ending batch processing")
+                break
+        print("Data processing finished")
+    except Exception as e:
+        print(f"Error: {e}")
+        time.sleep(10)
+    finally:
+        if conn.is_connected():
+            conn.close()  # make sure the connection is closed
+            print("MySQL connection closed")
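+
+# The comeintime window above covers yesterday only, so the script is presumably
+# run once per day (e.g. from cron shortly after midnight). This is an assumption
+# based on the date window; no scheduler configuration is included here.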
+
+if __name__ == '__main__':
+    batch_load_data()
+