client_spider.py

# coding:utf-8
import time
import json
from datetime import datetime, timedelta

from a2s.tools import json_serialize, json_deserialize
from a2s.a2s_client import a2s_execute
from docs.config import ReluMongodb
from util.mogodb_helper import MongoDBInterface
from pymongo import MongoClient
from util.mysql_tool import MysqlUtil
from elasticsearch import Elasticsearch
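
# Pipeline overview (summarized from the code below):
#   1. Pull yesterday's documents from the Elasticsearch "bidding" index.
#   2. Run each document through the a2s quality-check service (topic "quality_bid").
#   3. Write each result to MySQL: one row in bid_analysis, plus error ids resolved
#      via error_dict into the per-field <field>_error columns.
#   4. Checkpoint the last processed _id in docs/last_processed_id.txt so the job
#      can resume where it left off.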

ReluClient = MongoDBInterface(ReluMongodb)

# Quality-evaluation (a2s) service configuration
a2s_ip = "192.168.3.240:9090"
# a2s_ip = "172.17.0.11:9090"
topic = "quality_bid"
# Topic used for local testing
# topic = "test_quality_bid"
timeout = 180

# Current time, used as the fallback spider_modified_time
now = datetime.now()
current_datetime = now.strftime("%Y-%m-%d %H:%M:%S")
# Today's date
today = datetime.today()
# Yesterday's date
yesterday = today - timedelta(days=1)
# Midnight yesterday
yesterday_midnight = datetime(yesterday.year, yesterday.month, yesterday.day)
# Midnight today
today_midnight = datetime(today.year, today.month, today.day)
# Convert to Unix timestamps: the job covers [yesterday 00:00, today 00:00)
start_date = int(yesterday_midnight.timestamp())
end_date = int(today_midnight.timestamp())

# Elasticsearch connection configuration
es_host = "http://127.0.0.1:19800"
es_username = "jianyuGr"
es_password = "we3g8glKfe#"
# Initialize the Elasticsearch client with basic authentication
es_client = Elasticsearch(es_host, http_auth=(es_username, es_password), retry_on_timeout=True)


# Run one document through the quality-evaluation service
def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    # SSL is not used here, so the channel is insecure
    row = {"data": data, "rules_id": rules_id}
    bytes_data = json_serialize(row)
    for t in range(retry):
        print("topic", topic)
        try:
            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
            if resp_data is None:
                continue
            result = json_deserialize(resp_data)
            return result
        except Exception as e:
            print(e)
    # All retries failed or returned nothing: hand back an empty result
    return {}
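
# Expected shape of the result returned by start_quality(), inferred from how
# batch_load_data() consumes it (the exact schema is defined by the quality
# service, not by this script):
#   {"code": 200, "data": {"<field>_qa": {"<sub_key>": "<error message>", ...}, ...}}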


# Get the rule ID for a company/version from MongoDB
def get_rule(company, version):
    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
    return rule_id


def find_error_id(conn, cleaned_key, sub_value):
    """
    Look up the id in error_dict
    """
    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
    params = (cleaned_key, sub_value)
    result = MysqlUtil.query_data(conn, query, params)
    # query_data returns rows like [(10,)]
    # Check whether the query returned anything
    if not result:
        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
        return None  # or return a default value, depending on requirements
    record = result[0][0]
    return record


def insert_batch_data(conn, params):
    """
    Insert one analysis row into bid_analysis (INSERT IGNORE skips duplicates)
    """
    query = """INSERT IGNORE INTO bid_analysis (mongoid, site, spidercode, comeintime, area, city, district, score, error_type, spider_modified_time)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    MysqlUtil.insert_data(conn, query, params)


def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
    """
    Dynamically write error_ids into the corresponding <cleaned_key>_error column
    """
    # Build the UPDATE statement for the specified <cleaned_key>_error column
    query = f"""
        UPDATE bid_analysis
        SET {cleaned_key}_error = %s
        WHERE mongoid = %s
    """
    # Join the error ids into a comma-separated string
    error_ids_str = ','.join(map(str, error_ids))
    params = (error_ids_str, mongoid)
    MysqlUtil.update_data(conn, query, params)
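
# Note: insert_dynamic_error_field interpolates cleaned_key into the column name,
# so every cleaned_key produced by the quality service must correspond to an
# existing <field>_error column in bid_analysis; only the values are bound as
# SQL parameters.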


def get_last_processed_id():
    """
    Get the largest ID processed last time (e.g. read from a database or a file)
    """
    # Here the checkpoint ID is read from a file; it could also come from a database, Redis, etc.
    try:
        with open('docs/last_processed_id.txt', 'r') as f:
            last_id = f.read().strip()
            if last_id:
                return last_id
            else:
                return None
    except FileNotFoundError:
        return None


def save_last_processed_id(last_id):
    """
    Save the largest ID processed so far, used for recovery
    """
    with open('docs/last_processed_id.txt', 'w') as f:
        f.write(str(last_id))


def batch_load_data():
    """
    Batch data-quality check
    """
    # Rule lookup, keyed by the required fields: company name (user ID) and version
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
    print(rules_id)
    # Initialize MySQL
    # conn = MysqlUtil.connect_to_mysql(host='192.168.3.14', port='4000', user='DataScBi', password='DT#Sc20221123Ht', database='quality')
    conn = MysqlUtil.connect_to_mysql(host='192.168.3.217', port='4000', user='root', password='=PDT49#80Z!RVv52_z', database='quality')
    # Initialize the spider code repository (MongoDB)
    collection = MongoClient('mongodb://127.0.0.1:27089/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
    # Get the ID processed last time; start from the beginning if there is none
    last_processed_id = get_last_processed_id()
    print(f"Last processed ID: {last_processed_id}")
    # Fetch data from ES: yesterday's documents only
    es_query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "comeintime": {
                                "gte": start_date,
                                "lt": end_date
                            }
                        }
                    }
                ]
            }
        },
        "sort": [
            {"_id": {"order": "asc"}}  # sort by _id so search_after pagination is deterministic
        ],
        "size": 100  # number of documents returned per batch
    }
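    # search_after resumes from the sort values of the last hit of the previous page;
    # with _id as the only sort key, the list holds a single _id string.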
    # If there is a previously processed ID, resume pagination with `search_after`
    if last_processed_id:
        es_query["search_after"] = [last_processed_id]  # make sure the value passed is the string-typed _id
    try:
        # Fetch the first page; subsequent pages are fetched with search_after below
        response = es_client.search(index="bidding", body=es_query)
        hits = response['hits']['hits']
        while hits:
            print("---- batch start ----")
            max_id = None
            for hit in hits:
                item = hit["_source"]
                print("------ document start --------")
                max_id = hit["_id"]
                print(f"Processing document: {max_id}")
                item["_id"] = str(hit["_id"])
                # Quality-check logic
                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)
                code = result.get("code")
                if code != 200:
                    # The quality check failed for this document, skip it
                    continue
                data = result.get("data", {})
                # Prepare the row for MySQL
                site = item.get("site", "")
                spidercode = item.get("spidercode", "")
                comeintime = item.get("comeintime", "")
                comeintime = datetime.fromtimestamp(comeintime)
                area = item.get("area", "")
                city = item.get("city", "")
                district = item.get("district", "")
                score = item.get("score", "")
                error_type_data = json.dumps(data)
                spider_modified_time = current_datetime
                info = collection.find_one({"code": spidercode})
                if info:
                    spider_modified_time = info.get("modifytime", "")
                    spider_modified_time = datetime.fromtimestamp(spider_modified_time)
                params = (item["_id"], site, spidercode, comeintime, area, city, district, score, error_type_data, spider_modified_time)
                insert_batch_data(conn, params)
                # Walk the error-reason dict and collect the ids of the non-empty entries
                for key, value in data.items():
                    error_ids = []
                    if isinstance(value, dict) and value:
                        cleaned_key = key[:-3] if key.endswith('_qa') else key  # strip the '_qa' suffix
                        for sub_key, sub_value in value.items():
                            error_id = find_error_id(conn, cleaned_key, sub_value)
                            if error_id:
                                error_ids.append(error_id)
                            print(f" {sub_key}: {sub_value}")
                        # Write the error ids into the <cleaned_key>_error column
                        insert_dynamic_error_field(conn, cleaned_key, error_ids, item["_id"])
                print("------ document end ------")
            # Save the largest ID processed in this batch
            if max_id:
                save_last_processed_id(max_id)
                print(f"Saved last processed ID: {max_id}")
            print("---- current batch finished ----")
            # Fetch the next batch
            search_after = hits[-1]["_id"]  # the _id of the last document in this batch is the cursor for the next page
            es_query["search_after"] = [search_after]  # keep the _id type consistent
            response = es_client.search(index="bidding", body=es_query)
            hits = response['hits']['hits']
            # Exit the loop when there is no more data
            if not hits:
                print("No more data, ending batch processing")
                break
        print("Data processing finished")
    except Exception as e:
        print(f"Error: {e}")
        time.sleep(10)
    finally:
        if conn.is_connected():
            conn.close()  # make sure the connection is closed
            print("MySQL connection closed")


if __name__ == '__main__':
    batch_load_data()