send_data.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-02-21
  4. ---------
  5. @summary: 数据上传(redis到mongo),采集数据同步服务
  6. ---------
  7. @author: dzr
  8. """
  9. import ast
  10. import time
  11. from concurrent.futures import ThreadPoolExecutor, wait
  12. from typing import Dict
  13. import redis
  14. from bson import int64
  15. from elasticsearch import Elasticsearch
  16. from func_timeout import func_set_timeout
  17. from func_timeout.exceptions import FunctionTimedOut
  18. from pymongo import MongoClient
  19. from redis._compat import unicode, long, basestring
  20. from redis.connection import Encoder as RedisEncoder
  21. from redis.exceptions import DataError
  22. from log import logger
  23. # mongo
  24. MONGO_HOST = "172.17.4.87"
  25. MONGO_PORT = 27080
  26. MONGO_DB = "py_spider"
  27. mcli = MongoClient(MONGO_HOST, MONGO_PORT)
  28. mongodb = mcli[MONGO_DB]
  29. # redis
  30. class Encoder(RedisEncoder):
  31. def encode(self, value):
  32. "Return a bytestring or bytes-like representation of the value"
  33. if isinstance(value, (bytes, memoryview)):
  34. return value
  35. # elif isinstance(value, bool):
  36. # # special case bool since it is a subclass of int
  37. # raise DataError(
  38. # "Invalid input of type: 'bool'. Convert to a "
  39. # "bytes, string, int or float first."
  40. # )
  41. elif isinstance(value, float):
  42. value = repr(value).encode()
  43. elif isinstance(value, (int, long)):
  44. # python 2 repr() on longs is '123L', so use str() instead
  45. value = str(value).encode()
  46. elif isinstance(value, (list, dict, tuple)):
  47. value = unicode(value)
  48. elif not isinstance(value, basestring):
  49. # a value we don't know how to deal with. throw an error
  50. typename = type(value).__name__
  51. raise DataError(
  52. "Invalid input of type: '%s'. Convert to a "
  53. "bytes, string, int or float first." % typename
  54. )
  55. if isinstance(value, unicode):
  56. value = value.encode(self.encoding, self.encoding_errors)
  57. return value
  58. REDIS_HOST = "172.17.4.232"
  59. REDIS_PORT = 7361
  60. REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
  61. REDIS_DB = 10
  62. redis.connection.Encoder = Encoder
  63. pool = redis.ConnectionPool(
  64. host=REDIS_HOST,
  65. port=REDIS_PORT,
  66. password=REDISDB_USER_PASS,
  67. db=REDIS_DB
  68. )
  69. rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
  70. # es
  71. ES_HOST = '172.17.145.178'
  72. ES_PORT = 9800
  73. ES_INDEX = 'biddingall'
  74. ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
  75. # 延时间隔
  76. DELAY = 43200
  77. def literal_eval(node_or_string):
  78. try:
  79. return ast.literal_eval(node_or_string)
  80. except ValueError as e:
  81. if 'malformed node or string' in e.args[0]:
  82. from bson import Code, ObjectId # eval变量作用域,ObjectId参数
  83. return eval(node_or_string)
  84. else:
  85. raise e
  86. def date2ts(date_str):
  87. """日期转时间戳"""
  88. if ":" in date_str:
  89. ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d %H:%M:%S")))
  90. else:
  91. ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d")))
  92. return ts
  93. def es_query(title, publish_time):
  94. """
  95. 查询es
  96. :param title: 标题
  97. :param publish_time: 发布时间
  98. :return:
  99. """
  100. publish_time = date2ts(publish_time)
  101. stime = publish_time - 432000 # 往前推5天
  102. etime = publish_time + 432000
  103. # 通过发布标题和发布时间范围查询
  104. query = {
  105. "query": {
  106. "bool": {
  107. "must": [
  108. {
  109. "multi_match": {
  110. "query": title,
  111. "type": "phrase",
  112. "fields": ["title"]
  113. }
  114. },
  115. {"range": {'publishtime': {"from": stime, "to": etime}}}
  116. ]
  117. }
  118. }
  119. }
  120. result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
  121. # print(result['hits']['total'])
  122. total = int(result['hits']['total'])
  123. return total
  124. def get_redis_key(table, prefix="savemongo:"):
  125. return prefix + table
  126. def rpush(name, values, is_redis_cluster=False):
  127. """“将“values”推到列表“name”的尾部”"""
  128. if isinstance(values, list):
  129. pipe = rcli.pipeline()
  130. if not is_redis_cluster:
  131. pipe.multi()
  132. for value in values:
  133. pipe.rpush(name, value)
  134. pipe.execute()
  135. else:
  136. return rcli.rpush(name, values)
  137. def insert_one(table, item: Dict):
  138. """MongoDB 单条入库"""
  139. if item is not None:
  140. item.pop('_id', '')
  141. if item.get("comeintime"):
  142. item['comeintime'] = int64.Int64(item['comeintime'])
  143. try:
  144. title = item.get('title')
  145. result = mongodb[table].insert_one(item)
  146. logger.info(f'{table}-{str(result.inserted_id)}-{title}--上传成功')
  147. except Exception as e:
  148. rpush(get_redis_key(table), item)
  149. logger.error(table + f"--推送失败,原因:{''.join(e.args)}")
  150. def sync_data(table):
  151. redis_key = get_redis_key(table)
  152. total = rcli.llen(redis_key)
  153. logger.info(f"同步表名:{table},推送总数:{total}")
  154. for _ in range(total):
  155. obj = rcli.lpop(redis_key)
  156. if obj is None:
  157. logger.warning(f'{table} 错误数据:{obj}')
  158. continue
  159. try:
  160. item = literal_eval(obj)
  161. if table != 'mgp_list':
  162. insert_one(table, item)
  163. else:
  164. title = item.get("item").get("title")
  165. # 延时推送流程
  166. if item.get("is_delay"):
  167. site = item.get("item").get("site")
  168. t_diff = int(time.time()) - item.get("comeintime")
  169. if t_diff <= DELAY:
  170. rpush(redis_key, item)
  171. logger.info(f"{site}-{title}-等待{t_diff}秒--延时入库")
  172. # es检索流程
  173. elif item.get("if_es"):
  174. pt = item.get("item").get("publishtime")
  175. if title is not None and es_query(title.strip(), pt) == 0:
  176. insert_one(table, item)
  177. else:
  178. insert_one(table, item)
  179. except Exception as e:
  180. # print(e)
  181. # print(f'{table} {type(obj)} >>>>> {repr(obj)}')
  182. rpush(redis_key, obj)
  183. def err_msg(worker):
  184. err = worker.exception()
  185. if err:
  186. logger.exception("worker err: {}".format(err))
  187. return worker
  188. @func_set_timeout(60 * 20)
  189. def main():
  190. logger.info("数据同步开始")
  191. tables = ["mgp_list", "data_bak", "spider_heartbeat", "njpc_list", "data_njpc", "listdata_err"]
  192. with ThreadPoolExecutor() as pool:
  193. futures = []
  194. for table in tables:
  195. f = pool.submit(sync_data, table)
  196. f.add_done_callback(err_msg)
  197. futures.append(f)
  198. wait(futures)
  199. logger.info("数据同步结束")
  200. if __name__ == '__main__':
  201. try:
  202. main()
  203. except FunctionTimedOut:
  204. logger.warning("数据同步超时")