send_data.py

# -*- coding: utf-8 -*-
"""
Created on 2023-02-21
---------
@summary: Data upload (Redis to MongoDB); crawler data synchronization service
---------
@author: dzr
"""
import ast
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Dict

import redis
from bson import int64
from elasticsearch import Elasticsearch
from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
from pymongo import MongoClient
from redis._compat import unicode, long, basestring
from redis.connection import Encoder as RedisEncoder
from redis.exceptions import DataError

from log import logger


class Encoder(RedisEncoder):

    def encode(self, value):
        "Return a bytestring or bytes-like representation of the value"
        if isinstance(value, (bytes, memoryview)):
            return value
        # elif isinstance(value, bool):
        #     # special case bool since it is a subclass of int
        #     raise DataError(
        #         "Invalid input of type: 'bool'. Convert to a "
        #         "bytes, string, int or float first."
        #     )
        elif isinstance(value, float):
            value = repr(value).encode()
        elif isinstance(value, (int, long)):
            # python 2 repr() on longs is '123L', so use str() instead
            value = str(value).encode()
        elif isinstance(value, (list, dict, tuple)):
            value = unicode(value)
        elif not isinstance(value, basestring):
            # a value we don't know how to deal with. throw an error
            typename = type(value).__name__
            raise DataError(
                "Invalid input of type: '%s'. Convert to a "
                "bytes, string, int or float first." % typename
            )
        if isinstance(value, unicode):
            value = value.encode(self.encoding, self.encoding_errors)
        return value
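
# Why the Encoder subclass above: stock redis-py raises DataError for
# list/dict/tuple values, but this service pushes whole item dicts back to
# Redis on retry. The patched encode() serializes containers via str(), and
# literal_eval() below restores them. Hypothetical round trip:
#   rcli.rpush("savemongo:demo", {"title": "t", "comeintime": 1676966400})
#   literal_eval(rcli.lpop("savemongo:demo"))  # -> the original dict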

# redis
redis.connection.Encoder = Encoder

REDIS_HOST = "172.17.4.232"
REDIS_PORT = 7361
REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
REDIS_DB = 10

pool = redis.ConnectionPool(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDISDB_USER_PASS,
    db=REDIS_DB
)
# Note: decode_responses is ignored when an explicit connection_pool is
# passed (the pool's connections decide), so replies arrive as bytes and
# callers decode() keys themselves (see main()).
rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
redis_prefix = "savemongo"

# mongo
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"

mcli = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = mcli[MONGO_DB]

# es
ES_HOST = "172.17.145.178"
ES_PORT = 9200
ES_INDEX = "biddingall"

try:
    ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
except ConnectionRefusedError as e:
    logger.error(f"es service refused access, reason: {e}")
    ecli = None


def err_msg(worker):
    err = worker.exception()
    if err:
        logger.exception("[Send]worker err: {}".format(err))
    return worker


def literal_eval(node_or_string):
    """Deserialize data pulled from Redis."""
    try:
        return ast.literal_eval(node_or_string)
    except ValueError as e:
        if "malformed node or string" in e.args[0]:
            # these imports provide the eval() scope (ObjectId etc. may
            # appear in the payload)
            from bson import Code, ObjectId
            import datetime
            return eval(node_or_string)
        else:
            raise e
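
# Hypothetical example: ast.literal_eval cannot construct ObjectId, so the
# eval() fallback handles payloads such as
#   literal_eval("{'_id': ObjectId('000000000000000000000000'), 'title': 't'}")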


def date2ts(date_str):
    """Convert a date string to a unix timestamp (local time)."""
    if ":" in date_str:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d %H:%M:%S")))
    else:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d")))
    return ts
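
# e.g. date2ts("2023-02-21 08:30:00") or date2ts("2023-02-21"); a bare date
# parses as local midnight.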


def es_query(title, publish_time):
    """
    Query es.
    :param title: title
    :param publish_time: publish time
    :return: number of matching documents
    """
    if not ecli:
        return 0  # if the es service is down, let data through unchecked
    publish_time = date2ts(publish_time)
    stime = publish_time - 432000  # 5 days before publication
    etime = publish_time + 432000  # 5 days after
    # match on title (phrase) within the publish-time window
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": title,
                            "type": "phrase",
                            "fields": ["title"]
                        }
                    },
                    {"range": {"publishtime": {"from": stime, "to": etime}}}
                ]
            }
        }
    }
    result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
    total = int(result["hits"]["total"]["value"])
    return total
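
# Usage sketch (hypothetical values):
#   es_query("demo tender notice", "2023-02-21")  # -> number of ES hits
# Zero hits within the +/- 5 day window means the item is treated as new.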


def rpush(name, values, is_redis_cluster=False):
    """Push `values` onto the tail of list `name`."""
    if isinstance(values, list):
        pipe = rcli.pipeline()
        if not is_redis_cluster:
            pipe.multi()
        for value in values:
            pipe.rpush(name, value)
        pipe.execute()
    else:
        return rcli.rpush(name, values)
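
# A list argument is batched through one pipeline, wrapped in MULTI/EXEC
# unless is_redis_cluster is set (Redis Cluster cannot span slots in a
# single transaction).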


def handle_big_document(item):
    # shrink an oversized document by replacing inline <img> tags
    if "contenthtml" in item:
        item["contenthtml"] = re.sub("<img[^>]*>", "<br>", item["contenthtml"])


def insert_one(table, item: Dict):
    """Insert a single document into MongoDB."""
    redis_key = table  # keep the prefixed key for the failure re-queue
    table = "".join(table.split(f"{redis_prefix}:"))
    if item is not None:
        item.pop("_id", "")
        if item.get("comeintime"):
            item["comeintime"] = int64.Int64(item["comeintime"])
        try:
            title = item.get("title")
            result = mongodb[table].insert_one(item)
            logger.info(f"[Send]{table}-{str(result.inserted_id)}-{title}--upload succeeded")
        except Exception as e:
            if "BSON document too large" in ''.join(e.args):
                # MongoDB caps a single BSON document at 16 MB
                handle_big_document(item)
                # re-queue under the prefixed key so main() picks it up again
                rpush(redis_key, item)
            logger.error(f"[Send]{table}--push failed, reason: {''.join(e.args)}")
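
# Note the retry loop: an oversized document is shrunk by
# handle_big_document() and re-queued, so the next sync pass retries the
# smaller version.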


def delay_push_to_db(table_name, data, delay_time=43200):
    """
    Third-party data must be held back before entering the crawler
    production database.
    @param table_name: table name
    @param data: the delayed item
    @param delay_time: delay duration, in seconds
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    time_diff = int(time.time()) - data.get("comeintime")
    if time_diff <= delay_time:
        rpush(table_name, data)
        logger.info(f"[Send]{site}-{title}-waited {time_diff}s--delayed re-queue")
    else:
        logger.info(f"[Send]{site}-{title}-waited {time_diff}s--delayed insert")
        insert_one(table_name, data)
    return True


def es_retrieval_push_to_db(table_name, data):
    """
    Deduplicate against es (roughly the last 3 months of incremental data),
    then push to the crawler production database.
    @param table_name: table name
    @param data: the item to check
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    pt = data.get("item").get("publishtime")
    if not title or not pt:
        # es retrieval requires both title and publish time; otherwise the
        # item is discarded as junk data
        return False
    count = es_query(title.strip(), pt)
    if count == 0:
        insert_one(table_name, data)
    logger.info(f"[Send]{site}-{title}-found {count} hit(s)--ES retrieval")
    return True


def mixture_process_push_to_db(table_name, data, delay_time=43200):
    """
    Delay + es retrieval combined, then push to the crawler production
    database.
    @param table_name: table name
    @param data: the item to check
    @param delay_time: delay duration, in seconds
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    pt = data.get("item").get("publishtime")
    if not title or not pt:
        # es retrieval requires both title and publish time; otherwise the
        # item is discarded as junk data
        return False
    is_continue = False
    time_diff = int(time.time()) - data.get("comeintime")
    count = es_query(title.strip(), pt)
    if count == 0:
        if time_diff <= delay_time:
            rpush(table_name, data)
        else:
            insert_one(table_name, data)
        is_continue = True
    msg = "keep polling" if is_continue else "duplicate dropped"
    logger.info(f"[Send]{site}-{title}-{msg}--mixed retrieval")
    return True
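
# Hypothetical queued payload consumed by sync_data() below:
#   {"item": {"site": "demo", "title": "t", "publishtime": "2023-02-21"},
#    "comeintime": 1676966400, "is_delay": 1, "if_es": 1}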


def sync_data(table: str):
    """
    Save data.
    @param table: redis key, i.e. "savemongo:<collection>"
    @return:
    """
    redis_key = table
    total = rcli.llen(redis_key)
    logger.info(f"[Send]syncing table: {table}, items to push: {total}")
    for _ in range(total):
        obj = rcli.lpop(redis_key)
        if obj is None:
            logger.warning(f"[Send]{table} bad item: {obj}")
            continue
        try:
            item = literal_eval(obj)
            if not table.endswith(("mgp_list", "bidding")):
                insert_one(table, item)
            else:
                is_delay = item.get("is_delay")  # delayed push
                is_es_retrieval = item.get("if_es")  # es retrieval
                if is_delay and is_es_retrieval:
                    mixture_process_push_to_db(table, item)
                elif is_delay and not is_es_retrieval:
                    delay_push_to_db(table, item)
                elif not is_delay and is_es_retrieval:
                    es_retrieval_push_to_db(table, item)
                else:
                    insert_one(table, item)
        except Exception as e:
            rpush(table, obj)
            logger.error(f"[Send]{table}--push failed, reason: {''.join(e.args)}")
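
# Routing summary for *mgp_list / *bidding items, driven by two item flags:
#   is_delay  if_es   handler
#   yes       yes     mixture_process_push_to_db (delay + ES dedupe)
#   yes       no      delay_push_to_db
#   no        yes     es_retrieval_push_to_db
#   no        no      insert_one (straight to MongoDB)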


@func_set_timeout(60 * 20)
def main():
    logger.info("[Send]data sync started")
    with ThreadPoolExecutor() as threadPool:
        futures = []
        for key in rcli.keys(f"{redis_prefix}:*"):
            table = key.decode()
            f = threadPool.submit(sync_data, table)
            f.add_done_callback(err_msg)
            futures.append(f)
        wait(futures)
    logger.info("[Send]data sync finished")


if __name__ == '__main__':
    try:
        main()
    except FunctionTimedOut:
        logger.warning("[Send]data sync timed out")
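
# Smoke test (hypothetical key and payload), assuming access to the Redis
# instance configured above:
#   rcli.rpush(f"{redis_prefix}:demo", {"title": "t", "comeintime": 1676966400})
# then run:  python send_data.py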