client_spider_mongo.py

# coding:utf-8
import time
from a2s.tools import json_serialize, json_deserialize
from a2s.a2s_client import a2s_execute
from docs.config import ReluMongodb
from util.mogodb_helper import MongoDBInterface
from pymongo import MongoClient
from util.mysql_tool import MysqlUtil
import json
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch

# Score the ES data directly and write the results to MongoDB; the data is no longer pulled out first and scored separately.
ReluClient = MongoDBInterface(ReluMongodb)

# Evaluation (A2S) service configuration
a2s_ip = "192.168.3.240:9090"
# a2s_ip = "172.17.0.11:9090"
topic = "quality_bid"
# Topic for local testing
# topic = "test_quality_bid"
timeout = 180

# Current time
now = datetime.now()
# current_datetime = int(now.timestamp())
# Yesterday
yesterday = now - timedelta(days=1)
# 8:00 AM yesterday
yesterday_8_am = datetime(yesterday.year, yesterday.month, yesterday.day, 8, 0, 0)
# Convert to a Unix timestamp (seconds)
current_datetime = int(yesterday_8_am.timestamp())

# Time window for the ES query
start_date = int(datetime(2025, 1, 23, 8, 0, 0).timestamp())  # 2025-01-23 08:00:00
end_date = int(datetime(2025, 1, 23, 12, 0, 0).timestamp())   # 2025-01-23 12:00:00

# ES connection configuration
es_host = "http://127.0.0.1:19800"
es_username = "jianyuGr"
es_password = "we3g8glKfe#"
# Initialize the Elasticsearch client with basic authentication
es_client = Elasticsearch(es_host, http_auth=(es_username, es_password), retry_on_timeout=True)
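# Note (assumption about the client version): the call above targets elasticsearch-py 7.x.
# On 8.x, `http_auth` is deprecated in favour of `basic_auth`, e.g.:
#   es_client = Elasticsearch(es_host, basic_auth=(es_username, es_password), retry_on_timeout=True)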

# Run the quality evaluation
def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    # SSL is not used here, so the channel is insecure
    row = {"data": data, "rules_id": rules_id}
    bytes_data = json_serialize(row)
    for t in range(retry):
        print("topic", topic)
        try:
            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
            if resp_data is None:
                continue
            result = json_deserialize(resp_data)
            return result
        except Exception as e:
            print(e)
    return {}
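
# Illustrative usage (assumes a reachable A2S service and a valid rules_id):
#   result = start_quality({"title": "..."}, rules_id, a2s_ip, topic, timeout)
#   if result.get("code") == 200:
#       print(result["data"])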

# Look up the rule ID by company (user ID) and version
def get_rule(company, version):
    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
    return rule_id

def insert_batch_data_mongo(collection, params):
    """
    Insert one scored result document into MongoDB.
    """
    # Map the positional parameters onto the named document fields
    documents = ["mongoid", "toptype", "subtype", "site", "spidercode", "channel", "comeintime",
                 "area", "city", "district", "error_type", "spider_modified_time",
                 "spider_important", "site_important", "create_time"]
    doc = {}
    for indx, param in enumerate(params):
        doc[documents[indx]] = param
    print(f"document to insert: {doc}")
    # Insert the document
    try:
        collection.insert_one(doc)
        print(f"document inserted into MongoDB: {doc}")
    except Exception as e:
        print("Error while inserting data:", e)

def get_last_processed_id():
    """
    Read the ID of the last processed document (e.g. from a database or a file).
    """
    # The checkpoint ID is read from a file here; it could also come from a database, Redis, etc.
    try:
        with open('docs/last_processed_id.txt', 'r') as f:
            last_id = f.read().strip()
            if last_id:
                return last_id
            else:
                return None
    except FileNotFoundError:
        return None

def save_last_processed_id(last_id):
    """
    Persist the ID of the last processed document so a later run can resume from it.
    """
    with open('docs/last_processed_id.txt', 'w') as f:
        f.write(str(last_id))
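
# Sketch of an alternative checkpoint store, along the lines mentioned in
# get_last_processed_id (assumption: a local Redis instance and the `redis` package):
#   import redis
#   r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
#   def get_last_processed_id_redis():
#       return r.get("quality_bid:last_processed_id")  # None if the key is missing
#   def save_last_processed_id_redis(last_id):
#       r.set("quality_bid:last_processed_id", str(last_id))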

def batch_load_data():
    """
    Batch data-quality check.
    """
    # Look up the rule set by the required keys: company name (user ID) and version
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
    print(rules_id)
    # Spider code collection
    collection_spider = MongoClient('mongodb://127.0.0.1:27089/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
    # Spider config collection
    collection_config = MongoClient('mongodb://127.0.0.1:27089/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["luaconfig"]
    # Site collection
    collection_site = MongoClient('mongodb://127.0.0.1:27089/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["site"]
    # MongoDB database that receives the scored results
    result_client = MongoClient('192.168.3.149', 27180, unicode_decode_error_handler="ignore")
    conn = result_client.data_quality
    coll_user = conn["bid_analysis"]
    # Resume from the last processed ID; if there is none, start from the beginning
    last_processed_id = get_last_processed_id()
    print(f"last processed ID: {last_processed_id}")
    # ES query for the configured time window
    es_query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "comeintime": {
                                "gte": start_date,
                                "lt": end_date
                            }
                        }
                    }
                ]
            }
        },
        "sort": [
            {"_id": {"order": "asc"}}  # sort by _id so search_after pagination is deterministic
        ],
        "size": 100  # number of documents per batch
    }
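    # Note: the values passed to `search_after` must line up with the `sort`
    # clause above (here a single ascending `_id`), so each request resumes
    # strictly after the last document already processed.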
    # If there is a checkpoint from a previous run, resume the pagination from it
    if last_processed_id:
        es_query["search_after"] = [last_processed_id]  # the checkpoint is the string _id
    try:
        # Fetch the data in batches via search_after pagination
        response = es_client.search(index="bidding", body=es_query, size=100)
        hits = response['hits']['hits']
        while hits:
            print("---- batch start ----")
            max_id = None
            for hit in hits:
                item = hit["_source"]
                print("------ document start --------")
                max_id = hit["_id"]
                print(f"processing document: {max_id}")
                item["_id"] = str(hit["_id"])
                # Quality check
                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)
                code = result.get("code")
                if code != 200:
                    # The document failed evaluation, skip it
                    continue
                data = result.get("data", {})
                # Collect the fields that will be stored in MongoDB
                toptype = item.get("toptype", "")
                subtype = item.get("subtype", "")
                site = item.get("site", "")
                spidercode = item.get("spidercode", "")
                channel = item.get("channel", "")
                comeintime = item.get("comeintime", "")
                area = item.get("area", "")
                city = item.get("city", "")
                district = item.get("district", "")
                spider_modified_time = current_datetime
                spider_important = False
                site_important = 0
                create_time = current_datetime
                info = collection_spider.find_one({"code": spidercode})
                if info:
                    spider_modified_time = info.get("modifytime", "")
                info_config = collection_config.find_one({"code": spidercode})
                if info_config:
                    spider_important = info_config.get("spiderimportant", "")
                info_site = collection_site.find_one({"site": site})
                if info_site:
                    site_important = info_site.get("important", "")
                params = (item["_id"], toptype, subtype, site, spidercode, channel, comeintime, area, city,
                          district, data, spider_modified_time, spider_important, site_important, create_time)
                insert_batch_data_mongo(coll_user, params)
                print("------ document end ------")
            # Save the largest ID processed in this batch
            if max_id:
                save_last_processed_id(max_id)
                print(f"saved last processed ID: {max_id}")
            print("---- batch finished ----")
            # Fetch the next batch: the _id of the last document in this batch is the cursor for the next request
            search_after = hits[-1]["_id"]
            es_query["search_after"] = [search_after]  # keep the _id type consistent
            response = es_client.search(index="bidding", body=es_query, size=100)
            hits = response['hits']['hits']
            # Stop when there is no more data
            if not hits:
                print("no more data, batch processing finished")
                break
        print("all data processed")
    except Exception as e:
        print(f"error: {e}")
        time.sleep(10)
    finally:
        result_client.close()  # make sure the MongoDB connection is closed
        print("MongoDB connection closed")


if __name__ == '__main__':
    batch_load_data()