# client_mongo_mysql_news_liantong.py
  1. # coding:utf-8
  2. import time
  3. from a2s.tools import json_serialize, json_deserialize
  4. from a2s.a2s_client import a2s_execute
  5. from docs.config import ReluMongodb
  6. from util.mogodb_helper import MongoDBInterface
  7. from pymongo import MongoClient
  8. from util.mysql_tool import MysqlUtil
  9. import json
  10. from datetime import datetime, timedelta
  11. from bson import ObjectId
# Mongo client used for rule lookups (configuration comes from docs.config).
ReluClient = MongoDBInterface(ReluMongodb)
# Evaluation-service (a2s) configuration
a2s_ip = "172.20.100.235:9090"
topic = "quality_bid"
# Topic used for local testing
# topic = "test_quality_bid"
# Per-request timeout (seconds) for the a2s call
timeout = 300
  19. # 开始评估
  20. def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
  21. # 本次不使用SSL,所以channel是不安全的
  22. row = {"data": data, "rules_id": rules_id}
  23. bytes_data = json_serialize(row)
  24. for t in range(retry):
  25. print("topic",topic)
  26. try:
  27. resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
  28. if resp_data is None:
  29. continue
  30. result = json_deserialize(resp_data)
  31. return result
  32. except Exception as e:
  33. print(e)
  34. return {}
  35. # 获取规则ID
  36. def get_rule(company, version):
  37. rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
  38. return rule_id
  39. def insert_batch_data(conn, params):
  40. """
  41. 执行批量插入数据
  42. """
  43. query = """INSERT IGNORE INTO news_analysis_liantong (mongoid, area, city, title, publish_org, score, error_type, create_time)
  44. VALUES (%s, %s, %s, %s, %s, %s, %s, %s )"""
  45. MysqlUtil.insert_data(conn, query, params)
  46. def has_non_empty_qa(data):
  47. # 获取data字典
  48. data_dict = data.get('data', {})
  49. # 遍历所有键值对
  50. for key, value in data_dict.items():
  51. # 检查键以'_qa'结尾且值不为空
  52. if key.endswith('_qa') and value: # value不为None、空字典、空列表等
  53. return True
  54. return False
def insert_bid_statistics (col,conn,query,batch_id):
    """Count the Mongo documents matching ``query`` and record the total in MySQL.

    col      -- pymongo collection to count over
    conn     -- open MySQL connection
    query    -- Mongo filter (same one the main loop iterates with)
    batch_id -- batch identifier stored alongside the count (the run date)
    """
    # Data-source code: 1 = bid brief, 2 = planned construction, 3 = news,
    # 4 = budget, 5 = special-purpose bond.  This job handles news.
    data_source = 3
    # Aggregation pipeline: $facet runs parallel sub-pipelines over the
    # matched set; only the total count ("总量") is computed here.
    pipeline = [
        {"$match": query},
        {"$facet": {
            "总量": [{"$count": "count"}], # total-count sub-pipeline
        }}
    ]
    # $facet always returns exactly one result document.
    result = list(col.aggregate(pipeline))[0]
    # $count emits no document at all when nothing matched, hence the guard.
    count_total = result["总量"][0]["count"] if result["总量"] else 0
    sql_query = """INSERT IGNORE INTO bid_statistics_liantong (news_count, batch_id,data_source)
VALUES ( %s, %s ,%s)"""
    params = (count_total,batch_id,data_source)
    MysqlUtil.insert_data(conn, sql_query, params)
  72. def batch_load_data():
  73. """
  74. 批量数据质量检查
  75. """
  76. # 获取今天的日期(字符串格式)
  77. today_date = datetime.now().strftime("%Y-%m-%d")
  78. yesterday_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
  79. # 获取昨天 00:00:00 的时间戳
  80. start_date = int(datetime.strptime(f"{yesterday_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
  81. # print("start_date",start_date)
  82. # 获取今天 00:00:00 的时间戳
  83. end_date = int(datetime.strptime(f"{today_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
  84. # print("end_date", end_date)
  85. # 规则查询,根据必要条件 公司名称(用户ID)、版本号
  86. rules_id = get_rule("中国联通-新闻", "v1.4.1")
  87. print(rules_id)
  88. # 初始化mysql
  89. conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
  90. max_id = ObjectId("0" * 24)
  91. # max_id = ObjectId("688363ebf0c6ad8b095e2245")
  92. # 查询条件:_id >= max_id, appid匹配,且 createtime 在 [start_date, end_date] 之间
  93. query = {
  94. "_id": {"$gte": max_id},
  95. "createtime": {"$gte": start_date, "$lte": end_date}
  96. }
  97. # mongo_client = MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore",directConnection=True) # 修改为你的连接地址
  98. mongo_client = MongoClient('mongodb://172.20.17.61:27080/', unicode_decode_error_handler="ignore",directConnection=True) # 正式环境
  99. client = mongo_client.jyqyfw
  100. coll_user = client["usermail_lt_news"]
  101. #存入数据总量,计算缺失率
  102. insert_bid_statistics(coll_user,conn,query,today_date)
  103. while True:
  104. try:
  105. for item in coll_user.find(query).sort("_id", 1):
  106. print("------数据处理开始--------")
  107. max_id = item["_id"]
  108. item["_id"] = str(item["_id"])
  109. print(f"正在处理数据: {max_id}")
  110. # 质量检查逻辑
  111. result = start_quality(item, rules_id, a2s_ip, topic, timeout)
  112. print(result)
  113. code = result.get("code")
  114. if code != 200:
  115. # 数据出错,跳过
  116. continue
  117. #只将有错误的数据存库
  118. if has_non_empty_qa(result):
  119. data = result.get("data", {})
  120. # 数据插入到 MySQL
  121. area = item.get("area", "")
  122. city = item.get("city", "")
  123. title = item.get("title", "")
  124. publish_org = item.get("publish_org", "")
  125. score = data.get("score", "")
  126. error_type_data = json.dumps(data)
  127. create_time = today_date
  128. params = (item["_id"], area, city, title, publish_org, score, error_type_data,create_time)
  129. insert_batch_data(conn, params)
  130. print("---- 数据处理完成 ----")
  131. break
  132. except Exception as e:
  133. print(f"错误: {e}")
  134. import traceback
  135. traceback.print_exc() # 打印完整堆栈信息
  136. time.sleep(10)
  137. finally:
  138. # 确保在循环结束后关闭连接
  139. conn.close() # 关闭MySQL连接
  140. mongo_client.close() # 关闭MongoDB连接
# Script entry point: run the daily batch quality check.
if __name__ == '__main__':
    batch_load_data()