@@ -9,6 +9,7 @@ import json
 import random
 import re
 import sys
+import threading
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
 from pathlib import Path
@@ -47,6 +48,9 @@ def ecb_decrypto(ciphertext, key, punctuation=None):
     return ctx.call('decryptByDES', ciphertext, key)
 
 
+_lock = threading.Lock()
+
+
 class Spider:
 
     def __init__(self, sizes=100, threads=1, interval=0.5, ignore_sites=None):
@@ -80,10 +84,13 @@ class Spider:
         self._executor = ThreadPoolExecutor(max_workers=threads,
                                             thread_name_prefix=thread_name)
         self._executor_wait = wait
-        self._downloader = AttachmentDownloader()
+        self._downloader = AttachmentDownloader(max_retries=1)
         self._sizes = sizes  # number of tasks per batch
         self._interval = interval  # request delay interval
 
+        self.success = 0
+        self.fail = 0
+
     @staticmethod
     def get_full_href(url, method):
         if method not in ['1017', '2652']:
@@ -277,17 +284,20 @@ class Spider:
 
         return "".join(re.findall('uuid=(.*?)&', url))
 
-    def deal_request(self, item, proxies=None, max_retries=3):
+    def deal_request(self, item, proxies=None, max_retries=3, recurse=True):
         logger.debug(f'开始处理|{item["title"]}')
         item_c = copy.deepcopy(item)  # copy the item; the original record keeps the platform's original publish URL
         item_c["contenthtml"] = "详情请访问原网页!"
 
+        change_proxy = True if proxies is None else False
+
         title = item['title']
         href = item['parse_url'] if 'parse_url' in item else item['href']
         uuid = self.get_uuid(href, extract=True)
         for i in range(max_retries):
             try:
                 # notice
+                time.sleep(self._interval)
                 notice_url, bulletin_source, *args = self.fetch_notice(uuid)
                 item['href'] = notice_url
                 item['bulletin_source'] = bulletin_source
@@ -317,7 +327,14 @@ class Spider:
                 return True
             except Exception as e:
                 logger.error(f"采集失败|{item['title']}|{e.__class__.__name__}|重试..{i + 1}")
+                if change_proxy:
+                    proxies = None
                 time.sleep(random.randint(5, 10))
+        else:
+            if recurse is True:
+                proxies = get_QGIP()
+                callback = dict(max_retries=2, recurse=False)
+                return self.deal_request(item, proxies=proxies, **callback)
 
     @staticmethod
     def count_second():
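This hunk leans on Python's for/else: because the loop body never breaks, the else block runs exactly when every direct attempt has raised, and only then is a proxy pulled via get_QGIP() and deal_request re-entered once with recurse=False so the fallback cannot recurse indefinitely. A minimal standalone sketch of that control flow, with a hypothetical fetch() standing in for the real fetch_notice/download work and a dummy proxy dict in place of get_QGIP():

import random
import time


def fetch(url, proxies=None):
    # hypothetical stand-in for the real request; mostly fails without a proxy
    if proxies is None and random.random() < 0.8:
        raise ConnectionError("blocked")
    return f"payload from {url}"


def deal_request(url, proxies=None, max_retries=3, recurse=True):
    change_proxy = proxies is None
    for i in range(max_retries):
        try:
            return fetch(url, proxies=proxies)
        except Exception as e:
            print(f"attempt {i + 1} failed: {e.__class__.__name__}")
            if change_proxy:
                proxies = None  # keep trying without a proxy until retries run out
            time.sleep(0.1)
    else:
        # no break in the loop, so this runs exactly when all retries were used up
        if recurse:
            proxies = {"https": "http://127.0.0.1:8888"}  # stand-in for get_QGIP()
            return deal_request(url, proxies=proxies, max_retries=2, recurse=False)
    return None


print(deal_request("https://example.com/notice"))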
@@ -345,7 +362,8 @@ class Spider:
             'is_crawl': False,
             'failed': False,
         }
-        with self.theme_list.find(query, limit=sizes) as cursor:
+        sort = [('_id', -1)]
+        with self.theme_list.find(query, limit=sizes, sort=sort) as cursor:
             for item in cursor:
                 if show_debug:
                     logger.debug(item)
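sort=[('_id', -1)] makes the pending-task query hand back the most recently inserted documents first, since ObjectIds begin with a creation timestamp and therefore sort roughly by insertion time. A rough pymongo equivalent of the new query, assuming a local MongoDB and hypothetical database/collection names (the real theme_list appears to be a wrapper that also supports the with-statement):

from pymongo import DESCENDING, MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed connection string
theme_list = client["py_spider"]["theme_list"]     # hypothetical db/collection names

query = {"is_crawl": False, "failed": False}
cursor = theme_list.find(query, limit=100, sort=[("_id", DESCENDING)])
try:
    for item in cursor:
        print(item["_id"], item.get("title"))
finally:
    cursor.close()  # release the server-side cursor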
@@ -362,10 +380,17 @@ class Spider:
         retry = task['retry']
 
         ret = self.deal_request(task, max_retries=3)
+        time.sleep(random.random())
         if ret is True:
+            with _lock:
+                self.success += 1
+
             update = {'is_crawl': True, 'failed': False, 'href': task['href']}
 
         elif ret is False:
+            with _lock:
+                self.fail += 1
+
             update = {
                 'is_crawl': True,
                 'failed': False,
@@ -375,6 +400,9 @@ class Spider:
             }
 
         else:
+            with _lock:
+                self.fail += 1
+
             retry += 1
             update = {'failed': True, 'retry': retry}
 
@@ -399,7 +427,8 @@ class Spider:
         logger.debug("********** 详情页采集结束 **********")
         self.shutdown_spider()
 
+        logger.debug(f"成功|{self.success}|失败|{self.fail}|代理成功率|{self.success / max(self.success + self.fail, 1)}")
+
 
 if __name__ == "__main__":
-    # 0/15 6-23 * * *
-    Spider(sizes=1000, threads=4).start()
+    Spider(sizes=2000, threads=50).start()
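Because deal_task is executed on the ThreadPoolExecutor, the bare self.success += 1 / self.fail += 1 increments would race between workers (each += is a separate read-modify-write), which is why every increment in this patch is wrapped in the module-level _lock. A self-contained sketch of the same counting pattern, with a dummy task standing in for the real crawl and illustrative worker/task counts:

import random
import threading
from concurrent.futures import ThreadPoolExecutor, wait

_lock = threading.Lock()


class Counter:
    def __init__(self):
        self.success = 0
        self.fail = 0

    def deal_task(self):
        ok = random.random() < 0.7  # dummy task: succeeds about 70% of the time
        with _lock:                 # serialize the read-modify-write on the counters
            if ok:
                self.success += 1
            else:
                self.fail += 1


counter = Counter()
with ThreadPoolExecutor(max_workers=50) as executor:
    futures = [executor.submit(counter.deal_task) for _ in range(2000)]
    wait(futures)

total = counter.success + counter.fail
print(f"success={counter.success} fail={counter.fail} rate={counter.success / max(total, 1)}")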