12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # author : zhaolongyue
- #date : 2023-07-03
- from pymongo import MongoClient
- from lib import mongo_tools
- from elasticsearch import Elasticsearch
class EsUtil:
    """Static helpers for querying Elasticsearch and copying hits to MongoDB.

    Every method is a ``staticmethod`` so it can be called either on the
    class (``EsUtil.get_es(...)``) or on an instance.
    """

    # Keep-alive sent as ``scroll`` with the initial search and with every
    # follow-up scroll request.
    _SCROLL_KEEP_ALIVE = '2m'

    @staticmethod
    def es_query_save(query, **kwargs):
        """Copy the ``_source`` of every hit matching ``query`` into MongoDB.

        The result set is read page by page through the scroll API and each
        non-empty page is written with ``mongo_tools.MongoSentence.insert_many``.

        :param query: search body handed to ``get_es_result``.
        :param kwargs: forwarded unchanged to ``mongo_tools.CoonUtil.get_coon``,
            ``get_es`` and the query helpers. The helpers in this class read
            ``es_host``, ``es_http_auth``, ``es_port``, ``index``, ``timeout``
            and ``size``; what ``get_coon`` reads is not visible here.
        :return: None
        """
        coon = mongo_tools.CoonUtil.get_coon(**kwargs)
        es = EsUtil.get_es(**kwargs)
        first_page = EsUtil.get_es_result(es, query, **kwargs)
        # Informational only: the reported total may be a lower bound, so it
        # is deliberately not used to decide how many pages to fetch.
        print("数据总量:", EsUtil._total_hits(first_page))
        count = 0
        for save_bulk in EsUtil._iter_source_batches(es, first_page, **kwargs):
            mongo_tools.MongoSentence.insert_many(coon, save_bulk)
            count += len(save_bulk)
            print(count, "数据已保存")

    @staticmethod
    def _total_hits(result):
        """Return ``hits.total`` whether it is an object with ``value`` or a plain number."""
        total = result['hits']['total']
        if isinstance(total, dict):
            return total["value"]
        return total

    @staticmethod
    def _scroll_page(es, scroll_id, **kwargs):
        """Fetch the next scroll page and return the complete response."""
        return es.scroll(scroll_id=scroll_id, scroll=EsUtil._SCROLL_KEEP_ALIVE,
                         request_timeout=kwargs["timeout"])

    @staticmethod
    def _iter_source_batches(es, first_page, **kwargs):
        """Yield one list of ``_source`` documents per non-empty result page.

        Starts with ``first_page`` (an already executed scroll search) and
        keeps scrolling until a page comes back without hits. Each request
        uses the scroll id of the most recent response, because the id is
        allowed to change between requests.
        """
        page = first_page
        scroll_id = page['_scroll_id']
        while True:
            hits = page['hits']['hits']
            if not hits:
                # An empty page marks the end of the scroll; nothing is
                # yielded, so callers never receive an empty batch.
                return
            yield [hit["_source"] for hit in hits]
            page = EsUtil._scroll_page(es, scroll_id, **kwargs)
            # Fall back to the previous id if a response omits it.
            scroll_id = page.get('_scroll_id', scroll_id)

    @staticmethod
    def get_es(**kwargs):
        """Create an Elasticsearch client.

        :param kwargs: must contain ``es_host``, ``es_http_auth`` and
            ``es_port``; other keys are ignored.
        :return: the ``Elasticsearch`` client instance.
        """
        es = Elasticsearch(
            [{"host": kwargs["es_host"], "http_auth": kwargs["es_http_auth"], "port": kwargs["es_port"]}])
        return es

    @staticmethod
    def get_es_count(es, query, **kwargs):
        """Run a count request.

        :param es: Elasticsearch client.
        :param query: request body.
        :param kwargs: must contain ``index`` and ``timeout``.
        :return: the response of ``es.count`` unchanged.
        """
        result_all_count = es.count(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"])
        return result_all_count

    @staticmethod
    def get_es_result(es, query, **kwargs):
        """Run a search that opens a scroll and return its first page.

        :param es: Elasticsearch client.
        :param query: request body.
        :param kwargs: must contain ``index``, ``timeout`` and ``size``
            (hits per page).
        :return: the response of ``es.search`` unchanged.
        """
        result_all = es.search(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"],
                               scroll=EsUtil._SCROLL_KEEP_ALIVE, size=kwargs["size"])
        return result_all

    @staticmethod
    def get_es_scroll(es, scroll_id, **kwargs):
        """Fetch the next page of a scroll and return only its hits.

        :param es: Elasticsearch client.
        :param scroll_id: scroll cursor id from a previous response.
        :param kwargs: must contain ``timeout``.
        :return: the ``hits.hits`` list of the scroll response.
        """
        query_scroll = EsUtil._scroll_page(es, scroll_id, **kwargs)['hits']['hits']
        return query_scroll
# Module-level EsUtil instance created at import time.
esutil = EsUtil()
|