123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-06-18
- ---------
- @summary: 在发布时间范围内精准检索标题
- ---------
- @author: Dzr
- """
- import datetime
- import warnings
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from elasticsearch import Elasticsearch
- from pymongo import MongoClient
- import setting
- import utils.tools as tool
- from log import logger
- warnings.filterwarnings('ignore')
def es_client():
    """Build an Elasticsearch client from the values in ``setting``.

    The node address is read from ``setting.ES_IP`` / ``setting.ES_PORT`` and
    the HTTP auth pair from ``setting.ES_USERNAME`` / ``setting.ES_PASSWORD``.

    :return: a new ``Elasticsearch`` client instance
    """
    node = {"host": setting.ES_IP, "port": setting.ES_PORT}
    credentials = (setting.ES_USERNAME, setting.ES_PASSWORD)
    return Elasticsearch([node], http_auth=credentials)
def es_search(title, publish_time):
    """Count ES documents with the given title published on the same day.

    The search combines a ``term`` match on ``title.mtitle`` with a
    ``publishtime`` range covering the 24 hours that start at midnight of the
    day containing ``publish_time``.

    :param str title: title to match
    :param int publish_time: publish time as a Unix timestamp in seconds
    :return: ``hits.total.value`` of the search response, as an ``int``
    """
    # Midnight of the day that contains publish_time (naive local datetime).
    published = datetime.datetime.fromtimestamp(publish_time)
    midnight = datetime.datetime.combine(published.date(), datetime.time.min)
    day_start = int(midnight.timestamp())
    day_end = day_start + 24 * 3600

    title_clause = {"term": {"title.mtitle": title}}
    day_clause = {"range": {"publishtime": {"gte": day_start, "lt": day_end}}}
    query = {
        "query": {"bool": {"must": [title_clause, day_clause]}},
        "from": 0,
        "size": 10,
    }

    # A fresh client is opened for this single request and closed on exit.
    with es_client() as client:
        response = client.search(
            index=setting.ES_INDEX,
            body=query,
            request_timeout=100,
        )
        return int(response['hits']['total']['value'])
def retrieval(item):
    """Look up one record in ES and store the hit count on the record.

    :param dict item: record providing ``"title"`` and ``"publishtime"``
        (text in ``%Y-%m-%d %H:%M:%S`` format)
    :return: the same ``item``, with ``"es_count"`` set; the count is 0 when
        the ES lookup raised (the exception is logged, not propagated)
    """
    publish_ts = tool.date_to_timestamp(item["publishtime"], "%Y-%m-%d %H:%M:%S")
    try:
        item["es_count"] = es_search(title=item["title"], publish_time=publish_ts)
    except Exception as err:
        # Best effort: a failed lookup is recorded as "no hits".
        logger.exception(err)
        item["es_count"] = 0
    return item
def start(threads):
    """Run the ES lookup for every pending ``ybw_list`` record.

    Pending records are those that have neither a ``crawl_status`` nor an
    ``es_count`` field. They are all loaded up front, handed to ``retrieval``
    on a thread pool, and each resulting ``es_count`` is written back to the
    record it came from.

    :param int threads: number of worker threads for the pool
    :raises Exception: whatever a worker or a MongoDB call raises; the
        MongoDB client is closed before the exception propagates
    """
    logger.info("es数据检索")
    to_mongo = MongoClient(setting.MONGO_IP, setting.MONGO_PORT)
    try:
        collection = to_mongo[setting.MONGO_DB]["ybw_list"]
        pending = {"crawl_status": {"$exists": False}, "es_count": {"$exists": False}}
        # Materialise the result set so the cursor is released before the
        # (potentially slow) ES lookups begin.
        with collection.find(pending, no_cursor_timeout=True) as cursor:
            data_lst = list(cursor)

        data_count = 0
        with ThreadPoolExecutor(max_workers=threads) as executor:
            fs = [executor.submit(retrieval, data) for data in data_lst]
            for f in as_completed(fs):
                data_count += 1
                ret = f.result()
                collection.update_one(
                    {"_id": ret["_id"]},
                    {"$set": {"es_count": ret["es_count"]}},
                )
    finally:
        # Close the connection on the failure path too; previously an
        # exception anywhere above left the client open.
        to_mongo.close()

    logger.info(f"本次es共检索数据 {data_count} 条")
# Script entry point: run the retrieval with a single worker thread.
if __name__ == "__main__":
    start(1)
|