client_mongo_mysql_liantong.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. # coding:utf-8
  2. import time
  3. from a2s.tools import json_serialize, json_deserialize
  4. from a2s.a2s_client import a2s_execute
  5. from docs.config import ReluMongodb
  6. from util.mogodb_helper import MongoDBInterface
  7. from pymongo import MongoClient
  8. from util.mysql_tool import MysqlUtil
  9. import json
  10. from datetime import datetime, timedelta
  11. from bson import ObjectId
# MongoDBInterface client built from the ReluMongodb config; get_rule() uses it
# to look up quality rules.
ReluClient = MongoDBInterface(ReluMongodb)
# Quality-assessment (a2s) service configuration.
# host:port of the a2s service, passed to a2s_execute.
a2s_ip = "172.20.100.235:9090"
# Production topic the records are sent to.
topic = "quality_bid"
# Topic used for local testing:
# topic = "test_quality_bid"
# Passed to a2s_execute as its timeout (unit defined by a2s_execute; presumably seconds).
timeout = 300
  19. # 开始评估
  20. def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
  21. # 本次不使用SSL,所以channel是不安全的
  22. row = {"data": data, "rules_id": rules_id}
  23. bytes_data = json_serialize(row)
  24. for t in range(retry):
  25. print("topic",topic)
  26. try:
  27. resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
  28. if resp_data is None:
  29. continue
  30. result = json_deserialize(resp_data)
  31. return result
  32. except Exception as e:
  33. print(e)
  34. return {}
  35. # 获取规则ID
  36. def get_rule(company, version):
  37. rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
  38. return rule_id
  39. def find_error_id(conn, cleaned_key, sub_value):
  40. """
  41. 查找 error_dict 中的 id
  42. """
  43. query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
  44. params = (cleaned_key, sub_value)
  45. result = MysqlUtil.query_data(conn, query, params)
  46. #[(10,)]
  47. # 检查查询结果是否为空
  48. if not result:
  49. print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
  50. return None # 或者返回一个默认值,根据需求而定
  51. record = result[0][0]
  52. return record
  53. def insert_batch_data(conn, params):
  54. """
  55. 执行批量插入数据
  56. """
  57. query = """INSERT IGNORE INTO bid_analysis_liantong (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, create_time,
  58. agency,agencyperson,agencytel,bidamount,bidendtime,bidopentime,bidstarttime,bidway,budget,buyer,buyerperson,buyertel,com_package,docendtime,
  59. docstarttime,est_purchase_time,href,projectcode,projectname,publishtime,s_winner,title,winnerorder,winnerperson,winnertel)
  60. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
  61. MysqlUtil.insert_data(conn, query, params)
  62. def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
  63. """
  64. 动态插入 error_ids 到相应的 cleaned_key_error 字段
  65. """
  66. # 构造动态插入 SQL 语句,更新指定的 cleaned_key_error 字段
  67. query = f"""
  68. UPDATE bid_analysis_liantong
  69. SET {cleaned_key}_error = %s
  70. WHERE mongoid = %s
  71. """
  72. # 拼接多个 error_id,用分隔符分开
  73. error_ids_str = ','.join(map(str, error_ids))
  74. params = (error_ids_str, mongoid )
  75. MysqlUtil.update_data(conn, query, params)
  76. def has_non_empty_qa(data):
  77. # 获取data字典
  78. data_dict = data.get('data', {})
  79. # 遍历所有键值对
  80. for key, value in data_dict.items():
  81. # 检查键以'_qa'结尾且值不为空
  82. if key.endswith('_qa') and value: # value不为None、空字典、空列表等
  83. return True
  84. return False
  85. def parse_timestamp(timestamp):
  86. if not timestamp:
  87. return None
  88. try:
  89. return datetime.fromtimestamp(int(timestamp))
  90. except (ValueError, TypeError):
  91. return None
  92. def insert_bid_statistics (col,conn,query,batch_id):
  93. # 定义来源 1标讯简版2拟在建3新闻4预算5专项债
  94. data_source = 1
  95. # 使用聚合管道进行多条件统计
  96. pipeline = [
  97. {"$match": query},
  98. {"$facet": {
  99. "总量": [{"$count": "count"}], # 添加总量统计
  100. "招标": [{"$match": {"toptype": "招标"}}, {"$count": "count"}],
  101. "预告": [{"$match": {"toptype": "预告"}}, {"$count": "count"}],
  102. "结果": [{"$match": {"toptype": "结果"}}, {"$count": "count"}],
  103. "其它": [{"$match": {"toptype": "其它"}}, {"$count": "count"}],
  104. "other": [{"$match": {"toptype": {"$nin": ["招标", "预告", "结果", "其它"]}}}, {"$count": "count"}]
  105. }}
  106. ]
  107. result = list(col.aggregate(pipeline))[0]
  108. # 提取统计结果
  109. count_total = result["总量"][0]["count"] if result["总量"] else 0
  110. count_zhaobiao = result["招标"][0]["count"] if result["招标"] else 0
  111. count_yugao = result["预告"][0]["count"] if result["预告"] else 0
  112. count_jieguo = result["结果"][0]["count"] if result["结果"] else 0
  113. count_qita = result["其它"][0]["count"] if result["其它"] else 0
  114. count_other = result["other"][0]["count"] if result["other"] else 0
  115. count_jieguo = count_jieguo + count_qita
  116. print(f"招标数量: {count_zhaobiao}")
  117. print(f"预告数量: {count_yugao}")
  118. print(f"结果数量: {count_jieguo}")
  119. print(f"其它类型数量: {count_other}")
  120. sql_query = """INSERT IGNORE INTO bid_statistics_liantong (zhaobiao_count,yugao_count,jieguo_count,other_count, total_count, batch_id,data_source)
  121. VALUES ( %s, %s, %s, %s, %s,%s,%s)"""
  122. params = (count_zhaobiao,count_yugao,count_jieguo,count_other,count_total,batch_id,data_source)
  123. MysqlUtil.insert_data(conn, sql_query, params)
  124. def batch_load_data():
  125. """
  126. 批量数据质量检查
  127. """
  128. # 获取今天的日期(字符串格式)
  129. today_date = datetime.now().strftime("%Y-%m-%d")
  130. yesterday_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
  131. # 获取昨天 00:00:00 的时间戳
  132. start_date = int(datetime.strptime(f"{yesterday_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
  133. # print("start_date",start_date)
  134. # 获取今天 00:00:00 的时间戳
  135. end_date = int(datetime.strptime(f"{today_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
  136. # print("end_date", end_date)
  137. # 规则查询,根据必要条件 公司名称(用户ID)、版本号
  138. rules_id = get_rule("中国联通简版-标讯", "v1.4")
  139. print(rules_id)
  140. # 初始化mysql
  141. conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
  142. max_id = ObjectId("0" * 24)
  143. # max_id = ObjectId("688363ebf0c6ad8b095e2245")
  144. # 查询条件:_id >= max_id, appid匹配,且 createtime 在 [start_date, end_date] 之间
  145. query = {
  146. "_id": {"$gte": max_id},
  147. # "_id": max_id,
  148. "appid": "jyGQ1XQQsEAwNeSENOFR9D",
  149. "createtime": {"$gte": start_date, "$lte": end_date}
  150. }
  151. # mongo_client = MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore",directConnection=True) # 修改为你的连接地址
  152. mongo_client = MongoClient('mongodb://172.20.17.61:27080/', unicode_decode_error_handler="ignore",directConnection=True) # 正式环境
  153. client = mongo_client.jyqyfw
  154. coll_user = client["usermail"]
  155. #存入数据总量,计算缺失率
  156. insert_bid_statistics(coll_user,conn,query,today_date)
  157. while True:
  158. try:
  159. for item in coll_user.find(query).sort("_id", 1):
  160. print("------数据处理开始--------")
  161. max_id = item["_id"]
  162. item["_id"] = str(item["_id"])
  163. print(f"正在处理数据: {max_id}")
  164. # 质量检查逻辑
  165. result = start_quality(item, rules_id, a2s_ip, topic, timeout)
  166. print(result)
  167. code = result.get("code")
  168. if code != 200:
  169. # 数据出错,跳过
  170. continue
  171. #只将有错误的数据存库
  172. if has_non_empty_qa(result):
  173. data = result.get("data", {})
  174. # 数据插入到 MySQL
  175. toptype = item.get("toptype", "")
  176. subtype = item.get("subtype", "")
  177. site = item.get("site", "")
  178. spidercode = item.get("spidercode", "")
  179. channel = item.get("channel", "")
  180. comeintime = item.get("comeintime", "")
  181. comeintime = datetime.fromtimestamp(comeintime)
  182. area = item.get("area", "")
  183. city = item.get("city", "")
  184. district = item.get("district", "")
  185. #---
  186. agency = item.get("agency", "")
  187. agencyperson = item.get("agencyperson", "")
  188. agencytel = item.get("agencytel", "")
  189. bidamount = item.get("bidamount", "")
  190. bidendtime = item.get("bidendtime", "")
  191. bidendtime = parse_timestamp(bidendtime)
  192. bidopentime = item.get("bidopentime", "")
  193. bidopentime = parse_timestamp(bidopentime)
  194. bidstarttime = item.get("bidstarttime", "")
  195. bidstarttime = parse_timestamp(bidstarttime)
  196. bidway = item.get("bidway", "")
  197. budget = item.get("budget", "")
  198. buyer = item.get("buyer", "")
  199. buyerperson = item.get("buyerperson", "")
  200. buyertel = item.get("buyertel", "")
  201. com_package = item.get("com_package", "") #json串
  202. com_package =json.dumps(com_package)
  203. docendtime = item.get("docendtime", "")
  204. docendtime = parse_timestamp(docendtime)
  205. docstarttime = item.get("docstarttime", "")
  206. docstarttime = parse_timestamp(docstarttime)
  207. est_purchase_time = item.get("est_purchase_time", "")
  208. est_purchase_time = parse_timestamp(est_purchase_time)
  209. href = item.get("href", "")
  210. projectcode = item.get("projectcode", "")
  211. projectname = item.get("projectname", "")
  212. publishtime = item.get("publishtime", "")
  213. publishtime = parse_timestamp(publishtime)
  214. s_winner = item.get("s_winner", "")
  215. title = item.get("title", "")
  216. winnerorder = item.get("winnerorder", "")#json串
  217. winnerorder = json.dumps(winnerorder)
  218. winnerperson = item.get("winnerperson", "")
  219. winnertel = item.get("winnertel", "")
  220. #---
  221. score = data.get("score", "")
  222. error_type_data = json.dumps(data)
  223. create_time = today_date
  224. params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,create_time,
  225. agency,agencyperson,agencytel,bidamount,bidendtime,bidopentime,bidstarttime,bidway,budget,buyer,buyerperson,buyertel,com_package,docendtime,
  226. docstarttime,est_purchase_time,href,projectcode,projectname,publishtime,s_winner,title,winnerorder,winnerperson,winnertel)
  227. insert_batch_data(conn, params)
  228. print("---- 数据处理完成 ----")
  229. break
  230. except Exception as e:
  231. print(f"错误: {e}")
  232. import traceback
  233. traceback.print_exc() # 打印完整堆栈信息
  234. time.sleep(10)
  235. finally:
  236. # 确保在循环结束后关闭连接
  237. conn.close() # 关闭MySQL连接
  238. mongo_client.close() # 关闭MongoDB连接
  239. if __name__ == '__main__':
  240. batch_load_data()