123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-10-10
- ---------
- @summary:
- ---------
- @author: Dzr
- """
- import json
- import time
- import bson
- from loguru import logger
- from pymongo import MongoClient
- from pymongo.operations import UpdateOne
- import net
- from clean_html import cleaner, drop_tree_by_lxml
- from pathlib import Path
- from login import auto_login, account_pool
- Int64 = bson.int64.Int64
def setup_cfg(username):
    """Load the cached account profile and push it into the ``net`` module.

    Reads ``account/<username>.json`` (stored next to this script) and applies
    its saved cookies, headers and proxies to the shared ``net`` session state.
    """
    cfg_path = (Path(__file__).parent / f'account/{username}.json').absolute()
    cfg = json.loads(cfg_path.read_text(encoding='utf-8'))
    net.set_cookies(ck=cfg['cookies'])
    net.set_headers(h=cfg['headers'])
    net.set_proxies(p=cfg['proxies'])
def bulk_update(collection, id_lst, update, batch_size=50):
    """Bulk-update task status documents in batches.

    :param pymongo.collection.Collection collection: target collection
    :param id_lst: iterable of ``_id`` values to update
    :param dict update: fields to ``$set`` on every matched document
    :param int batch_size: operations flushed per ``bulk_write`` (default 50)
    :return: total number of documents actually modified
    """
    # Build one payload copy so the caller's dict is never mutated, and stamp
    # the update time once -- the original re-stamped the same shared dict on
    # every iteration, which also leaked an 'updatetime' key back to callers.
    payload = dict(update, updatetime=Int64(int(time.time())))
    count = 0
    ops = []
    for id_ in id_lst:
        ops.append(UpdateOne({'_id': id_}, {'$set': payload}))
        if len(ops) >= batch_size:
            count += collection.bulk_write(ops, ordered=False).modified_count
            ops = []
    if ops:  # flush the final partial batch
        count += collection.bulk_write(ops, ordered=False).modified_count
    return count
def finalize(insert_lst, update_dict, data_coll, lst_coll):
    """Flush buffered documents and mark finished tasks.

    Inserts any pending detail documents into ``data_coll``, then flags the
    task ids collected in ``update_dict`` on ``lst_coll``: successful tasks as
    downloaded, failed tasks as downloaded *and* failed.
    """
    if insert_lst:
        data_coll.insert_many(insert_lst, ordered=False)
    ok_ids = update_dict['success']
    if bulk_update(lst_coll, ok_ids, {'isdownload': True}):
        logger.info(f'批量更新[采集成功{len(ok_ids)}条]任务状态')
    bad_ids = update_dict['failed']
    if bulk_update(lst_coll, bad_ids, {'isdownload': True, 'isfailed': True}):
        logger.info(f'批量更新[采集失败{len(bad_ids)}条]任务状态')
def spider(username, password, task_lst, data_coll, lst_coll):
    """Download the detail page of every task in ``task_lst`` with one account.

    Returns True when the whole batch was processed; returns False when the
    account died or the operator chose to quit after a rate-limit re-login,
    so the caller can switch accounts. Buffered results are always flushed
    through ``finalize`` before returning.

    ``net.download_json`` is treated as returning 429 (int) on rate limiting,
    False when the account is invalid, and a dict with 'content' otherwise --
    inferred from the checks below; confirm against the net module.
    """
    setup_cfg(username)
    update_dict = {'success': [], 'failed': []}
    insert_lst = []

    def handle_task(task, ret):
        # NOTE: reads ``href`` from the enclosing for-loop (closure), not from
        # ``task`` -- it is always the URL of the task currently processed.
        if len(ret) == 0:
            # Empty response body: record the task as failed.
            update_dict['failed'].append(task['_id'])
            logger.error(f'下载失败|{href}')
        else:
            # Strip the node containing "企业信息" from the page HTML before storage.
            html = drop_tree_by_lxml(ret['content'], '//*[contains(text(), "企业信息")]')
            insert_lst.append({
                'site': task['site'],
                'channel': task['channel'],
                'spidercode': task['spidercode'],
                'area': task['area'],
                'city': task['city'],
                'district': task['district'],
                'href': '#',
                'competehref': href,
                'title': task['title'],
                's_title': task['title'],
                'contenthtml': html,
                'detail': cleaner(html),  # cleaned-text version of the HTML
                'publishtime': task['publishtime'],
                'l_np_publishtime': task['l_np_publishtime'],
                'comeintime': Int64(int(time.time())),
                'T': 'bidding',
                'infoformat': 1,
                'sendflag': 'false',
                'repeat': 'true',
                'iscompete': True,
                '_d': 'comeintime',
                'publishdept': '',
                'type': '',
                'is_mixed': True
            })
            update_dict['success'].append(task['_id'])

    for task in task_lst:
        href = task['href']
        ret = net.download_json(href, referer=False)
        if isinstance(ret, int) and ret == 429:
            # Rate-limited: re-login interactively (headless off so the
            # operator can solve challenges), reload config, retry once.
            auto_login(username, password, proxy=True, headless=False, auto_quit=True, accident_url=href)
            setup_cfg(username)
            ret = net.download_json(href, referer=False)
            # Manual checkpoint: the operator decides whether to continue.
            if input('退出:0 继续:1\n') == '0':
                finalize(insert_lst, update_dict, data_coll, lst_coll)
                return False
        # NOTE(review): if the retry above returns 429 again, ``ret`` is still
        # an int here and ``len(ret)`` in handle_task would raise TypeError --
        # verify whether a second 429 can occur in practice.
        if ret is False:
            # Account invalidated mid-batch: flush and signal account switch.
            logger.error(f'账号失效|{username}')
            finalize(insert_lst, update_dict, data_coll, lst_coll)
            return False
        handle_task(task, ret)
        if len(insert_lst) == 50:
            # Flush every 50 docs; rebinding the name also resets the list the
            # handle_task closure appends to (same enclosing-scope cell).
            data_coll.insert_many(insert_lst, ordered=False)
            insert_lst = []
        time.sleep(.5)  # throttle between detail requests
    finalize(insert_lst, update_dict, data_coll, lst_coll)
    return True
def main():
    """Entry point: drain the pending-task queue using the account pool.

    Pulls up to 100 undownloaded tasks at a time, logs in with the next pooled
    account, and runs ``spider`` until either the queue is empty or no
    accounts remain. Ctrl-C exits cleanly.
    """
    logger.info('**** 数据采集开始 ****')
    client = MongoClient('192.168.3.182', 27017)
    data_coll = client['zjb_poc']['jy_data_bak']
    lst_coll = client['zjb_poc']['jy_data_lst']
    try:
        while True:
            if not account_pool:
                logger.warning('账号数量已不足,请及时补充')
                break
            # Earlier task filters, kept for reference:
            #   {'isdownload': False, 'isuse': {'$in': [4]}}
            #   {'isdownload': False, 'isuse': {'$in': [2, 3]}}
            q = {'isdownload': False, 'is_use': 0}
            with lst_coll.find(q, limit=100) as cursor:
                task_lst = list(cursor)
            username, password = account_pool.pop(0)
            auto_login(username, password, proxy=True, headless=True, auto_quit=True)
            if spider(username, password, task_lst, data_coll, lst_coll) is False:
                # Account burned out mid-batch -- move on to the next one.
                logger.info('切换账号')
                continue
            if not lst_coll.count_documents(q):
                break
    except KeyboardInterrupt:
        pass
    finally:
        logger.info('**** 数据采集结束 ****')


if __name__ == '__main__':
    main()
|