ybw_esquery.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
# -*- coding: utf-8 -*-
"""
Created on 2024-06-18
---------
@summary: Exact-match title search within the publish-time range
---------
@author: Dzr
"""
  9. import datetime
  10. import warnings
  11. from concurrent.futures import ThreadPoolExecutor, as_completed
  12. from elasticsearch import Elasticsearch
  13. from pymongo import MongoClient
  14. import setting
  15. import utils.tools as tool
  16. from log import logger
  17. warnings.filterwarnings('ignore')
  18. def es_client():
  19. cfg = {
  20. "host": setting.ES_IP,
  21. "port": setting.ES_PORT,
  22. "usename": setting.ES_USERNAME,
  23. "pwd": setting.ES_PASSWORD,
  24. }
  25. hosts = [{"host": cfg['host'], "port": cfg['port']}]
  26. return Elasticsearch(hosts, http_auth=(cfg['usename'], cfg['pwd']))
  27. def es_search(title, publish_time):
  28. """
  29. 查询es
  30. :param str title: 标题
  31. :param int publish_time: 发布时间
  32. :return:
  33. """
  34. dt = datetime.datetime.fromtimestamp(publish_time)
  35. day_0am = datetime.datetime(dt.year, dt.month, dt.day)
  36. gte = int(day_0am.timestamp())
  37. lt = int(day_0am.timestamp()) + 24 * 3600
  38. with es_client() as client:
  39. body = {
  40. "query": {
  41. "bool": {
  42. "must": [
  43. {"term": {"title.mtitle": title}},
  44. {"range": {"publishtime": {"gte": gte, "lt": lt}}}
  45. ]
  46. }
  47. },
  48. "from": 0,
  49. "size": 10
  50. }
  51. result = client.search(index=setting.ES_INDEX, body=body, request_timeout=100)
  52. return int(result['hits']['total']['value'])
  53. def retrieval(item):
  54. ts = tool.date_to_timestamp(item["publishtime"], "%Y-%m-%d %H:%M:%S")
  55. try:
  56. es_count = es_search(title=item["title"], publish_time=ts)
  57. except Exception as e:
  58. logger.exception(e)
  59. es_count = 0
  60. item["es_count"] = es_count
  61. return item
  62. def start(threads):
  63. logger.info("es数据检索")
  64. to_mongo = MongoClient(setting.MONGO_IP, setting.MONGO_PORT)
  65. collection = to_mongo[setting.MONGO_DB]["ybw_list"]
  66. p = {"crawl_status": {"$exists": False}, "es_count": {"$exists": False}}
  67. with collection.find(p, no_cursor_timeout=True) as cursor:
  68. data_lst = [item for item in cursor]
  69. data_count = 0
  70. with ThreadPoolExecutor(max_workers=threads) as executor:
  71. fs = [executor.submit(retrieval, data) for data in data_lst]
  72. for f in as_completed(fs):
  73. data_count += 1
  74. ret = f.result()
  75. es_count = ret["es_count"]
  76. collection.update_one(
  77. {"_id": ret["_id"]},
  78. {"$set": {"es_count": es_count}}
  79. )
  80. to_mongo.close()
  81. logger.info(f"本次es共检索数据 {data_count} 条")
  82. if __name__ == '__main__':
  83. start(threads=1)