# es_tools.py 2.7 KB
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : zhaolongyue
  4. #date : 2023-07-03
  5. from pymongo import MongoClient
  6. from lib import mongo_tools
  7. from elasticsearch import Elasticsearch
  8. class EsUtil:
  9. @staticmethod
  10. def es_query_save(query, **kwargs):
  11. """
  12. es直接到mongo
  13. :param query:
  14. :param kwargs:
  15. :return:
  16. """
  17. coon = mongo_tools.CoonUtil.get_coon(**kwargs)
  18. es = EsUtil.get_es(**kwargs)
  19. result_all = EsUtil.get_es_result(es, query, **kwargs)
  20. # print(result_all)
  21. total = result_all['hits']['total']["value"]
  22. results = result_all['hits']['hits']
  23. scroll_id = result_all['_scroll_id']
  24. print("数据总量:", total)
  25. count = 0
  26. for i in range(0, int(total / kwargs['size']) + 1):
  27. # scroll参数必须指定否则会报错
  28. query_scroll = EsUtil.get_es_scroll(es, scroll_id, **kwargs)
  29. results += query_scroll
  30. save_bulk = []
  31. for res in results:
  32. count += 1
  33. es_result = res["_source"]
  34. save_bulk.append(es_result)
  35. results = []
  36. mongo_tools.MongoSentence.insert_many(coon, save_bulk)
  37. print(count, "数据已保存")
  38. def get_es(**kwargs):
  39. """
  40. 获取es连接
  41. :param kwargs:
  42. :return:
  43. """
  44. es = Elasticsearch(
  45. [{"host": kwargs["es_host"], "http_auth": kwargs["es_http_auth"], "port": kwargs["es_port"]}])
  46. # print(es)
  47. return es
  48. @staticmethod
  49. def get_es_count(query, **kwargs):
  50. """
  51. 直接查询
  52. :param es:
  53. :param query:
  54. :param kwargs:
  55. :return:
  56. """
  57. es = EsUtil.get_es(**kwargs)
  58. result_all_count = es.count(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"])
  59. return result_all_count
  60. @staticmethod
  61. def get_es_result(es, query, **kwargs):
  62. """
  63. 直接查询
  64. :param es:
  65. :param query:
  66. :param kwargs:
  67. :return:
  68. """
  69. result_all = es.search(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"],
  70. scroll='2m', size=kwargs["size"])
  71. return result_all
  72. @staticmethod
  73. def get_es_scroll(es, scroll_id, **kwargs):
  74. """
  75. 游标scroll_id
  76. :param es:
  77. :param scroll_id:
  78. :param kwargs:
  79. :return:
  80. """
  81. query_scroll = es.scroll(scroll_id=scroll_id, scroll='2m',
  82. request_timeout=kwargs["timeout"])['hits']['hits']
  83. return query_scroll
  84. esutil=EsUtil()