# -*- coding: utf-8 -*-
"""
Created on 2024-11-11

---------
@summary: Detail-page crawler: polls the list collection for tasks that have
          not been downloaded yet, fetches each detail page, and writes the
          result documents to the data collection in batches.
---------
@author: Dzr
"""
import time
from concurrent.futures import ThreadPoolExecutor

from bson import Int64
from pymongo import MongoClient
from pymongo.operations import UpdateOne

import net
import setting
from log import logger

# Flush buffered inserts / status updates to MongoDB once this many are queued.
BATCH_SIZE = 10

# How many pending list tasks to pull per polling round.
FETCH_LIMIT = 100

# Seconds to wait between polling rounds.
POLL_INTERVAL = 10


def spider(task):
    """Download the detail page for a single list task.

    :param dict task: a list-collection document. Must contain '_id', 'href'
        and the metadata fields copied below ('site', 'channel', ...).
        NOTE: '_id' is popped from the dict (the caller's document is mutated).
    :return: tuple ``(_id, data)`` where *data* is the detail document ready
        for insertion, or ``(_id, None)`` when the download failed.
    """
    _id = task.pop('_id')
    url = task['href']

    ret = net.download_detail(url, proxies=net.get_proxy())
    if ret is None:
        logger.error(f'详情数据|下载失败|{url}')
        return _id, None

    logger.info(f'详情数据|下载成功|{url}')
    # Field order mirrors the original document layout on purpose
    # (BSON preserves key order).
    data = {
        'site': task['site'],
        'channel': task['channel'],
        'spidercode': task['spidercode'],
        'area': task['area'],
        'city': task['city'],
        'district': task['district'],
        'href': url,
        'title': task['title'],
        's_title': task['title'],
        'contenthtml': ret['contenthtml'],
        'detail': ret['detail'],
        'publishtime': task['publishtime'],
        # Int64 forces BSON long (not int32) for timestamp fields.
        'l_np_publishtime': Int64(task['l_np_publishtime']),
        'comeintime': Int64(int(time.time())),
        'T': task['T'],
        'infoformat': task['infoformat'],
        'sendflag': task['sendflag'],
        'iscompete': task['iscompete'],
        '_d': task['_d'],
        'publishdept': task['publishdept'],
        'type': task['type'],
        'is_mixed': task['is_mixed'],
    }
    return _id, data


def main():
    """Poll the list collection forever, downloading details in batches.

    Each round: fetch up to ``FETCH_LIMIT`` tasks whose 'isdownload' flag is
    unset, download them with a small thread pool, insert the successful
    detail documents, and mark every processed task as downloaded
    (with 'isfailed' recording the outcome).
    """
    # Create the client ONCE. The original code constructed a new MongoClient
    # on every polling iteration and never closed it, leaking connections.
    client = MongoClient(setting.MONGO_HOST, setting.MONGO_PORT)
    to_lst_coll = client[setting.MONGO_DB][setting.MONGO_LIST_COLL]
    to_data_coll = client[setting.MONGO_DB][setting.MONGO_DATA_COLL]
    try:
        while True:
            inserts = []  # detail documents awaiting insert_many
            updates = []  # UpdateOne status ops awaiting bulk_write

            q = {'isdownload': None}
            with to_lst_coll.find(q, limit=FETCH_LIMIT) as cursor:
                with ThreadPoolExecutor(max_workers=4) as executor:
                    for _id, result in executor.map(spider, cursor):
                        if result is None:
                            item = {'isdownload': 1, 'isfailed': 1}
                        else:
                            item = {'isdownload': 1, 'isfailed': 0}
                            inserts.append(result)

                        updates.append(UpdateOne({'_id': _id}, {'$set': item}))

                        # '>=' instead of '==' so a logic slip elsewhere can
                        # never leave the buffer growing unbounded.
                        if len(inserts) >= BATCH_SIZE:
                            to_data_coll.insert_many(inserts, ordered=False)
                            logger.info(f'详情数据|数据下载|成功{len(inserts)}条')
                            inserts = []

                        if len(updates) >= BATCH_SIZE:
                            to_lst_coll.bulk_write(updates, ordered=False)
                            logger.info(f'详情数据|更新状态|完成{len(updates)}条')
                            updates = []

            # Flush whatever is left over from this round.
            if inserts:
                to_data_coll.insert_many(inserts, ordered=False)
                logger.info(f'详情数据|数据下载|成功{len(inserts)}条')

            if updates:
                to_lst_coll.bulk_write(updates, ordered=False)
                logger.info(f'详情数据|更新状态|完成{len(updates)}条')

            logger.info('详情数据|数据下载|10s后执行...')
            time.sleep(POLL_INTERVAL)
    finally:
        client.close()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        net.send_wechat_warning('详情采集被中止')
        logger.exception(e)