
Update database configuration

dongzhaorui · 1 year ago
Commit 6441f8470f
2 files changed, 62 insertions and 66 deletions
  1. ybw/list_spider.py  +8 -9
  2. ybw/list_spider2.py  +54 -57

+ 8 - 9
ybw/list_spider.py

@@ -10,10 +10,11 @@ from lxml.html import fromstring, HtmlElement
 from config.load import crawler_url, region
 from crawler.crawl_scheduler import Scheduler
 from crawler.login import login, load_login_cookies, login_session_check
-from utils.databases import mongo_table, int2long, es_query, redis_client
+from utils.RedisDB import RedisFilter
+from utils.databases import mongo_table, int2long, es_query
 from utils.execptions import CrawlError, YbwCrawlError
 from utils.log import logger
-from utils.tools import sha1,clean_title
+from utils.tools import clean_title
 
 CrawlMenu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])
 
@@ -36,8 +37,8 @@ class ListSpider:
         self.history_user = None
         self.user = None
         self.session = None
-        self.r = redis_client()
-        self.redis_key = 'ybw_2024'
+
+        self.dedup = RedisFilter()  # default expiry: 1 year
 
     def crawl_request(self, url: str, refer: str, **kwargs):
         menu = kwargs.pop('menu')
@@ -143,13 +144,12 @@ class ListSpider:
 
         insert_items = []
         for item in results:
-            sign = sha1(item['competehref'])
-            if not self.r.hexists(self.redis_key, sign):
+            if not self.dedup.get(item['competehref']):
                 item['count'] = es_query(item["title"], item["l_np_publishtime"])
                 item['crawl'] = False
                 # print(f'>>> {title} - {competehref}')
                 insert_items.append(item)
-                self.r.hset(self.redis_key, sign, '')
+                self.dedup.add(item['competehref'])
 
         if len(insert_items) > 0:
             self.crawl_tab.insert_many(insert_items)
@@ -244,8 +244,7 @@ class ListSpider:
 
     def start(self):
         for menu in self.crawl_menus:
-            with Scheduler(site='元博网', crawl_type='list',
-                           channel=menu.channel) as scheduler:
+            with Scheduler(site='元博网', crawl_type='list', channel=menu.channel) as scheduler:
                 if scheduler.crawl_start:
                     self.user = scheduler.user
                     self.crawl_spider(scheduler, menu)
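
Both spiders now deduplicate on the raw detail URL (`competehref`) through the shared `RedisFilter` helper instead of hashing it into a single Redis hash. The helper itself is not part of this diff; a minimal sketch consistent with how it is called here (`get()` to test membership, `add()` to record, one-year default expiry) could look like the code below. The connection URL, key prefix and SHA1 fingerprinting are assumptions, not the project's actual implementation.

import hashlib

import redis


class RedisFilter:
    """Hypothetical stand-in for utils.RedisDB.RedisFilter, inferred from its usage in this diff."""

    def __init__(self, redis_url='redis://127.0.0.1:6379/0', expire_time=86400 * 365):
        self._db = redis.from_url(redis_url)  # redis-py client (assumed backend)
        self._expire = expire_time            # seconds; one year by default

    @staticmethod
    def _fingerprint(text):
        # hash the value so keys stay short and uniform (prefix is assumed)
        return 'dedup:' + hashlib.sha1(text.encode('utf-8')).hexdigest()

    def add(self, text):
        # record the value; the key expires on its own after self._expire seconds
        self._db.set(self._fingerprint(text), 1, ex=self._expire)

    def get(self, text):
        # True if the value was recorded before and has not expired yet
        return self._db.exists(self._fingerprint(text)) > 0

Because each fingerprint lives in its own key with its own TTL, old entries age out automatically, which the previous hset-into-one-hash approach could not do per entry.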

+ 54 - 57
ybw/list_spider2.py

@@ -8,40 +8,32 @@ Created on 2024-02-27
 """
 
 import datetime
-import time
 from collections import namedtuple
 from concurrent.futures import ThreadPoolExecutor, wait
-from threading import Thread
 from urllib.parse import urljoin
 
 import requests
-from loguru import logger
 from pymongo import MongoClient
 
+import setting
+import utils.tools as tools
 from config.load import region
+from utils.RedisDB import RedisFilter
 from utils.databases import int2long, es_query
+from utils.log import logger
 from utils.socks5 import get_proxy
 
-from utils.RedisDB import rexists, radd, RedisFilter
-
-
-def date_to_timestamp(date, time_format="%Y-%m-%d %H:%M:%S"):
-    timestamp = time.mktime(time.strptime(date, time_format))
-    return int(timestamp)
-
 
 def spider(collection, dedup, page, task):
+    """
+    Collector
 
-    t_name = Thread().getName()
-    headers = {
-        'Accept': 'application/json, text/javascript, */*; q=0.01',
-        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
-        'Cache-Control': 'no-cache',
-        'Connection': 'keep-alive',
-        'Pragma': 'no-cache',
-        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
-        'X-Requested-With': 'XMLHttpRequest',
-    }
+    @param pymongo.collection.Collection collection:
+    @param utils.RedisDB.RedisFilter dedup:
+    @param page: page number
+    @param task: crawl task
+
+    """
     page_size = 1000  # 单页大小
     date = datetime.datetime.now().strftime('%Y-%m-%d')
     params = {
@@ -59,32 +51,42 @@ def spider(collection, dedup, page, task):
         'rp': page_size,
         'usrecord_id': '',
     }
+    headers = {
+        'Accept': 'application/json, text/javascript, */*; q=0.01',
+        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'Pragma': 'no-cache',
+        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
+        'X-Requested-With': 'XMLHttpRequest',
+    }
+    url = 'https://www.chinabidding.cn/302e302e7379675f73736f/datax/json/gj_zbcg_daylimit'
     response = requests.get(
-        'https://www.chinabidding.cn/302e302e7379675f73736f/datax/json/gj_zbcg_daylimit',
-        params=params,
+        url,
         headers=headers,
+        timeout=60,
+        params=params,
         proxies=get_proxy(),
-        timeout=60
     )
-    logger.debug(response)
+    logger.debug(f'第{page}页,响应状态:{response}')
     data_count = 0
     if response.status_code == 200:
         resp_json = response.json()
         data_items = resp_json['result']['list']
-        logger.info(f"[{t_name}]第{page}页{len(data_items)}条数据")
+        logger.info(f"第{page}页,检索到{len(data_items)}条数据")
 
         for items in data_items:
             title = items['fields']['title']
             publish_date = items['fields']['publish_date']
-            l_np_publishtime = date_to_timestamp(publish_date)
+            l_np_publishtime = tools.date_to_timestamp(publish_date)
             url = urljoin('https://www.chinabidding.cn', items['fields']['url'])
-            if title and not rexists(dedup, url):
+            if title and not dedup.get(url):
                 data = {
                     "site": "元博网(采购与招标网)",
                     "channel": task.channel,
                     "area": task.area if task.area != '跨省' else '全国',
                     "_d": "comeintime",
-                    "comeintime": int2long(int(time.time())),
+                    "comeintime": int2long(tools.get_current_timestamp()),
                     "T": "bidding",
                     "sendflag": "false",
                     "spidercode": task.spidercode,
@@ -97,27 +99,26 @@ def spider(collection, dedup, page, task):
                     "href": "#",
                     "publishtime": publish_date,
                     "l_np_publishtime": int2long(l_np_publishtime),
-                    "crawl": False
+                    "crawl": False  # 任务中心派发任务标识
                 }
+
                 try:
-                    count = es_query(title, l_np_publishtime)
+                    data['count'] = es_query(title, l_np_publishtime)
                 except:
-                    count = 0
-
-                data['count'] = count
+                    data['count'] = 0  # 剑鱼 ES similar-record count (0 when the query fails)
 
-                collection.insert_one(data)
+                collection.insert_one(data)  # write the record to MongoDB
+                dedup.add(url)  # mark the URL as seen
                 data_count += 1
-                radd(dedup, url)
                 if data_count % 100 == 0:
-                    logger.info(f"[{t_name}]已保存 {data_count} 条数据")
+                    logger.info(f"已保存 {data_count} 条数据")
 
-    logger.info(f'[{t_name}]完成第{page}页数据采集')
+    logger.info(f'完成第{page}页数据采集')
 
 
 def get_tasks():
-    Menu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])
-    Task = namedtuple('Task', ['channel', 'spidercode', 'table_type', 'areaid', 'area'])
+    menu_field_names = ['channel', 'spidercode', 'table_type']
+    Menu = namedtuple('CrawlMenu', menu_field_names)
     menus = [
         Menu('政府采购', 'a_ybwcgyzbw_zfcg', '6'),
         Menu('招标预告', 'a_ybwcgyzbw_zbyg', '5'),
@@ -126,41 +127,37 @@ def get_tasks():
         Menu('货物招标', 'a_ybwcgyzbw_hwzb', '2'),
         Menu('工程招标', 'a_ybwcgyzbw_gczb', '1'),
     ]
+
+    task_field_names = [*menu_field_names, 'areaid', 'area']
+    Task = namedtuple('Task', task_field_names)
+
     tasks = []
     for menu in menus:
         for i, n in region.items():
-            tasks.append(Task(
-                **menu._asdict(),
-                areaid=i,
-                area=n
-            ))
+            tasks.append(Task(**menu._asdict(), areaid=i, area=n))
 
     return tasks
 
 
-def error(future):
+def spider_error(future):
     err = future.exception()
     if err:
-        logger.exception(f'[{Thread().getName()}]{err}')
+        logger.exception(f'异常线程,原因:{err}')
 
 
 def main():
     tasks = get_tasks()
+    page = 1  # page number
 
-    expire_time = 86400 * 365 * 1  # 1年 = 86400 * 365 * 1
-    dedup = RedisFilter(
-        # redis_url='redis://default:top@123@192.168.3.165:8165/5',
-        redis_url='redis://:k5ZJR5KV4q7DRZ92DQ@172.17.4.240:8361/0',
-        expire_time=expire_time)  # 默认过期时间1年
-
-    to_mongodb = MongoClient('172.17.4.87', 27080)
-    collection = to_mongodb['py_spider']['ybw_list']
+    dedup = RedisFilter()  # default expiry: 1 year
+    to_mongodb = MongoClient(setting.MONGO_IP, setting.MONGO_PORT)
+    collection = to_mongodb[setting.MONGO_DB]['ybw_list']
 
-    with ThreadPoolExecutor(max_workers=4) as tpool:
+    with ThreadPoolExecutor(max_workers=5) as tpool:
         fs = []
         for task in tasks:
-            f = tpool.submit(spider, collection, dedup, 1, task)
-            f.add_done_callback(error)
+            f = tpool.submit(spider, collection, dedup, page, task)
+            f.add_done_callback(spider_error)
             fs.append(f)
         wait(fs)
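
The commit message ("Update database configuration") covers the other half of the change: the hard-coded MongoDB address and Redis URL in `main()` are replaced by the shared `setting` module and the `RedisFilter` defaults. Only three names are read from `setting` here; a placeholder version with local defaults (the real values are deployment-specific and intentionally not reproduced) would be:

# Hypothetical setting.py containing only the names list_spider2.py reads.
# The values below are local placeholders, not the project's actual config.
MONGO_IP = '127.0.0.1'    # MongoDB host (previously a hard-coded internal address)
MONGO_PORT = 27017        # MongoDB port
MONGO_DB = 'py_spider'    # database that holds the ybw_list collection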