123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-11-11
- ---------
- @summary:
- ---------
- @author: Dzr
- """
- import time
- from concurrent.futures import ThreadPoolExecutor
- from bson import Int64
- from pymongo import MongoClient
- from pymongo.operations import UpdateOne
- import net
- import setting
- from log import logger
def spider(task):
    """Download the detail page for one list-collection task.

    Pops '_id' off *task* (mutates it) and returns a ``(_id, doc)`` pair,
    where *doc* is the assembled detail document, or ``(_id, None)`` when
    the download failed.
    """
    _id = task.pop('_id')
    url = task['href']
    ret = net.download_detail(url, proxies=net.get_proxy())
    if ret is None:
        logger.error(f'详情数据|下载失败|{url}')
        return _id, None

    logger.info(f'详情数据|下载成功|{url}')
    # Values that are NOT copied verbatim from the task document.
    overrides = {
        'href': url,
        's_title': task['title'],  # s_title mirrors title
        'contenthtml': ret['contenthtml'],
        'detail': ret['detail'],
        'l_np_publishtime': Int64(task['l_np_publishtime']),
        'comeintime': Int64(int(time.time())),
    }
    # Field order is kept identical to the historical document layout.
    field_order = (
        'site', 'channel', 'spidercode', 'area', 'city', 'district',
        'href', 'title', 's_title', 'contenthtml', 'detail',
        'publishtime', 'l_np_publishtime', 'comeintime', 'T',
        'infoformat', 'sendflag', 'iscompete', '_d', 'publishdept',
        'type', 'is_mixed',
    )
    doc = {
        key: overrides[key] if key in overrides else task[key]
        for key in field_order
    }
    return _id, doc
def main():
    """Poll the list collection forever: download details, persist, mark status.

    Each round pulls up to 100 undownloaded tasks, fans them out over a small
    thread pool, batches inserts into the data collection and status updates
    back onto the list collection, then sleeps 10 s.
    """
    # FIX: the original constructed a new MongoClient (a whole connection
    # pool) on EVERY loop iteration and never closed it — a connection leak.
    # Create it once and close it on the way out.
    client = MongoClient(setting.MONGO_HOST, setting.MONGO_PORT)
    to_lst_coll = client[setting.MONGO_DB][setting.MONGO_LIST_COLL]
    to_data_coll = client[setting.MONGO_DB][setting.MONGO_DATA_COLL]
    try:
        while True:
            data_count = 0  # successful downloads this round
            fail_count = 0  # failed downloads this round
            updates = []    # pending UpdateOne status writes (flushed in 10s)
            inserts = []    # pending detail documents (flushed in 10s)
            # {'isdownload': None} matches both missing and null fields,
            # i.e. tasks that were never attempted.
            q = {'isdownload': None}
            with to_lst_coll.find(q, limit=100) as cursor:
                with ThreadPoolExecutor(max_workers=4) as executor:
                    for _id, result in executor.map(spider, cursor):
                        condition = {'_id': _id}
                        if result is None:
                            item = {'isdownload': 1, 'isfailed': 1}
                            fail_count += 1
                        else:
                            item = {'isdownload': 1, 'isfailed': 0}
                            inserts.append(result)
                            data_count += 1
                        updates.append(UpdateOne(condition, {'$set': item}))
                        # `>=` instead of `==` so an unexpected overshoot
                        # can never disable flushing forever.
                        if len(inserts) >= 10:
                            to_data_coll.insert_many(inserts, ordered=False)
                            logger.info(f'详情数据|数据下载|成功{len(inserts)}条')
                            inserts = []
                        if len(updates) >= 10:
                            to_lst_coll.bulk_write(updates, ordered=False)
                            logger.info(f'详情数据|更新状态|完成{len(updates)}条')
                            updates = []
            # Flush whatever is left from a partial batch.
            if inserts:
                to_data_coll.insert_many(inserts, ordered=False)
                logger.info(f'详情数据|数据下载|成功{len(inserts)}条')
            if updates:
                to_lst_coll.bulk_write(updates, ordered=False)
                logger.info(f'详情数据|更新状态|完成{len(updates)}条')
            # FIX: the counters were previously computed but never reported.
            logger.info(f'详情数据|本轮统计|成功{data_count}条|失败{fail_count}条')
            logger.info(f'详情数据|数据下载|10s后执行...')
            time.sleep(10)
    finally:
        client.close()
# Script entry point: run the polling loop until interrupted.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Manual stop (Ctrl-C): exit quietly, no alerting.
        pass
    except Exception as e:
        # Any other crash: raise a WeChat alert for operators, then log
        # the full traceback before the process dies.
        net.send_wechat_warning('详情采集被中止')
        logger.exception(e)
|