|
@@ -4,7 +4,6 @@ Created on 2025-04-25
|
|
|
---------
|
|
|
@summary: 全国招标公告公示搜索引擎 - 列表页[地区]
|
|
|
"""
|
|
|
-
|
|
|
import time
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
from urllib.parse import quote
|
|
@@ -18,9 +17,6 @@ class Spider(ListSpider):
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
super(Spider, self).__init__(*args, **kwargs)
|
|
|
|
|
|
- self._province_stop_events = {} # 存储每个省份的停止事件
|
|
|
- self._province_stop_pages = {} # 存储每个省份的停止页码
|
|
|
-
|
|
|
def fetch_list(self, page, max_retries=3, **kwargs):
|
|
|
province = kwargs.pop('province')
|
|
|
quote_str = quote(province)
|
|
@@ -29,42 +25,21 @@ class Spider(ListSpider):
|
|
|
try:
|
|
|
return self.download(url, max_retries=5)
|
|
|
except (KeyError, ValueError, AttributeError, IOError) as e:
|
|
|
- logger.error(f'下载失败|{province}|第{page}页|{e.__class__.__name__}|重试..{i + 1}')
|
|
|
+ logger.error(f'下载失败|{province}|第{page}页|{type(e).__name__}|重试..{i + 1}')
|
|
|
time.sleep(3)
|
|
|
|
|
|
def spider(self, page, **kwargs):
|
|
|
province = kwargs.pop('province')
|
|
|
-
|
|
|
- # 接收信号,判端page大小,是否发送请求
|
|
|
- # stop_event = self._province_stop_events.get(province, threading.Event())
|
|
|
- stop_page = self._province_stop_pages.get(province, float('inf'))
|
|
|
-
|
|
|
- # 检查是否需要停止
|
|
|
- # if page >= stop_page or stop_event.is_set():
|
|
|
- if page >= stop_page:
|
|
|
- logger.info(f'采集中止|{province}|第{page}页')
|
|
|
- return None, page
|
|
|
-
|
|
|
try:
|
|
|
items, page_size = self.fetch_list(page, max_retries=5, province=province)
|
|
|
save_count = self.parse(items)
|
|
|
- self.success += 1
|
|
|
logger.info(f'采集成功|{province}|第{page}页|共{page_size}条|入库{save_count}条')
|
|
|
-
|
|
|
- # if save_count < page_size or page_size == 0:
|
|
|
- if page_size == 0:
|
|
|
- return False, page
|
|
|
- else:
|
|
|
- return True, page
|
|
|
+ return True, page
|
|
|
except Exception as e:
|
|
|
- logger.error(f'采集失败|{province}|第{page}页|{e.__class__.__name__}')
|
|
|
- self.fail += 1
|
|
|
+ logger.error(f'采集失败|{province}|第{page}页|{type(e).__name__}')
|
|
|
+ return False, page
|
|
|
|
|
|
def worker(self, province, max_page):
|
|
|
- # 初始化省份的停止事件和停止页码
|
|
|
- # self._province_stop_events[province] = threading.Event()
|
|
|
- self._province_stop_pages[province] = float('inf')
|
|
|
-
|
|
|
with ThreadPoolExecutor(max_workers=1, thread_name_prefix='area') as executor:
|
|
|
fs = []
|
|
|
for page in range(1, max_page + 1):
|
|
@@ -72,21 +47,14 @@ class Spider(ListSpider):
|
|
|
|
|
|
for f in as_completed(fs):
|
|
|
try:
|
|
|
- result = f.result()
|
|
|
- if result is not None and result[0] is False:
|
|
|
- if result[1] < self._province_stop_pages[province]:
|
|
|
- # 发送页码,通知其它线程准备中止采集
|
|
|
- self._province_stop_pages[province] = result[1]
|
|
|
- # 发送信号, 通知其它线程准备中止采集
|
|
|
- # if not self._province_stop_events[province].is_set():
|
|
|
- # self._province_stop_events[province].set()
|
|
|
+ state, _ = f.result()
|
|
|
+ if state is True:
|
|
|
+ self.success += 1
|
|
|
+ else:
|
|
|
+ self.fail += 1
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"执行任务时出现异常|{type(e).__name__}|{e}")
|
|
|
-
|
|
|
- # 清除已完成省份的停止事件和停止页码
|
|
|
- del self._province_stop_pages[province]
|
|
|
- del self._province_stop_events[province]
|
|
|
+ logger.error(f'执行任务时出现异常|{type(e).__name__}|{e}')
|
|
|
|
|
|
def start(self):
|
|
|
logger.debug("********** 列表页开始 **********")
|
|
@@ -111,6 +79,8 @@ class Spider(ListSpider):
|
|
|
self.shutdown_spider()
|
|
|
logger.debug("********** 列表页结束 **********")
|
|
|
|
|
|
+ logger.debug(f"成功|{self.success}|失败|{self.fail}|代理成功率|{self.success / (self.success + self.fail)}")
|
|
|
+
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- Spider(pages=100, threads=2, interval=0.4).start()
|
|
|
+ Spider(pages=100, threads=5, interval=0.4).start()
|