Jelajahi Sumber

新增列表页代理爬虫采集

dongzhaorui 1 tahun lalu
induk
melakukan
d593816917
5 mengubah file dengan 309 tambahan dan 55 penghapusan
  1. 0 17
      ybw/config/conf.yaml
  2. 0 2
      ybw/config/load.py
  3. 172 0
      ybw/list_spider2.py
  4. 106 0
      ybw/utils/RedisDB.py
  5. 31 36
      ybw/utils/socks5.py

+ 0 - 17
ybw/config/conf.yaml

@@ -1,19 +1,12 @@
-# mongo
 mongo:
   host: 172.17.4.87
   port: !!int 27080
-#  host: 127.0.0.1
-#  port: !!int 27017
 
 
-# redis
 redis:
   host: 172.17.162.28
   port: !!int 7361
   pwd: "k5ZJR5KV4q7DRZ92DQ"
-#  host: 127.0.0.1
-#  port: !!int 6379
-#  pwd: ""
   db: !!int 1
 
 
@@ -25,16 +18,6 @@ es:
   db: biddingall # es库别名
 
 
-# 阿里oss
-ali_oss:
-  key_id: LTAI4G5x9aoZx8dDamQ7vfZi
-  key_secret: Bk98FsbPYXcJe72n1bG3Ssf73acuNh
-#  endpoint: oss-cn-beijing.aliyuncs.com    # 公网使用
-  endpoint: oss-cn-beijing-internal.aliyuncs.com    # 内网使用
-  bucket_name: jy-datafile
-
-
-# 代理
 proxy:
   socks5:
     url: http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch

+ 0 - 2
ybw/config/load.py

