es_tools.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : zhaolongyue
  4. #date : 2023-07-03
  5. from pymongo import MongoClient
  6. from lib import mongo_tools
  7. from elasticsearch import Elasticsearch
  8. class EsUtil:
  9. @staticmethod
  10. def es_query_save(query, **kwargs):
  11. """
  12. es直接到mongo
  13. :param query:
  14. :param kwargs:
  15. :return:
  16. """
  17. coon = mongo_tools.CoonUtil.get_coon(**kwargs)
  18. es = EsUtil.get_es(**kwargs)
  19. result_all = EsUtil.get_es_result(es, query, **kwargs)
  20. # print(result_all)
  21. total = result_all['hits']['total']["value"]
  22. results = result_all['hits']['hits']
  23. scroll_id = result_all['_scroll_id']
  24. print("数据总量:", total)
  25. count = 0
  26. for i in range(0, int(total / kwargs['size']) + 1):
  27. # scroll参数必须指定否则会报错
  28. query_scroll = EsUtil.get_es_scroll(es, scroll_id, **kwargs)
  29. results += query_scroll
  30. save_bulk = []
  31. for res in results:
  32. count += 1
  33. es_result = res["_source"]
  34. save_bulk.append(es_result)
  35. results = []
  36. mongo_tools.MongoSentence.insert_many(coon, save_bulk)
  37. print(count, "数据已保存")
  38. def get_es(**kwargs):
  39. """
  40. 获取es连接
  41. :param kwargs:
  42. :return:
  43. """
  44. es = Elasticsearch(
  45. [{"host": kwargs["es_host"], "http_auth": kwargs["es_http_auth"], "port": kwargs["es_port"]}])
  46. # print(es)
  47. return es
  48. @staticmethod
  49. def get_es_count(es, query, **kwargs):
  50. """
  51. 直接查询
  52. :param es:
  53. :param query:
  54. :param kwargs:
  55. :return:
  56. """
  57. result_all_count = es.count(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"])
  58. return result_all_count
  59. @staticmethod
  60. def get_es_result(es, query, **kwargs):
  61. """
  62. 直接查询
  63. :param es:
  64. :param query:
  65. :param kwargs:
  66. :return:
  67. """
  68. result_all = es.search(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"],
  69. scroll='2m', size=kwargs["size"])
  70. return result_all
  71. @staticmethod
  72. def get_es_scroll(es, scroll_id, **kwargs):
  73. """
  74. 游标scroll_id
  75. :param es:
  76. :param scroll_id:
  77. :param kwargs:
  78. :return:
  79. """
  80. query_scroll = es.scroll(scroll_id=scroll_id, scroll='2m',
  81. request_timeout=kwargs["timeout"])['hits']['hits']
  82. return query_scroll
  83. esutil=EsUtil()