|
@@ -0,0 +1,169 @@
|
|
|
+# coding:utf-8
|
|
|
+import time
|
|
|
+from a2s.tools import json_serialize, json_deserialize
|
|
|
+from a2s.a2s_client import a2s_execute
|
|
|
+from docs.config import ReluMongodb
|
|
|
+from util.mogodb_helper import MongoDBInterface
|
|
|
+from pymongo import MongoClient
|
|
|
+from util.mysql_tool import MysqlUtil
|
|
|
+import json
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from bson import ObjectId
|
|
|
+
|
|
|
+ReluClient = MongoDBInterface(ReluMongodb)
|
|
|
+
|
|
|
+# 评估服务配置
|
|
|
+a2s_ip = "172.20.100.235:9090"
|
|
|
+topic = "quality_bid"
|
|
|
+#本地测试用的主题
|
|
|
+# topic = "test_quality_bid"
|
|
|
+timeout = 300
|
|
|
+
|
|
|
+
|
|
|
+# 开始评估
|
|
|
+def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
|
|
|
+ # 本次不使用SSL,所以channel是不安全的
|
|
|
+ row = {"data": data, "rules_id": rules_id}
|
|
|
+ bytes_data = json_serialize(row)
|
|
|
+ for t in range(retry):
|
|
|
+ print("topic",topic)
|
|
|
+ try:
|
|
|
+ resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
|
|
|
+ if resp_data is None:
|
|
|
+ continue
|
|
|
+ result = json_deserialize(resp_data)
|
|
|
+ return result
|
|
|
+ except Exception as e:
|
|
|
+ print(e)
|
|
|
+ return {}
|
|
|
+
|
|
|
+# 获取规则ID
|
|
|
+def get_rule(company, version):
|
|
|
+ rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
|
|
|
+ return rule_id
|
|
|
+
|
|
|
+def insert_batch_data(conn, params):
|
|
|
+ """
|
|
|
+ 执行批量插入数据
|
|
|
+ """
|
|
|
+ query = """INSERT IGNORE INTO yusuan_analysis_liantong (mongoid, area, city,district, projectname, publish_org, procure_content,kpi,budget,institution,score, error_type, create_time)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s,%s,%s )"""
|
|
|
+ MysqlUtil.insert_data(conn, query, params)
|
|
|
+
|
|
|
+
|
|
|
+def has_non_empty_qa(data):
|
|
|
+ # 获取data字典
|
|
|
+ data_dict = data.get('data', {})
|
|
|
+
|
|
|
+ # 遍历所有键值对
|
|
|
+ for key, value in data_dict.items():
|
|
|
+ # 检查键以'_qa'结尾且值不为空
|
|
|
+ if key.endswith('_qa') and value: # value不为None、空字典、空列表等
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+def insert_bid_statistics (col,conn,query,batch_id):
|
|
|
+ #定义来源 1标讯简版2拟在建3新闻4预算5专项债
|
|
|
+ data_source =4
|
|
|
+ # 使用聚合管道进行多条件统计
|
|
|
+ pipeline = [
|
|
|
+ {"$match": query},
|
|
|
+ {"$facet": {
|
|
|
+ "总量": [{"$count": "count"}], # 添加总量统计
|
|
|
+ }}
|
|
|
+ ]
|
|
|
+
|
|
|
+ result = list(col.aggregate(pipeline))[0]
|
|
|
+
|
|
|
+ # 提取统计结果
|
|
|
+ count_total = result["总量"][0]["count"] if result["总量"] else 0
|
|
|
+
|
|
|
+ sql_query = """INSERT IGNORE INTO bid_statistics_liantong (yusuan_count, batch_id,data_source)
|
|
|
+ VALUES ( %s, %s ,%s)"""
|
|
|
+ params = (count_total, batch_id, data_source)
|
|
|
+ MysqlUtil.insert_data(conn, sql_query, params)
|
|
|
+
|
|
|
+def batch_load_data():
|
|
|
+ """
|
|
|
+ 批量数据质量检查
|
|
|
+ """
|
|
|
+ # 获取今天的日期(字符串格式)
|
|
|
+ today_date = datetime.now().strftime("%Y-%m-%d")
|
|
|
+ yesterday_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
|
|
|
+ # 获取昨天 00:00:00 的时间戳
|
|
|
+ start_date = int(datetime.strptime(f"{yesterday_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
|
|
|
+ # print("start_date",start_date)
|
|
|
+ # 获取今天 00:00:00 的时间戳
|
|
|
+ end_date = int(datetime.strptime(f"{today_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
|
|
|
+ # print("end_date", end_date)
|
|
|
+ # 规则查询,根据必要条件 公司名称(用户ID)、版本号
|
|
|
+ rules_id = get_rule("中国联通-预算", "v1.4.3")
|
|
|
+ print(rules_id)
|
|
|
+ # 初始化mysql
|
|
|
+ conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
|
|
|
+
|
|
|
+ max_id = ObjectId("0" * 24)
|
|
|
+ # max_id = ObjectId("6886f76a923643203ed1e99b")
|
|
|
+ # 查询条件:_id >= max_id, appid匹配,且 createtime 在 [start_date, end_date] 之间
|
|
|
+ query = {
|
|
|
+ "_id": {"$gte": max_id},
|
|
|
+ # "_id": max_id,
|
|
|
+ "createtime": {"$gte": start_date, "$lte": end_date}
|
|
|
+ }
|
|
|
+
|
|
|
+ # mongo_client = MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore",directConnection=True) # 修改为你的连接地址
|
|
|
+ mongo_client = MongoClient('mongodb://172.20.17.61:27080/', unicode_decode_error_handler="ignore",directConnection=True) # 正式环境
|
|
|
+ client = mongo_client.jyqyfw
|
|
|
+ coll_user = client["usermail_lt_budget"]
|
|
|
+ #存入数据总量,计算缺失率
|
|
|
+ insert_bid_statistics(coll_user,conn,query,today_date)
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ for item in coll_user.find(query).sort("_id", 1):
|
|
|
+ print("------数据处理开始--------")
|
|
|
+ max_id = item["_id"]
|
|
|
+ item["_id"] = str(item["_id"])
|
|
|
+ print(f"正在处理数据: {max_id}")
|
|
|
+ # 质量检查逻辑
|
|
|
+ result = start_quality(item, rules_id, a2s_ip, topic, timeout)
|
|
|
+ print(result)
|
|
|
+ code = result.get("code")
|
|
|
+ if code != 200:
|
|
|
+ # 数据出错,跳过
|
|
|
+ continue
|
|
|
+ #只将有错误的数据存库
|
|
|
+ if has_non_empty_qa(result):
|
|
|
+ data = result.get("data", {})
|
|
|
+
|
|
|
+ # 数据插入到 MySQL
|
|
|
+ area = item.get("area", "")
|
|
|
+ city = item.get("city", "")
|
|
|
+ district = item.get("district", "")
|
|
|
+ projectname = item.get("projectname", "")
|
|
|
+ publish_org = item.get("publish_org", "")
|
|
|
+ procure_content = item.get("procure_content", "")
|
|
|
+ kpi = item.get("kpi", "")
|
|
|
+ budget = item.get("budget", "")
|
|
|
+ institution = item.get("institution", "")
|
|
|
+ score = data.get("score", "")
|
|
|
+ error_type_data = json.dumps(data)
|
|
|
+ create_time = today_date
|
|
|
+
|
|
|
+ params = (item["_id"], area, city, district,projectname, publish_org, procure_content,kpi,budget,institution,score, error_type_data,create_time)
|
|
|
+ insert_batch_data(conn, params)
|
|
|
+
|
|
|
+ print("---- 数据处理完成 ----")
|
|
|
+ break
|
|
|
+ except Exception as e:
|
|
|
+ print(f"错误: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc() # 打印完整堆栈信息
|
|
|
+ time.sleep(10)
|
|
|
+ finally:
|
|
|
+ # 确保在循环结束后关闭连接
|
|
|
+ conn.close() # 关闭MySQL连接
|
|
|
+ mongo_client.close() # 关闭MongoDB连接
|
|
|
+if __name__ == '__main__':
|
|
|
+ batch_load_data()
|
|
|
+
|