client_mysql_new.py

# coding:utf-8
import time
from a2s.tools import json_serialize, json_deserialize
from a2s.a2s_client import a2s_execute
from docs.config import ReluMongodb
from util.mogodb_helper import MongoDBInterface
from pymongo import MongoClient
from util.mysql_tool import MysqlUtil
import json
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch

ReluClient = MongoDBInterface(ReluMongodb)

# Evaluation service configuration
a2s_ip = "172.20.100.235:9090"
topic = "quality_bid"
# Topic for local testing
# topic = "test_quality_bid"
timeout = 180

# # Test ES
# es_host = "http://127.0.0.1:19800"
# es_username = "jianyuGr"
# es_password = "we3g8glKfe#"
# Production ES
es_host = "http://172.17.4.184:19908"
es_username = "qyfw_es_2"
es_password = "Khfdals33#"

# Initialize the Elasticsearch client with basic auth and retry on timeout
es_client = Elasticsearch(es_host, http_auth=(es_username, es_password), retry_on_timeout=True)
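# Note: http_auth is the elasticsearch-py 7.x keyword; 8.x clients take
# basic_auth instead (which client version is installed here is an assumption).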

# Start the evaluation
def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    # SSL is not used this time, so the channel is insecure
    row = {"data": data, "rules_id": rules_id}
    bytes_data = json_serialize(row)
    for t in range(retry):
        print("topic", topic)
        try:
            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
            if resp_data is None:
                continue
            result = json_deserialize(resp_data)
            return result
        except Exception as e:
            print(e)
    # All retries exhausted: return an empty result
    return {}
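
# A minimal usage sketch (hypothetical: the "title"/"detail" fields below are
# illustrative only; start_quality forwards whatever dict it is given):
#
#   rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
#   doc = {"title": "...", "detail": "..."}
#   result = start_quality(doc, rules_id, a2s_ip, topic, timeout)
#   if result.get("code") == 200:
#       print(result.get("data", {}))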

# Look up the rule ID
def get_rule(company, version):
    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
    return rule_id

def find_error_id(conn, cleaned_key, sub_value):
    """
    Look up the id of an entry in error_dict
    """
    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
    params = (cleaned_key, sub_value)
    result = MysqlUtil.query_data(conn, query, params)
    # result looks like [(10,)]
    # Guard against an empty result set
    if not result:
        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
        return None  # or return a default value, depending on requirements
    record = result[0][0]
    return record

def insert_batch_data(conn, params):
    """
    Run the batch insert
    """
    query = """INSERT IGNORE INTO bid_analysis (mongoid, toptype, subtype, site, spidercode, channel, comeintime, area, city, district, score, error_type, create_time)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    MysqlUtil.insert_data(conn, query, params)

def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
    """
    Dynamically write error_ids into the matching {cleaned_key}_error column
    """
    # Build the UPDATE statement for the dynamic {cleaned_key}_error column.
    # cleaned_key is interpolated into the SQL itself, so it must come from a
    # trusted set of column names, not from user input.
    query = f"""
        UPDATE bid_analysis
        SET {cleaned_key}_error = %s
        WHERE mongoid = %s
    """
    # Join the error ids with commas
    error_ids_str = ','.join(map(str, error_ids))
    params = (error_ids_str, mongoid)
    MysqlUtil.update_data(conn, query, params)
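
# find_error_id and insert_dynamic_error_field are defined but never called in
# this script. A sketch of how they could be combined (the key/error names and
# the "_qa" suffix convention are assumptions based on has_non_empty_qa below):
#
#   for key, errors in result.get("data", {}).items():   # e.g. "title_qa": ["missing"]
#       if key.endswith('_qa') and errors:
#           field = key[:-3]                              # strip the "_qa" suffix
#           error_ids = [find_error_id(conn, field, err) for err in errors]
#           insert_dynamic_error_field(conn, field, error_ids, mongoid)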

def has_non_empty_qa(data):
    # Get the nested data dict
    data_dict = data.get('data', {})
    # Walk all key/value pairs
    for key, value in data_dict.items():
        # A key ending in '_qa' with a non-empty value marks a quality problem
        if key.endswith('_qa') and value:  # value is not None, an empty dict, an empty list, etc.
            return True
    return False
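
# Illustrative behaviour (the payload shape is an assumption about what the
# evaluation service returns):
#
#   has_non_empty_qa({"code": 200, "data": {"title_qa": ["missing"], "area_qa": []}})
#   # -> True: "title_qa" is non-empty
#   has_non_empty_qa({"code": 200, "data": {"title_qa": []}})
#   # -> False: every *_qa value is empty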

def batch_load_data():
    """
    Batch data-quality check over yesterday's documents
    """
    # Today's date (string form)
    today_date = datetime.now().strftime("%Y-%m-%d")
    yesterday_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    # Timestamp of yesterday at 00:00:00
    start_date = int(datetime.strptime(f"{yesterday_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
    # Timestamp of today at 00:00:00
    end_date = int(datetime.strptime(f"{today_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
    # Rule lookup by the required keys: company name (user ID) and version
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
    print(rules_id)
    # Initialize MySQL
    conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root',
                                      password='=PDT49#80Z!RVv52_z', database='quality')
    # Query for the ES data
    es_query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "comeintime": {
                                "gte": start_date,
                                "lt": end_date
                            }
                        }
                    }
                ]
            }
        },
        "sort": [
            {"_id": {"order": "asc"}}  # deterministic order so search_after pagination is stable
        ],
        "size": 100  # documents per batch
    }
    try:
        # Page through the results in batches via search_after
        response = es_client.search(index="bidding", body=es_query)
        hits = response['hits']['hits']
        while hits:
            print("---- batch start ----")
            max_id = None
            for hit in hits:
                item = hit["_source"]
                print("------ item start ------")
                max_id = hit["_id"]
                print(f"Processing document: {max_id}")
                item["_id"] = str(hit["_id"])
                # Quality-check logic
                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)
                code = result.get("code")
                if code != 200:
                    # The check failed for this document; skip it
                    continue
                # Only documents that actually have errors are stored
                if has_non_empty_qa(result):
                    data = result.get("data", {})
                    # Insert the row into MySQL
                    toptype = item.get("toptype", "")
                    subtype = item.get("subtype", "")
                    site = item.get("site", "")
                    spidercode = item.get("spidercode", "")
                    channel = item.get("channel", "")
                    # datetime.fromtimestamp needs a number, so default to 0 rather than ""
                    comeintime = item.get("comeintime", 0)
                    comeintime = datetime.fromtimestamp(comeintime)
                    area = item.get("area", "")
                    city = item.get("city", "")
                    district = item.get("district", "")
                    score = item.get("score", "")
                    error_type_data = json.dumps(data)
                    create_time = today_date
                    params = (item["_id"], toptype, subtype, site, spidercode, channel, comeintime,
                              area, city, district, score, error_type_data, create_time)
                    insert_batch_data(conn, params)
                print("------ item end ------")
            print("---- current batch processed ----")
            # Fetch the next batch: the last _id of this batch is the cursor for the next one
            search_after = hits[-1]["_id"]
            es_query["search_after"] = [search_after]  # keep the _id type consistent
            response = es_client.search(index="bidding", body=es_query)
            hits = response['hits']['hits']
            # No more data: leave the loop
            if not hits:
                print("No more data; ending batch processing")
                break
        print("Data processing finished")
    except Exception as e:
        print(f"Error: {e}")
        time.sleep(10)
    finally:
        if conn.is_connected():
            conn.close()  # make sure the connection is closed
            print("MySQL connection closed")

if __name__ == '__main__':
    batch_load_data()