
Add detail collection script; add multithreading to the re-crawl spider

dzr 3 months ago
commit cbc5c3fb01

+ 2 - 2
lzz_theme/zgzbtbggfwpt/detail_start.sh

@@ -7,5 +7,5 @@
 #nohup python3 zgzbtbggfwpt_details2.py > log/zgzbtbggfwpt_details2.out 2>&1 &
 #nohup python3 zgzbtbggfwpt_details3.py > log/zgzbtbggfwpt_details3.out 2>&1 &
 
-ps -ef |grep "spider_detail.py" |grep -v grep |awk '{print $2}' |xargs kill -9 2> /dev/null
-nohup python3 spider_detail.py > /dev/null &
+ps -ef |grep "zgzbtbggfwpt_details.py" |grep -v grep |awk '{print $2}' |xargs kill -9 2> /dev/null
+nohup python3 zgzbtbggfwpt_details.py > /dev/null &

+ 23 - 8
lzz_theme/zgzbtbggfwpt/spider_detail.py

@@ -7,8 +7,10 @@ import copy
 import json
 import random
 import re
+import sys
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
+from pathlib import Path
 from threading import Timer
 from urllib.parse import urlencode
 
@@ -70,9 +72,11 @@ class Spider:
             '必联电子招标投标平台'
         }
 
-        self._executor = ThreadPoolExecutor(max_workers=threads)  # thread pool
+        thread_name = Path(sys.argv[0]).name.replace('.py', '')
+        self._executor = ThreadPoolExecutor(max_workers=threads,
+                                            thread_name_prefix=thread_name)  # thread pool
         self._executor_wait = wait
-        self._downloader = AttachmentDownloader()
+        self._downloader = AttachmentDownloader(max_retries=1)
 self._sizes = sizes  # number of tasks per run
 self._interval = interval  # interval between runs
 
@@ -216,8 +220,10 @@ class Spider:
 
         return self.download_file(uid, secret_key, title, proxies=proxies)
 
-    def deal_request(self, item, max_retries=3):
+    def deal_request(self, item, proxies=None, max_retries=3, recurse=True):
         logger.debug(f'开始处理|{item["title"]}')
+        change_proxy = True if proxies is None else False
+
 item_c = copy.deepcopy(item)  # copy the item; the original record keeps the platform's original publish URL
         item_c['contenthtml'] = '详情请访问原网页!'
 
@@ -230,10 +236,12 @@ class Spider:
                 item['href'] = notice_url
                 item['bulletin_source'] = bulletin_source
                 if bulletin_source in self._ignore_sites:
-                    # competitor platform, not collected
+                    # skip, not collected
                     return False
 
-                proxies = get_proxy(socks5h=True)
+                if proxies is None:
+                    proxies = get_proxy(socks5h=True)
+
                 files = self.download_pdf_by_secret_key(title, notice_url, uid, proxies)
                 if not files:
                     raise FileNotFoundError('附件下载失败!')
@@ -247,9 +255,16 @@ class Spider:
                 self.data_bak.insert_one(item_c)
                 logger.info(f"采集成功|{item['title']}")
                 return True
-            except Exception:
-                logger.error(f"采集失败|{item['title']}|重试..{i + 1}")
+            except Exception as e:
+                logger.error(f"采集失败|{item['title']}|{type(e).__name__}|重试..{i + 1}")
+                if change_proxy:
+                    proxies = None
                 time.sleep(random.randint(5, 10))
+        else:
+            if recurse is True:
+                proxies = get_QGIP()
+                callback = dict(max_retries=2, recurse=False)
+                return self.deal_request(item, proxies=proxies, **callback)
 
     @staticmethod
     def count_second():
@@ -333,4 +348,4 @@ class Spider:
 
 
 if __name__ == "__main__":
-    Spider(sizes=240, threads=3).start()
+    Spider(sizes=240, threads=2).start()
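
The reworked deal_request above pins a proxy when the caller passes one, otherwise it pulls a fresh SOCKS5 proxy per attempt, and the for/else branch fires only when every regular retry has failed, recursing once with a QGIP proxy and a reduced retry budget. The following is a minimal, self-contained sketch of that retry-with-fallback-proxy pattern; get_proxy, get_QGIP and fetch_once are hypothetical stand-ins for the project's helpers, not the actual implementation.

import random
import time


def get_proxy(socks5h=True):
    # hypothetical stand-in for the project's proxy-pool helper
    return {"https": "socks5h://127.0.0.1:1080"}


def get_QGIP():
    # hypothetical stand-in for the fallback (QG) proxy provider
    return {"https": "http://127.0.0.1:8888"}


def fetch_once(item, proxies):
    # hypothetical stand-in for the real download/parse/insert step
    raise FileNotFoundError("attachment download failed")


def deal_request(item, proxies=None, max_retries=3, recurse=True):
    # Rotate proxies between attempts only when the caller did not pin one.
    change_proxy = proxies is None
    for i in range(max_retries):
        try:
            if proxies is None:
                proxies = get_proxy(socks5h=True)
            fetch_once(item, proxies)
            return True
        except Exception as e:
            print(f"failed|{item['title']}|{type(e).__name__}|retry {i + 1}")
            if change_proxy:
                proxies = None  # force a fresh proxy on the next attempt
            time.sleep(random.randint(5, 10))
    else:
        # Every regular retry failed: one last pass through the fallback
        # proxy, with a smaller retry budget and recursion disabled.
        if recurse:
            return deal_request(item, proxies=get_QGIP(), max_retries=2, recurse=False)
    return False

The for/else form matters here: the else block runs only when the loop exhausts its attempts without returning, so a successful attempt never triggers the fallback proxy.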

+ 3 - 2
lzz_theme/zgzbtbggfwpt/spider_detail_retry.py

@@ -18,7 +18,8 @@ class RetrySpider(Spider):
             "failed": True,
             "is_crawl": False
         }
-        with self.theme_list.find(query, limit=sizes) as cursor:
+        sort = [("_id", -1)]
+        with self.theme_list.find(query, limit=sizes, sort=sort) as cursor:
             for item in cursor:
                 if show_debug:
                     logger.debug(item)
@@ -44,4 +45,4 @@ class RetrySpider(Spider):
 
 
 if __name__ == "__main__":
-    RetrySpider(sizes=1000).start()
+    RetrySpider(sizes=10000, threads=20).start()
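
The retry crawler now reads its failed queue newest-first by sorting on _id descending before the limit is applied, and it is scaled up to 10000 items across 20 worker threads. Below is a minimal pymongo sketch of that query shape, assuming a theme_list collection with the same failed/is_crawl flags; the connection string is illustrative, since the repo builds the client through its own Mongo_client helper.

from pymongo import MongoClient

# illustrative connection; the project obtains this via its Mongo_client wrapper
client = MongoClient("mongodb://localhost:27017")
theme_list = client.py_spider.theme_list

query = {"failed": True, "is_crawl": False}

# ObjectId values are roughly time-ordered, so sorting on _id descending
# returns the most recently queued failures first.
with theme_list.find(query, limit=10000, sort=[("_id", -1)]) as cursor:
    for item in cursor:
        print(item["_id"], item.get("title"))

Because the server applies the sort before the limit, the batch is always the newest 10000 failed items rather than an arbitrary slice of the collection.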

+ 2 - 241
lzz_theme/zgzbtbggfwpt/zgzbtbggfwpt_details.py

@@ -6,248 +6,9 @@ Created on 2024-06-24
 ---------
 @author: Lzz
 """
-import sys
-import os
 
-sys.path.append(os.path.dirname(os.getcwd()))
-import json
-from utils.attachment import AttachmentDownloader
-from threading import Timer
-from parsel import Selector
-from utils.tools import *
+from spider_detail import Spider
 
 
-
-class Details:
-
-    def __init__(self):
-        self.proxy = get_proxy()
-        self.db_table = Mongo_client().py_spider
-        self.db_name = self.db_table.theme_list
-        self.zt_details = self.db_table.data_bak
-        self.rds = Redis_client()
-        self.redis_key = "ztpc_zgzbtbggfwpt_msg"
-        self.delete_key = ""
-        self.end_state = False
-        self.headers = {
-            "Accept": "*/*",
-            "Accept-Language": "zh-CN,zh;q=0.9",
-            "Cache-Control": "no-cache",
-            "Connection": "keep-alive",
-            "Content-Length": "0",
-            "Origin": "https://bulletin.cebpubservice.com",
-            "Pragma": "no-cache",
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
-            "X-Requested-With": "XMLHttpRequest",
-        }
-
-
-    def get_time__2652(self, page=None, cid=None, rid=None):
-        with open('./zgzbtbggfwpt_pm.js', 'r') as fr:
-            ex_js = fr.read()
-        ctx = execjs.compile(ex_js)
-
-        return ctx.call('tm', page, cid, rid)
-
-    def get_type_1017(self, page):
-        with open('./ssyq.js', 'r') as fr:
-            ex_js = fr.read()
-        ctx = execjs.compile(ex_js)
-        return ctx.call('type_1017', page)
-
-    def detail_get(self, response, item, new_url):
-        response.encoding = response.apparent_encoding
-        root = Selector(text=response.text)
-
-        if "来源渠道:必联电子招标投标平台" in response.text:
-            # the PDF carries a 必联 watermark, do not push to the save service
-            item["sendflag"] = "true"
-
-        dd = root.xpath('//div[@class="mian_list_03"]/@index').extract_first()
-
-        cookies = response.cookies.get_dict()
-        headers2 = {
-            "Accept": "*/*",
-            "Accept-Language": "zh-CN,zh;q=0.9",
-            "Cache-Control": "no-cache",
-            "Connection": "keep-alive",
-            "Content-Length": "0",
-            "Origin": "https://bulletin.cebpubservice.com",
-            "Pragma": "no-cache",
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
-            "X-Requested-With": "XMLHttpRequest",
-        }
-
-        url2 = "https://bulletin.cebpubservice.com/details/permission/getSecretKey"
-        params = {
-            "time__2652": self.get_time__2652()
-        }
-        res = requests.post(url2, headers=headers2, cookies=cookies, params=params, timeout=30, proxies=self.proxy,
-                            verify=False)
-
-        ex_js = '''
-            CryptoJS = require("crypto-js")
-
-            function decryptByDES(ciphertext, key) {
-                    var keyHex = CryptoJS.enc.Utf8.parse("Ctpsp@884*");
-                    var decrypted = CryptoJS.DES.decrypt({
-                        ciphertext: CryptoJS.enc.Base64.parse(ciphertext)
-                    }, keyHex, {
-                        mode: CryptoJS.mode.ECB,
-                        padding: CryptoJS.pad.Pkcs7
-                    });
-                    return decrypted.toString(CryptoJS.enc.Utf8);
-            }
-            '''
-        ctx = execjs.compile(ex_js)
-        pm = ctx.call('decryptByDES', res.text.replace('"', ''))
-
-        item["contenthtml"] = "详情请访问原网页!"
-        item["href"] = new_url
-
-        attachments = {}
-
-        f_org = f"/details/bulletin/getBulletin/{json.loads(pm).get('data')}/{dd}"
-
-        for i in range(5):
-            f_tm = self.get_time__2652(f_org)
-            file_url = f"https://bulletin.cebpubservice.com/details/bulletin/getBulletin/{json.loads(pm).get('data')}/{dd}?time__2652={f_tm}"
-
-            attachment = AttachmentDownloader().fetch_attachment(
-                file_name=item["title"], file_type="pdf", download_url=file_url,
-                proxies=self.proxy, is_check=True)
-            if attachment.get('size'):
-                attachments[str(len(attachments) + 1)] = attachment
-                break
-            time.sleep(random.randint(3, 6))
-            self.proxy = get_proxy(socks5h=True)
-            if i == 4:
-                raise FileNotFoundError("附件下载失败!")
-
-        if attachments:
-            item['projectinfo'] = {"attachments": attachments}
-
-        item = format_fileds(item)
-
-        try:
-            self.zt_details.insert_one(item)
-            logger.info(f"[采集成功]{item['title']}-{item['publishtime']}")
-        except DuplicateKeyError:
-            logger.info(f"[重复采集]{item['title']}-{item['publishtime']}")
-
-    def decrypt_by_des(self, text: str):
-        ex_js = '''
-        CryptoJS = require("crypto-js")
-        function en_str(t) {
-            var e = CryptoJS.enc.Utf8.parse("1qaz@wsx3e")
-              , i = CryptoJS.DES.decrypt({
-                ciphertext: CryptoJS.enc.Base64.parse(t)
-            }, e, {
-                mode: CryptoJS.mode.ECB,
-                padding: CryptoJS.pad.Pkcs7
-            });
-            return i.toString(CryptoJS.enc.Utf8)
-        }
-        '''
-        ctx = execjs.compile(ex_js)
-        data_org = ctx.call('en_str', text.replace('"', ''))
-        data_org = eval(data_org.replace('true', '1').replace('false', '1').replace('null', '1'))
-        return data_org
-
-    def get_url(self, uid):
-        headers = {
-            "Accept": "application/json, text/plain, */*",
-            "Accept-Language": "zh-CN,zh;q=0.9",
-            "Cache-Control": "no-cache",
-            "Connection": "keep-alive",
-            "Pragma": "no-cache",
-            "Referer": "https://ctbpsp.com/",
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
-        }
-
-        url = f"https://ctbpsp.com/cutominfoapi/bulletin/{uid}/uid/0"
-
-        retry = 0
-        while (retry := retry + 1) < 5:
-            params = {
-                "type__1017": self.get_type_1017(uid)
-            }
-            try:
-                res = requests.get(url, headers=headers, params=params, proxies=get_QGIP())
-                data_org = self.decrypt_by_des(res.text.replace('"', ""))
-                break
-            except:
-                pass
-
-
-        new_href = data_org.get('data').get('noticeUrl')
-        return new_href
-
-    def fetch_request(self, url):
-        response = requests.get(url=url, headers=self.headers,
-                                proxies=self.proxy, timeout=(30, 60), verify=False)
-        return response
-
-    def deal_request(self, item):
-        response = None
-        retry_times = 0
-        org_item = item.copy()
-        while (retry_times := retry_times + 1) < 5:
-            try:
-                new_url = self.get_url(item['parse_url'])
-                response = self.fetch_request(new_url)
-                if response is not None and response.status_code == 200:
-                    self.detail_get(response, item=item, new_url=new_url)
-                    time.sleep(random.random())
-                    return True
-            except Exception as e:
-                item = org_item
-                logger.error(f"{item['href']} 采集异常:{e}")
-                time.sleep(random.randint(5, 10))
-                self.proxy = get_proxy()
-        logger.warning(f"[采集失败]{item['href']}")
-        return False
-
-    def countSec(self):
-        for count in range(10, 0, -1):
-            print(f'\r{count} 秒 后结束任务', end='')
-            time.sleep(1)
-        print('\r任务结束')
-
-    def de_redis_key(self):
-        self.end_state = True
-        self.rds.hdel(self.redis_key, self.delete_key)
-        logger.warning("当前数据未采集成功,数据已回填!")
-        self.countSec()
-
-    def start(self, limit=1):
-        logger.debug("********** 详情页采集开始 **********")
-        time.sleep(random.random())
-        count = 0
-        ts = Timer(590, self.de_redis_key)  # create a timer that fires after the given number of seconds
-        ts.start()  # start the timer
-        with self.db_name.find({"parser_name": "ztpc_zgzbtbggfwpt", "failed": False, "is_crawl": False}) as data_lsit:
-            for item in data_lsit:
-                # logger.debug(item)
-                if self.end_state:
-                    break
-                if count >= limit:
-                    break
-                unicode_key = md5value(item.get('href') + item.get('title'))
-                if not self.rds.hexists(self.redis_key, unicode_key):  # dedupe on all fields except the dynamic ones
-                    self.rds.hset(self.redis_key, unicode_key, '')
-                    self.delete_key = unicode_key
-                    count += 1
-                    update_id = item["_id"]
-                    retry = item["retry"]
-                    if self.deal_request(item):
-                        self.db_name.update_one({"_id": update_id}, {"$set": {"is_crawl": True}})
-                    else:
-                        retry += 1
-                        self.db_name.update_one({"_id": update_id}, {"$set": {"failed": True, "retry": retry}})
-
-        logger.debug("********** 详情页采集结束 **********")
-        ts.cancel()  # the script finished within the allotted time; cancel the timer
-
 if __name__ == "__main__":
-    Details().start(limit=100)
+    Spider(sizes=1000, threads=20).start()