
Add multithreading; add automatic max-pagination detection; add large-cycle backfill [last 3 days | full current day] and small-cycle incremental collection.

dzr committed 3 months ago
commit 5595f23d64
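
A minimal standalone sketch of the fan-out pattern this commit introduces (illustrative names only, not the spider's real entry point): one ThreadPoolExecutor per run, one submitted task per (menu, page) pair, and a single wait() barrier before the pool shuts down.

    from concurrent.futures import ThreadPoolExecutor, wait

    def crawl_page(channel, page):
        # stand-in for fetch_list() + parse() in spider_list.py
        print(f"fetch {channel} page {page}")

    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="spider_list") as pool:
        futures = [pool.submit(crawl_page, ch, p)
                   for ch in ("bulletin", "change")
                   for p in range(1, 4)]
        wait(futures)  # barrier: every page task finishes before the run ends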

+ 82 - 36
lzz_theme/zgzbtbggfwpt/spider_list.py

@@ -6,8 +6,11 @@ Created on 2025-04-15
 """
 import random
 import re
+import sys
 import time
 from collections import namedtuple
+from concurrent.futures import ThreadPoolExecutor, wait
+from pathlib import Path
 
 import execjs
 import requests
@@ -30,15 +33,29 @@ except AttributeError:
 
 class Spider:
 
-    def __init__(self, menus, date_kwargs=None):
-        self.py_spider = Mongo_client().py_spider
-        self.zb_list = self.py_spider.theme_list
+    def __init__(self, menus, threads=1, interval=0.5, date_items=None):
+        self.theme_list = Mongo_client()['py_spider']['theme_list']
         self.RDS = RedisFilter()
 
         self.menus = menus
-        self.kwargs = {}
-        if date_kwargs is not None:
-            self.kwargs['date_kwargs'] = date_kwargs
+        self.kwargs = {'date_items': (date_items or {})}
+
+        thread_name = Path(sys.argv[0]).stem
+        self._executor = ThreadPoolExecutor(max_workers=threads,
+                                            thread_name_prefix=thread_name)
+        self._interval = interval  # delay between requests, in seconds
+        self._fs = []  # futures for the currently queued batch of page tasks
+
+    def add_task(self, fn, *args, **kwargs):
+        self._fs.append(self._executor.submit(fn, *args, **kwargs))
+
+    def wait(self):
+        wait(self._fs)  # block until every queued page task finishes
+        self._fs = []
+
+    def shutdown_spider(self):
+        self._executor.shutdown(wait=True)
 
     @staticmethod
     def get_full_href(url):
@@ -48,7 +65,7 @@ class Spider:
             return ctx.call('getFullHref')
 
     def download(self, page, menu, start_date=None, end_date=None):
-        logger.debug(f'Start crawling|{menu.channel}|page {page}')
+        logger.debug(f"Request|{menu.channel}|page {page}")
         if not start_date and not end_date:
             url = "https://bulletin.cebpubservice.com/xxfbcmses/search/" + menu.rid + ".html?searchDate=2000-03-20&dates=300&word=&categoryId=" + menu.cid + "&industryName=&area=&status=&publishMedia=&sourceInfo=&showStatus=0&page=" + str(page)
         else:
@@ -60,7 +77,7 @@ class Spider:
             "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
             "Accept-Language": "zh-CN,zh;q=0.9",
             "Upgrade-Insecure-Requests": "1",
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
         }
 
         request_params = {
@@ -69,22 +86,20 @@ class Spider:
             "timeout": (60, 60),
             "verify": False,
         }
-
         response = requests.get(url, **request_params)
-        time.sleep(.5)
+        response.raise_for_status()
         return response
 
-    def fetch_list(self, menu, page, max_retries=3):
-        date_kwargs = self.kwargs.pop('date_kwargs', {})
-        retry_times = 0
-        while (retry_times := retry_times + 1) < max_retries:
+    def fetch_list(self, menu, page, max_retries=3, show_log=True):
+        date_items = self.kwargs["date_items"]
+        for i in range(max_retries):
             try:
-                response = self.download(page=page, menu=menu, **date_kwargs)
-                logger.debug(f"第{page}页|状态码|{response.status_code}")
-                response.raise_for_status()
-                return response
+                time.sleep(self._interval)
+                return self.download(page=page, menu=menu, **date_items)
             except Exception as e:
-                logger.error(f"访问失败|第{page}页|{e}")
+                if show_log:
+                    logger.error(f"网络请求|{menu.channel}|第{page}页|{type(e).__name__}|重试..{i + 1}")
+                time.sleep(random.random())
 
     def parse(self, html, page, menu):
         count = 0
@@ -126,33 +141,64 @@ class Spider:
                     "publishdept": "",
                     "_d": "comeintime",
                 }
-                self.zb_list.insert_one(item)
+                self.theme_list.insert_one(item)
                 self.RDS.data_save_redis(dedup)
                 count += 1
 
         logger.info(f'Success|{menu.channel}|page {page}|{len(root[1:])} published|{count} saved')
 
+    def _spider(self, menu, page, max_retries):
+        try:
+            response = self.fetch_list(menu, page, max_retries)
+            if response is not None:
+                html = response.content.decode()
+                self.parse(html, page, menu)
+        except Exception as why:
+            logger.error(f"采集失败|{menu.channel}|第{page}页|原因|{type(why).__name__}")
+
+    def fetch_max_pagination(self, menu, max_retries=10):
+        logger.info(f"Detecting max pagination|{menu.channel}")
+        for c in range(max_retries):
+            try:
+                response = self.fetch_list(menu, 1, 3, show_log=False)
+                if response is not None:
+                    html = response.content.decode()
+                    pagination = Selector(html).xpath(
+                        '//div[@class="pagination"]/label/text()'
+                    ).extract_first()
+                    if pagination is not None and str(pagination).isdigit():
+                        return int(pagination)
+            except Exception as e:
+                logger.error(f"Request|{menu.channel}|{type(e).__name__}|retry {c + 1}")
+        return 0  # give up; start() treats a non-positive value as abnormal
+
     def start(self):
         logger.debug("********** 任务开始 **********")
-        for menu in self.menus:
-            page = menu.crawl_page
-            for page in range(1, page + 1):
-                try:
-                    response = self.fetch_list(menu, page, 10)
-                    if response is not None:
-                        html = response.content.decode()
-                        self.parse(html, page, menu)
-                        time.sleep(random.random())
-                except Exception as why:
-                    logger.error(f"采集失败|{menu.channel}|第{page}页|原因|{why}")
-
-        logger.debug("********** 任务结束 **********")
+        try:
+            for menu in self.menus:
+                auto_paginate = getattr(menu, 'auto_paginate', False)
+                if not auto_paginate:
+                    max_page = menu.crawl_page
+                else:
+                    max_page = self.fetch_max_pagination(menu)
+                    if max_page <= 0:
+                        logger.warning(f"Check channel|{menu.channel}|abnormal publish volume|max pagination < 1")
+                        continue
+
+                max_page = max(max_page, 1)
+                for page in range(1, max_page + 1):
+                    self.add_task(self._spider, menu, page, max_retries=10)
+
+            self.wait()
+        finally:
+            self.shutdown_spider()
+            logger.debug("********** 任务结束 **********")
 
 
 if __name__ == '__main__':
     Menu = namedtuple('Menu', ['channel', 'spidercode', 'cid', 'rid', 'crawl_page'])
 
-    menus = [
+    menu_lst = [
         Menu('招标公告', 'a_zgzbtbggfwpt_zbgg2', '88', 'bulletin', 1),
         Menu('更正公告公示', 'a_zgzbtbggfwpt_gzgggs2', '89', 'change', 1),
         Menu('中标结果公示', 'a_zgzbtbggfwpt_zhbjggs2', '90', 'result', 1),
@@ -160,5 +206,5 @@ if __name__ == '__main__':
         Menu('资格预审公告', 'a_zgzbtbggfwpt_zgysgg2', '92', 'qualify', 1),
     ]
 
-    # Spider(menus, date_kwargs={'start_date': '2025-04-15', 'end_date': '2025-04-15'}).start()
-    Spider(menus).start()
+    # Spider(menu_lst, threads=2, date_items={'start_date': '2025-04-28', 'end_date': '2025-04-28'}).start()
+    Spider(menu_lst, threads=2).start()

+ 21 - 0
lzz_theme/zgzbtbggfwpt/zgzbtbggfwpt_list_b.py

@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2025-04-29
+---------
+@summary: China Tendering and Bidding Public Service Platform - large-cycle backfill - list page [spec-compliant]
+"""
+from collections import namedtuple
+
+from spider_list import Spider
+
+Menu = namedtuple('Menu', ['channel', 'spidercode', 'cid', 'rid', 'auto_paginate'])
+
+if __name__ == '__main__':
+    menus = [
+        Menu('招标公告', 'a_zgzbtbggfwpt_zbgg2', '88', 'bulletin', True),
+        Menu('更正公告公示', 'a_zgzbtbggfwpt_gzgggs2', '89', 'change', True),
+        Menu('中标结果公示', 'a_zgzbtbggfwpt_zhbjggs2', '90', 'result', True),
+        Menu('中标候选人公示', 'a_zgzbtbggfwpt_zhbhxrgs2', '91', 'candidate', True),
+        Menu('资格预审公告', 'a_zgzbtbggfwpt_zgysgg2', '92', 'qualify', True),
+    ]
+    Spider(menus, threads=10, interval=0.8).start()

+ 36 - 0
lzz_theme/zgzbtbggfwpt/zgzbtbggfwpt_list_date.py

@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2025-04-29
+---------
+@summary: China Tendering and Bidding Public Service Platform - large-cycle backfill [last 3 days] - list page [spec-compliant]
+"""
+from collections import namedtuple
+from datetime import date, timedelta
+
+from spider_list import Spider
+
+Menu = namedtuple('Menu', ['channel', 'spidercode', 'cid', 'rid', 'auto_paginate'])
+
+if __name__ == '__main__':
+    menus = [
+        Menu('招标公告', 'a_zgzbtbggfwpt_zbgg2', '88', 'bulletin', True),
+        Menu('资格预审公告', 'a_zgzbtbggfwpt_zgysgg2', '92', 'qualify', True),
+        Menu('中标候选人公示', 'a_zgzbtbggfwpt_zhbhxrgs2', '91', 'candidate', True),
+        Menu('更正公告公示', 'a_zgzbtbggfwpt_gzgggs2', '89', 'change', True),
+        Menu('中标结果公示', 'a_zgzbtbggfwpt_zhbjggs2', '90', 'result', True),
+    ]
+
+    today = date.today()
+    today_str = today.strftime("%Y-%m-%d")
+    before_day_3 = today - timedelta(days=3)
+    before_day_3_str = before_day_3.strftime("%Y-%m-%d")
+
+    Spider(
+        menus,
+        threads=10,
+        interval=1,
+        date_items={
+            'start_date': before_day_3_str,
+            'end_date': today_str
+        }
+    ).start()
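
The commit message also lists a full-current-day backfill alongside the 3-day window, but no such runner ships in this commit. A hypothetical variant, modeled on the commented-out example at the bottom of spider_list.py, would pin both ends of date_items to today:

    # Hypothetical "full current day" runner (not part of this commit); same
    # Spider API, with start_date and end_date both set to today.
    from collections import namedtuple
    from datetime import date

    from spider_list import Spider

    Menu = namedtuple('Menu', ['channel', 'spidercode', 'cid', 'rid', 'auto_paginate'])

    if __name__ == '__main__':
        menus = [
            Menu('招标公告', 'a_zgzbtbggfwpt_zbgg2', '88', 'bulletin', True),
        ]
        today_str = date.today().strftime("%Y-%m-%d")
        Spider(menus, threads=10, interval=1,
               date_items={'start_date': today_str, 'end_date': today_str}).start()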

+ 21 - 0
lzz_theme/zgzbtbggfwpt/zgzbtbggfwpt_list_f.py

@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2025-04-29
+---------
+@summary: China Tendering and Bidding Public Service Platform - small-cycle incremental - list page [spec-compliant]
+"""
+from collections import namedtuple
+
+from spider_list import Spider
+
+Menu = namedtuple('Menu', ['channel', 'spidercode', 'cid', 'rid', 'crawl_page'])
+
+if __name__ == '__main__':
+    menus = [
+        Menu('招标公告', 'a_zgzbtbggfwpt_zbgg2', '88', 'bulletin', 3),
+        Menu('更正公告公示', 'a_zgzbtbggfwpt_gzgggs2', '89', 'change', 1),
+        Menu('中标结果公示', 'a_zgzbtbggfwpt_zhbjggs2', '90', 'result', 3),
+        Menu('中标候选人公示', 'a_zgzbtbggfwpt_zhbhxrgs2', '91', 'candidate', 5),
+        Menu('资格预审公告', 'a_zgzbtbggfwpt_zgysgg2', '92', 'qualify', 2),
+    ]
+    Spider(menus, threads=5).start()
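
Why re-crawling the first few pages every small cycle stays incremental: before a row is inserted, parse() records each item in the Redis-backed RedisFilter (RDS.data_save_redis(dedup)), so items already captured by an earlier run are presumably checked and skipped. A minimal sketch of the idea, with an in-memory set standing in for Redis:

    # Sketch only: an in-memory set stands in for the Redis-backed RedisFilter.
    seen = set()

    def save_if_new(href, item):
        if href in seen:
            return False   # already collected by an earlier cycle; skip
        seen.add(href)     # real code: RDS.data_save_redis(dedup)
        # real code: theme_list.insert_one(item)
        return True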