es_tools.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : zhaolongyue
  4. #date : 2023-07-03
  5. from pymongo import MongoClient
  6. from lib import mongo_tools
  7. from elasticsearch import Elasticsearch
  8. class EsUtil:
  9. @staticmethod
  10. def es_query_save(query, **kwargs):
  11. """
  12. es直接到mongo
  13. :param query:
  14. :param kwargs:
  15. :return:
  16. """
  17. coon = mongo_tools.CoonUtil.get_coon(**kwargs)
  18. es = EsUtil.get_es(**kwargs)
  19. result_all = EsUtil.get_es_result(es, query, **kwargs)
  20. # print(result_all)
  21. total = result_all['hits']['total']["value"]
  22. results = result_all['hits']['hits']
  23. scroll_id = result_all['_scroll_id']
  24. print("数据总量:", total)
  25. count = 0
  26. for i in range(0, int(total / kwargs['size']) + 1):
  27. # scroll参数必须指定否则会报错
  28. query_scroll = EsUtil.get_es_scroll(es, scroll_id, **kwargs)
  29. results += query_scroll
  30. save_bulk = []
  31. for res in results:
  32. count += 1
  33. es_result = res["_source"]
  34. save_bulk.append(es_result)
  35. results = []
  36. mongo_tools.MongoSentence.insert_many(coon, save_bulk)
  37. print(count, "数据已保存")
  38. @staticmethod
  39. def get_es(**kwargs):
  40. """
  41. 获取es连接
  42. :param kwargs:
  43. :return:
  44. """
  45. es = Elasticsearch(
  46. [{"host": kwargs["es_host"], "http_auth": kwargs["es_http_auth"], "port": kwargs["es_port"]}])
  47. # print(es)
  48. return es
  49. @staticmethod
  50. def get_es_count(query, **kwargs):
  51. """
  52. 直接查询
  53. :param es:
  54. :param query:
  55. :param kwargs:
  56. :return:
  57. """
  58. es = EsUtil.get_es(**kwargs)
  59. result_all_count = es.count(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"])
  60. return result_all_count
  61. @staticmethod
  62. def get_es_result(es, query, **kwargs):
  63. """
  64. 直接查询
  65. :param es:
  66. :param query:
  67. :param kwargs:
  68. :return:
  69. """
  70. result_all = es.search(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"],
  71. scroll='2m', size=kwargs["size"])
  72. return result_all
  73. @staticmethod
  74. def get_es_scroll(es, scroll_id, **kwargs):
  75. """
  76. 游标scroll_id
  77. :param es:
  78. :param scroll_id:
  79. :param kwargs:
  80. :return:
  81. """
  82. query_scroll = es.scroll(scroll_id=scroll_id, scroll='2m',
  83. request_timeout=kwargs["timeout"])['hits']['hits']
  84. return query_scroll
  85. esutil=EsUtil()