# -*- coding: utf-8 -*-
"""
Created on 2024-06-18
---------
@summary: Exact-match title retrieval within the publish-time range
---------
@author: Dzr
"""
import datetime
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed

from elasticsearch import Elasticsearch
from pymongo import MongoClient

import setting
import utils.tools as tool
from log import logger

warnings.filterwarnings('ignore')

# Width of the publish-time search window, in seconds.
_SECONDS_PER_DAY = 24 * 3600


def es_client():
    """Build an Elasticsearch client from the connection values in ``setting``.

    :return: a new ``Elasticsearch`` instance using HTTP basic auth
    """
    hosts = [{"host": setting.ES_IP, "port": setting.ES_PORT}]
    return Elasticsearch(
        hosts,
        http_auth=(setting.ES_USERNAME, setting.ES_PASSWORD),
    )


def es_search(title, publish_time):
    """Count ES documents whose title matches exactly on the publish day.

    The search window is ``[midnight, midnight + 24h)`` of the day that
    contains ``publish_time``, where midnight is computed in the local
    timezone of the machine running this script.

    :param str title: title to match with a ``term`` query on ``title.mtitle``
    :param int publish_time: publish time as a Unix timestamp (seconds)
    :return: the ``hits.total.value`` reported by Elasticsearch
    :rtype: int
    """
    dt = datetime.datetime.fromtimestamp(publish_time)
    day_start = datetime.datetime(dt.year, dt.month, dt.day)
    gte = int(day_start.timestamp())
    lt = gte + _SECONDS_PER_DAY

    body = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"title.mtitle": title}},
                    {"range": {"publishtime": {"gte": gte, "lt": lt}}},
                ]
            }
        },
        "from": 0,
        "size": 10,
    }
    # A fresh client is opened per call and released when the block exits.
    with es_client() as client:
        result = client.search(
            index=setting.ES_INDEX,
            body=body,
            request_timeout=100,
        )
        return int(result['hits']['total']['value'])


def retrieval(item):
    """Look up one record in ES and store the hit count on it.

    The ``publishtime`` string is parsed before the guarded section, so a
    malformed date raises to the caller. Any failure of the ES query itself
    is logged and recorded as a count of 0.

    :param dict item: record providing ``title`` and ``publishtime``
        (formatted ``%Y-%m-%d %H:%M:%S``)
    :return: the same ``item`` with ``es_count`` set
    :rtype: dict
    """
    ts = tool.date_to_timestamp(item["publishtime"], "%Y-%m-%d %H:%M:%S")
    try:
        es_count = es_search(title=item["title"], publish_time=ts)
    except Exception as e:
        logger.exception(e)
        es_count = 0

    item["es_count"] = es_count
    return item


def start(threads):
    """Run the ES lookup for every pending record and write counts back.

    Pending records are documents in ``ybw_list`` that have neither a
    ``crawl_status`` nor an ``es_count`` field. They are loaded into memory,
    searched with a thread pool, and updated one by one as lookups finish.

    :param int threads: maximum number of worker threads
    :return: None
    """
    logger.info("es数据检索")
    to_mongo = MongoClient(setting.MONGO_IP, setting.MONGO_PORT)
    # try/finally so the connection is released even when a worker or an
    # update raises; previously such an error skipped close().
    try:
        collection = to_mongo[setting.MONGO_DB]["ybw_list"]

        p = {"crawl_status": {"$exists": False}, "es_count": {"$exists": False}}
        with collection.find(p, no_cursor_timeout=True) as cursor:
            data_lst = list(cursor)

        data_count = 0
        with ThreadPoolExecutor(max_workers=threads) as executor:
            fs = [executor.submit(retrieval, data) for data in data_lst]
            for f in as_completed(fs):
                data_count += 1
                ret = f.result()
                collection.update_one(
                    {"_id": ret["_id"]},
                    {"$set": {"es_count": ret["es_count"]}},
                )
    finally:
        to_mongo.close()

    logger.info(f"本次es共检索数据 {data_count} 条")


if __name__ == '__main__':
    start(threads=1)