#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author : zhaolongyue
# date : 2023-07-03
from pymongo import MongoClient
from lib import mongo_tools
from elasticsearch import Elasticsearch


class EsUtil:
    """Elasticsearch query helpers, including an ES -> MongoDB copy via scroll.

    Every method takes its settings as keyword arguments. Keys read here:

    * ``es_host``, ``es_http_auth``, ``es_port`` -- client connection (``get_es``)
    * ``index`` -- index to query
    * ``timeout`` -- passed to every request as ``request_timeout``
    * ``size`` -- page size of the initial scrolled search
    * ``scroll`` -- optional scroll keep-alive, defaults to ``DEFAULT_SCROLL``
    """

    # Scroll keep-alive used when the caller does not pass ``scroll``;
    # this is the value that was previously hard-coded.
    DEFAULT_SCROLL = '2m'

    @staticmethod
    def es_query_save(query, **kwargs):
        """Copy the ``_source`` of every hit matched by ``query`` from ES into MongoDB.

        Pages through the results with the scroll API and passes each non-empty
        page to ``mongo_tools.MongoSentence.insert_many``. The scroll context is
        released when done, including when an error occurs.

        :param query: Elasticsearch search body.
        :param kwargs: ES settings (see class docstring) plus whatever
            ``mongo_tools.CoonUtil.get_coon`` needs to open the target connection.
        :return: None
        """
        coon = mongo_tools.CoonUtil.get_coon(**kwargs)
        es = EsUtil.get_es(**kwargs)
        result_all = EsUtil.get_es_result(es, query, **kwargs)
        total = EsUtil._hits_total(result_all)
        results = result_all['hits']['hits']
        scroll_id = result_all['_scroll_id']
        print("数据总量:", total)
        count = 0
        try:
            # Stop at the first empty page instead of deriving a page count from
            # ``total``: in ES 7+ the reported total may only be a lower bound
            # (capped at 10000 by default), which would truncate the export, and
            # a computed count also issued extra empty scrolls/inserts.
            while results:
                save_bulk = [res["_source"] for res in results]
                mongo_tools.MongoSentence.insert_many(coon, save_bulk)
                count += len(save_bulk)
                print(count, "数据已保存")
                response = EsUtil._scroll_page(es, scroll_id, **kwargs)
                # ES may hand back a new scroll id; always continue with the latest.
                scroll_id = response.get('_scroll_id', scroll_id)
                results = response['hits']['hits']
        finally:
            EsUtil._clear_scroll(es, scroll_id)

    @staticmethod
    def get_es(**kwargs):
        """Build an Elasticsearch client from ``es_host``, ``es_http_auth`` and ``es_port``.

        :param kwargs: must contain ``es_host``, ``es_http_auth`` and ``es_port``.
        :return: a new ``Elasticsearch`` client.
        """
        es = Elasticsearch(
            [{"host": kwargs["es_host"], "http_auth": kwargs["es_http_auth"], "port": kwargs["es_port"]}])
        return es

    @staticmethod
    def get_es_count(query, **kwargs):
        """Count the documents in ``kwargs["index"]`` matching ``query``.

        A fresh client is created for each call via ``get_es``.

        :param query: Elasticsearch count/query body.
        :param kwargs: connection settings plus ``index`` and ``timeout``.
        :return: the raw response of ``es.count``.
        """
        es = EsUtil.get_es(**kwargs)
        result_all_count = es.count(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"])
        return result_all_count

    @staticmethod
    def get_es_result(es, query, **kwargs):
        """Run the initial scrolled search and return the raw response.

        :param es: Elasticsearch client.
        :param query: Elasticsearch search body.
        :param kwargs: ``index``, ``timeout``, ``size`` and optional ``scroll``.
        :return: the raw response of ``es.search`` (includes ``_scroll_id``).
        """
        result_all = es.search(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"],
                               scroll=kwargs.get("scroll", EsUtil.DEFAULT_SCROLL), size=kwargs["size"])
        return result_all

    @staticmethod
    def get_es_scroll(es, scroll_id, **kwargs):
        """Fetch the next scroll page and return only its hit list.

        Note: the ``_scroll_id`` of the response is discarded here; callers that
        keep scrolling should use the full response (see ``_scroll_page``).

        :param es: Elasticsearch client.
        :param scroll_id: scroll id from the previous search/scroll response.
        :param kwargs: ``timeout`` and optional ``scroll``.
        :return: ``response['hits']['hits']``.
        """
        return EsUtil._scroll_page(es, scroll_id, **kwargs)['hits']['hits']

    @staticmethod
    def _scroll_page(es, scroll_id, **kwargs):
        """Fetch the next scroll page and return the full response (hits and ``_scroll_id``)."""
        return es.scroll(scroll_id=scroll_id,
                         scroll=kwargs.get("scroll", EsUtil.DEFAULT_SCROLL),
                         request_timeout=kwargs["timeout"])

    @staticmethod
    def _clear_scroll(es, scroll_id):
        """Best-effort release of the server-side scroll context."""
        if not scroll_id:
            return
        try:
            es.clear_scroll(scroll_id=scroll_id)
        except Exception as exc:
            # Cleanup only: the context also expires after its keep-alive, so a
            # failure here must not mask an error raised by the export itself.
            print("clear_scroll failed:", exc)

    @staticmethod
    def _hits_total(result):
        """Return ``hits.total`` as a number (ES 7+ wraps it in a dict, ES 6 does not)."""
        total = result['hits']['total']
        return total["value"] if isinstance(total, dict) else total


esutil = EsUtil()