@@ -5,7 +5,6 @@ import yaml
 __all__ = [
     'mongo_conf',
     'redis_conf',
-    'oss_conf',
     'es_conf',
     'constants',
     'headers',
@@ -26,7 +25,6 @@ with open(_yaml_conf, encoding="utf-8") as f:
     mongo_conf = conf['mongo']
     redis_conf = conf['redis']
     es_conf: dict = conf['es']
-    oss_conf: dict = conf['ali_oss']
     jy_proxy: dict = conf['proxy']
 
 

+ 172 - 0
ybw/list_spider2.py

@@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-27
+---------
+@summary:
+---------
+@author: Dzr
+"""
+
import datetime
import time
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, wait
from threading import Thread, current_thread
from urllib.parse import urljoin

import requests
from loguru import logger
from pymongo import MongoClient

from config.load import region
from utils.databases import int2long, es_query
from utils.RedisDB import RedisFilter, radd, rexists
from utils.socks5 import get_proxy
+
+
def date_to_timestamp(date, time_format="%Y-%m-%d %H:%M:%S"):
    """Convert a local-time date string to an integer Unix timestamp.

    :param date: date string, e.g. "2024-02-27 12:00:00"
    :param time_format: strptime format the string follows
    :return: seconds since the epoch as int (local timezone, via mktime)
    """
    parsed = time.strptime(date, time_format)
    return int(time.mktime(parsed))
+
+
def spider(collection, dedup, page, task):
    """Fetch one list page from chinabidding.cn and store new items in MongoDB.

    :param collection: pymongo collection that receives the list rows
    :param dedup: RedisFilter instance used for URL-level de-duplication
    :param page: page number to request (1-based)
    :param task: Task namedtuple (channel/spidercode/table_type/areaid/area)
    """
    # FIX: Thread().getName() created a brand-new Thread object and returned
    # *its* autogenerated name; current_thread() names the worker actually
    # running this task, which is what the log lines are meant to show.
    t_name = current_thread().name
    headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest',
    }
    page_size = 1000  # rows per page
    # only collect records published today
    date = datetime.datetime.now().strftime('%Y-%m-%d')
    params = {
        'device': 'es',
        'cpcode': 'es001',
        'keywords': '',
        'table_type': f'{task.table_type},',
        'search_type': 'CONTEXT',
        'areaid': f'{task.areaid},',
        'categoryid': '',
        'b_date': 'custom',
        'time_start': date,
        'time_end': date,
        'page': page,
        'rp': page_size,
        'usrecord_id': '',
    }
    response = requests.get(
        'https://www.chinabidding.cn/302e302e7379675f73736f/datax/json/gj_zbcg_daylimit',
        params=params,
        headers=headers,
        proxies=get_proxy(),
        timeout=60
    )
    logger.debug(response)
    data_count = 0
    if response.status_code == 200:
        resp_json = response.json()
        data_items = resp_json['result']['list']
        logger.info(f"[{t_name}]第{page}页{len(data_items)}条数据")

        for items in data_items:
            title = items['fields']['title']
            publish_date = items['fields']['publish_date']
            l_np_publishtime = date_to_timestamp(publish_date)
            url = urljoin('https://www.chinabidding.cn', items['fields']['url'])
            # skip untitled rows and URLs already seen by the redis filter
            if title and not rexists(dedup, url):
                data = {
                    "site": "元博网(采购与招标网)",
                    "channel": task.channel,
                    "area": task.area if task.area != '跨省' else '全国',
                    "_d": "comeintime",
                    "comeintime": int2long(int(time.time())),
                    "T": "bidding",
                    "sendflag": "false",
                    "spidercode": task.spidercode,
                    "city": "",
                    "infoformat": 1,
                    "type": "",
                    "publishdept": "",
                    "title": title,
                    "competehref": url,
                    "href": "#",
                    "publishtime": publish_date,
                    "l_np_publishtime": int2long(l_np_publishtime),
                    "crawl": False
                }
                # FIX: bare "except:" also swallowed SystemExit and
                # KeyboardInterrupt; narrowed to Exception. The ES lookup
                # stays best-effort — a failure must not abort the crawl.
                try:
                    count = es_query(title, l_np_publishtime)
                except Exception:
                    count = 0

                data['count'] = count

                collection.insert_one(data)
                data_count += 1
                radd(dedup, url)  # mark the URL only after a successful insert
                if data_count % 100 == 0:
                    logger.info(f"[{t_name}]已保存 {data_count} 条数据")

    logger.info(f'[{t_name}]完成第{page}页数据采集')
+
+
def get_tasks():
    """Build the full task list: every crawl menu crossed with every region.

    :return: list of Task namedtuples, one per (menu, region) pair
    """
    Menu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])
    Task = namedtuple('Task', ['channel', 'spidercode', 'table_type', 'areaid', 'area'])
    menus = [
        Menu('政府采购', 'a_ybwcgyzbw_zfcg', '6'),
        Menu('招标预告', 'a_ybwcgyzbw_zbyg', '5'),
        Menu('中标公示', 'a_ybwcgyzbw_zbgs', '4'),
        Menu('服务招标', 'a_ybwcgyzbw_fwzb', '3'),
        Menu('货物招标', 'a_ybwcgyzbw_hwzb', '2'),
        Menu('工程招标', 'a_ybwcgyzbw_gczb', '1'),
    ]
    # region maps areaid -> area name; expand each menu over all regions
    return [
        Task(**menu._asdict(), areaid=area_id, area=area_name)
        for menu in menus
        for area_id, area_name in region.items()
    ]
+
+
def error(future):
    """Done-callback for worker futures: log any exception a task raised.

    :param future: concurrent.futures.Future returned by the executor
    """
    err = future.exception()
    if err:
        # FIX: Thread().getName() named a freshly created, unrelated Thread
        # object; current_thread().name identifies the thread actually
        # executing this callback.
        logger.exception(f'[{current_thread().name}]{err}')
+
+
def main():
    """Entry point: fan the crawl tasks out over a small thread pool."""
    tasks = get_tasks()

    expire_time = 86400 * 365 * 1  # dedup keys live for one year
    # SECURITY(review): credentials are hard-coded below — they should be
    # loaded from config (conf.yaml) instead of living in source.
    dedup = RedisFilter(
        redis_url='redis://:k5ZJR5KV4q7DRZ92DQ@172.17.4.240:8361/0',
        expire_time=expire_time)

    to_mongodb = MongoClient('172.17.4.87', 27080)
    try:
        collection = to_mongodb['py_spider']['ybw_list']

        with ThreadPoolExecutor(max_workers=4) as tpool:
            fs = []
            for task in tasks:
                # each task crawls page 1 of its (menu, region) listing
                f = tpool.submit(spider, collection, dedup, 1, task)
                f.add_done_callback(error)
                fs.append(f)
            wait(fs)
    finally:
        # FIX: guarantee the Mongo connection is released even when a
        # submit/wait raises; the original leaked the client on error.
        to_mongodb.close()

    logger.info("列表页采集结束")
+
+
+if __name__ == '__main__':
+    main()

+ 106 - 0
ybw/utils/RedisDB.py

@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-27
+---------
+@summary: redis 去重
+---------
+@author: Lzz
+"""
+import hashlib
+
+import redis
+
+
class RedisFilter:
    """Redis-backed de-duplication filter.

    Each key is stored as a plain string value with an expiry, so membership
    is a single ``EXISTS`` call and entries age out automatically.
    """

    redis_db = None  # kept for backward compatibility with class-level access

    def __init__(self, redis_url=None, expire_time=None):
        """
        @param redis_url: redis connection url, e.g. redis://:pwd@host:port/db
        @param expire_time: key TTL in seconds (defaults to one year)
        """
        # FIX: the original assigned to self.__class__.redis_db, which made
        # every RedisFilter instance silently share the most recently created
        # connection; store the connection on the instance instead.
        self.redis_db = redis.StrictRedis.from_url(redis_url)  # single node

        self._ex = expire_time or 86400 * 365 * 1  # 1 year = 86400 * 365 * 1

    def __repr__(self):
        return "<RedisFilter: {}>".format(self.redis_db)

    def exists(self, key):
        """Return True when *key* is present in redis."""
        return self.redis_db.exists(key) > 0

    def add(self, keys, *args, **kwargs):
        """
        Record keys (delete with: redis_db.delete("pylist_" + key)).

        @param keys: key or list of keys to record
        @return: list / single value — False when the key already existed,
                 truthy when it was newly added
        """
        is_list = isinstance(keys, list)
        keys = keys if is_list else [keys]

        is_added = []
        for key in keys:
            # FIX: SET ... NX EX is atomic — it succeeds only when the key is
            # new, eliminating the check-then-set race of exists() + set().
            is_added.append(bool(self.redis_db.set(key, 1, ex=self._ex, nx=True)))

        return is_added if is_list else is_added[0]

    def get(self, keys):
        """
        Check whether keys are already recorded.

        @param keys: key or list of keys
        @return: list / single value (True when present, False otherwise)
        """
        is_list = isinstance(keys, list)
        keys = keys if is_list else [keys]

        is_exist = []
        for key in keys:
            is_exist.append(self.exists(key))

        # also flag duplicates within the input batch itself
        temp_set = set()
        for i, key in enumerate(keys):
            if key in temp_set:
                is_exist[i] = True
            else:
                temp_set.add(key)

        return is_exist if is_list else is_exist[0]
+
+
def get_sha256(*args):
    """
    @summary: Build a unique 64-char hex digest from the string forms of
              *args*, used as a stable de-dup id.
    ---------
    @param *args: values that jointly identify a record
    ---------
    @result: 5580c91ea29bf5bd963f4c08dfcacd983566e44ecea1735102bc380576fd6f30
    """
    # hashing the concatenation of str(arg) for every arg is byte-identical
    # to feeding each encoded arg to sha256.update() in sequence
    joined = "".join(str(arg) for arg in args)
    return hashlib.sha256(joined.encode()).hexdigest()
+
+
def rexists(dedup, data):
    """Return True when *data* (a value or list of values) is already recorded.

    The key is derived from the sorted values, so order does not matter.
    """
    values = data if isinstance(data, list) else [data]
    pykey = "pylist_" + get_sha256(*sorted(values))
    return bool(dedup.get(pykey))
+
+
def radd(dedup, data):
    """Record *data* (a value or list of values) in the de-dup store.

    :return: result of RedisFilter.add — False when the key already existed
    """
    values = data if isinstance(data, list) else [data]
    pykey = "pylist_" + get_sha256(*sorted(values))
    return dedup.add(pykey)

+ 31 - 36
ybw/utils/socks5.py

@@ -1,41 +1,36 @@
-import threading
-
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-27
+---------
+@summary: socks5 proxy代理
+---------
+@author: Dzr
+"""
 import requests
 
-from utils.log import logger
 from config.load import jy_proxy
 
-__all__ = ['Proxy']
-
-
-class Socks5Proxy:
-
-    def __init__(self):
-        self._lock = threading.RLock()
-        self._enable_proxy = False
-        self._url = jy_proxy['socks5']['url']
-        self._auth = jy_proxy['socks5']['auth']
-        self._proxies = None
-
-    @property
-    def proxies(self):
-        return self._proxies
-
-    def switch(self):
-        with self._lock:
-            if self._enable_proxy:
-                self._proxies = self._fetch_proxies()
-
-    def _fetch_proxies(self):
-        proxy = requests.get(self._url, headers=self._auth).json()
-        return proxy.get("data")
-
-    def __call__(self, enable_proxy: bool = False, *args, **kwargs):
-        self._enable_proxy = enable_proxy
-        if self._enable_proxy:
-            logger.info("[加载socks5代理]")
-            self._proxies = self._fetch_proxies()
-        return self
-
 
-Proxy = Socks5Proxy()
def get_proxy(scheme=None, default=None, socks5h=False):
    """Fetch a socks5 proxy mapping from the proxy-dispatch service.

    :param scheme: when given ('http'/'https'), return only that entry
    :param default: fallback returned when *scheme* is missing from the reply
    :param socks5h: rewrite socks5:// to socks5h:// so the proxy (not the
                    client) resolves hostnames
    :return: proxies dict, a single proxy url, or None when unavailable
    """
    headers = jy_proxy['socks5']['auth']
    url = jy_proxy['socks5']['url']
    try:
        # FIX: added a timeout — without it a stalled proxy service would
        # hang the calling crawl thread indefinitely
        proxy_res = requests.get(url, headers=headers, timeout=10).json()
        proxies = proxy_res.get('data')
        if proxy_res and proxies:
            if socks5h:
                base = proxies.get("http")
                proxies = {
                    "http": base.replace("socks5", "socks5h"),
                    "https": base.replace("socks5", "socks5h"),
                }
            if not scheme:
                return proxies
            return proxies.get(scheme, default)
        print("暂无代理...")
    except Exception as e:
        # FIX: the original "except Exception: pass" hid every failure; stay
        # best-effort (return None) but surface what went wrong
        print(f"获取代理失败: {e}")
    return None