
Switch deduplication to the new Redis filter

dzr · 4 months ago · commit 8a69c7f396
6 changed files with 194 additions and 87 deletions
  1. db/RedisDB.py  +85 -0
  2. db/__init__.py  +9 -0
  3. docker-compose.yml  +12 -1
  4. news_detail.py  +40 -35
  5. news_list.py  +37 -25
  6. tools.py  +11 -26

+ 85 - 0
db/RedisDB.py

@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-27
+---------
+@summary: Redis-based deduplication
+---------
+@author: Lzz
+"""
+import hashlib
+
+import redis
+
+
+class RedisFilter:
+
+    def __init__(self, url, expire_time=None):
+        self.redis_db = redis.StrictRedis.from_url(url)
+        self._ex = expire_time or 86400 * 365 * 1  # default expiry: 1 year, in seconds
+
+    def __repr__(self):
+        return "<RedisFilter: {}>".format(self.redis_db)
+
+    def exists(self, key):
+        """全量检索"""
+        if self.redis_db.exists(key) > 0:
+            return True
+        return False
+
+    def add(self, keys):
+        """
+        Add key(s) to the dedup store.
+
+        @param keys: key(s) to record; a single value or a list for batch insertion
+        @return: list / single value (False if the key was already present, True if it was added)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        is_added = []
+        for key in keys:
+            pkey = "baidu_" + self.fingerprint(key)
+            if not self.exists(pkey):
+                is_added.append(self.redis_db.set(pkey, 1, ex=self._ex))
+            else:
+                is_added.append(False)
+
+        return is_added if is_list else is_added[0]
+
+    def get(self, keys):
+        """
+        Check whether the data already exists.
+        @param keys: list / single value
+        @return: list / single value (True if it exists, False otherwise)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        is_exist = []
+        for key in keys:
+            pkey = "baidu_" + self.fingerprint(key)
+            is_exist.append(self.exists(pkey))
+
+        # also flag duplicates within the input batch itself
+        temp_set = set()
+        for i, key in enumerate(keys):
+            if key in temp_set:
+                is_exist[i] = True
+            else:
+                temp_set.add(key)
+
+        return is_exist if is_list else is_exist[0]
+
+    def fingerprint(self, *args):
+        """
+        @summary: build a unique data fingerprint (a 64-character SHA-256 hex digest)
+        ---------
+        @param args: values to fingerprint for deduplication
+        ---------
+        @result: 5580c91ea29bf5bd963f4c08dfcacd983566e44ecea1735102bc380576fd6f30
+        """
+        args = sorted(args)
+        sha256 = hashlib.sha256()
+        for arg in args:
+            sha256.update(str(arg).encode())
+        return sha256.hexdigest()

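A minimal usage sketch for the new filter (the Redis URL and TTL below are placeholders for illustration, not the production instance configured in tools.py):

# Sketch: expected usage of RedisFilter; URL and expire_time are illustrative only.
from db.RedisDB import RedisFilter

dedup = RedisFilter("redis://localhost:6379/0", expire_time=86400 * 30)  # 30-day TTL

url = "https://example.com/news/1"
if not dedup.get(url):       # False -> fingerprint not stored yet
    # ... insert the document into MongoDB here ...
    dedup.add(url)           # store the "baidu_"-prefixed SHA-256 fingerprint with the TTL

# Both get() and add() also accept lists and then return a list of booleans:
print(dedup.get([url, "https://example.com/news/2"]))  # e.g. [True, False]

Note that add() performs exists() and set() as two separate calls; if several workers ever share one filter, redis_db.set(pkey, 1, ex=self._ex, nx=True) would make the check-and-set atomic.
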
+ 9 - 0
db/__init__.py

@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2025-03-18 
+---------
+@summary:  
+---------
+@author: Dzr
+"""
+from .RedisDB import RedisFilter

+ 12 - 1
docker-compose.yml

@@ -7,7 +7,6 @@ services:
       - /mnt/news_baidu:/mnt
     restart: always
     privileged: true
-    shm_size: 4GB
     logging:
       driver: "json-file"
       options:
@@ -18,6 +17,8 @@ services:
         limits:
           memory: 4G
     command: 'python3 news_list.py'
+    networks:
+      - baidu_network
 
   crawl-detail:
     container_name: baidu_detail
@@ -37,3 +38,13 @@ services:
         limits:
           memory: 10G
     command: 'python3 news_detail.py'
+    networks:
+      - baidu_network
+
+networks:
+  baidu_network:
+    driver: bridge
+    ipam:
+      driver: default
+      config:
+        - subnet: 172.21.0.0/16

+ 40 - 35
news_detail.py

@@ -6,16 +6,19 @@ from concurrent.futures import ThreadPoolExecutor, wait
 from urllib.parse import urlparse
 
 import httpx
+import requests
 import urllib3
 from gne import GeneralNewsExtractor
 from loguru import logger
 
-from tools import news_list, news_detail, UserAgent, requests
+from tools import news_list_coll, news_detail_coll
+from tools import ua
 
 warnings.simplefilter("ignore", UserWarning)
 urllib3.disable_warnings()
+
+# gne
 extractor = GeneralNewsExtractor()
-UA = UserAgent()
 
 
 def extract_chinese(text):
@@ -23,28 +26,28 @@ def extract_chinese(text):
     return True if re.findall(pattern, text) else False
 
 
-def date_to_timestamp(pubulishtime):
-    timeArray = time.strptime(pubulishtime, "%Y-%m-%d")
-    timestamp = int(time.mktime(timeArray))
-    return timestamp
-
+def date_to_timestamp(publish_time):
+    time_array = time.strptime(publish_time, "%Y-%m-%d")
+    return int(time.mktime(time_array))
 
-def httpx_get_url(info):
-    info["url"] = info["url"] if str(info["url"]).count("https") else str(info["url"]).replace("http", "https")
 
+def get_detail_by_httpx(info):
+    url = info["url"] if str(info["url"]).count("https") else str(info["url"]).replace("http", "https")
     headers = {
         "Accept": "text/html,application/xhtml+xml, application/xml;q=0.9, image/avif, image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
         "Accept-Language": "zh-CN,zh;q=0.9",
         "Cache-Control": "no-cache",
         "Connection": "keep-alive",
-        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
+        "User-Agent": ua.random,
     }
 
-    req = httpx.get(str(info["url"]), timeout=10, headers=headers)
+    req = httpx.get(str(url), timeout=10, headers=headers)
     if req.status_code == 200:
+        html = req.content.decode()
+        result = extractor.extract(html, with_body_html=False)
+        url_parser = urlparse(url)
+
         item = {}
-        res = urlparse(info["url"])
-        result = extractor.extract(req.text, with_body_html=False)
         item["title"] = result["title"]
         try:
             item["list_title"] = info["list_title"]
@@ -52,35 +55,37 @@ def httpx_get_url(info):
             pass
 
         item["detail"] = result["content"]
-        item["contenthtml"] = req.text
+        item["contenthtml"] = html
         new_pubulishtime = str(result["publish_time"]).split(" ")[0].replace("年", "-").replace("月", "-").replace("日","").replace( "/", "-")
         item["pubulishtime"] = new_pubulishtime.split("T")[0] if len(new_pubulishtime) > 11 else new_pubulishtime
-        item["infourl"] = info["url"]
-        item["domain"] = res.netloc
+        item["infourl"] = url
+        item["domain"] = url_parser.netloc
         item["searchwords"] = info["searchwords"]
         item["searchengine"] = "baidu"
         item["comeintime"] = int(time.time())
-        # item["project"] = info["project"]
         item["type"] = True
-        news_detail.insert_one(item)
+        news_detail_coll.insert_one(item)
         logger.info(f"下载信息: {item['title']}")
 
 
-def get_url(info):
+def get_detail_by_requests(info):
+    url = info["url"]
     headers = {
         "Accept": "application/json",
         "Accept-Language": "zh-CN,zh;q=0.9",
         "Cache-Control": "no-cache",
         "Connection": "keep-alive",
-        "User-Agent": UA.random,
+        "User-Agent": ua.random,
     }
     try:
-        req = requests.get(info["url"],  headers=headers, timeout=10, verify=False)
+        req = requests.get(url, headers=headers, timeout=10, verify=False)
         req.encoding = req.apparent_encoding
         if req.status_code == 200:
+            url_parser = urlparse(url)
+            html = req.content.decode()
+            result = extractor.extract(html, with_body_html=False)
+
             item = {}
-            res = urlparse(info["url"])
-            result = extractor.extract(req.text, with_body_html=False)
             item["title"] = result["title"]
             try:
                 item["list_title"] = info["list_title"]
@@ -88,17 +93,17 @@ def get_url(info):
                 pass
 
             item["detail"] = result["content"]
-            item["contenthtml"] = req.text
+            item["contenthtml"] = html
             new_pubulishtime = str(result["publish_time"]).split(" ")[0].replace("年", "-").replace("月", "-").replace("日","").replace("/", "-")
             item["pubulishtime"] = new_pubulishtime.split("T")[0] if len(new_pubulishtime) > 11 else new_pubulishtime
-            item["infourl"] = info["url"]
-            item["domain"] = res.netloc
+            item["infourl"] = url
+            item["domain"] = url_parser.netloc
             item["searchwords"] = info["searchwords"]
             item["searchengine"] = "baidu"
             item["comeintime"] = int(time.time())
             item["site"] = info["site"]
             item["type"] = True
-            news_detail.insert_one(item)
+            news_detail_coll.insert_one(item)
             logger.info(f"下载信息:{item['title']}")
     except:
         logger.error(f"下载失败:{info['list_title']}")
@@ -106,29 +111,29 @@ def get_url(info):
 
 def run(task):
     if task["url"].count("baijiahao"):
-        httpx_get_url(task)
+        get_detail_by_httpx(task)
     else:
-        get_url(task)
+        get_detail_by_requests(task)
 
-    news_list.delete_one({"_id": task["_id"]})
+    news_list_coll.delete_one({"_id": task["_id"]})
 
 
-def spider(workers=1):
+def spider(workers):
     with ThreadPoolExecutor(max_workers=workers) as p:
-        fs = [p.submit(run, task) for task in news_list.find()]
+        fs = [p.submit(run, task) for task in news_list_coll.find()]
         wait(fs)
 
 
 def Isvalid():
     q = {"isvalid": {"$exists": 0}}
     f = {"contenthtml": 0, "detail": 0}
-    with news_detail.find(q, projection=f, no_cursor_timeout=True) as cursor:
+    with news_detail_coll.find(q, projection=f, no_cursor_timeout=True) as cursor:
         for info in cursor:
             # garbled title: fall back to the list-page title
             seqs = ["...", "…"]
             title = info["title"]
             if list(filter(lambda x: x in seqs, title)) or not extract_chinese(title):
-                news_detail.update_one({"_id": info["_id"]}, {"$set": {"title": info["list_title"]}})
+                news_detail_coll.update_one({"_id": info["_id"]}, {"$set": {"title": info["list_title"]}})
 
             # publish time must be later than 2023-07-01
             isvalid = False
@@ -138,7 +143,7 @@ def Isvalid():
             except ValueError:
                 pass
 
-            news_detail.update_one({"_id": info["_id"]}, {"$set": {"isvalid": isvalid}})
+            news_detail_coll.update_one({"_id": info["_id"]}, {"$set": {"isvalid": isvalid}})
             logger.info(f"数据校验:{info['title']}")
 
 
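For readability, the inline publish_time clean-up used in both download functions can be restated as a standalone helper (normalize_publish_time is not a function in this repo, just an illustration of the same expression):

# Sketch of the publish_time normalisation done in get_detail_by_httpx / get_detail_by_requests.
def normalize_publish_time(publish_time) -> str:
    value = str(publish_time).split(" ")[0]   # drop the time-of-day portion
    value = value.replace("年", "-").replace("月", "-").replace("日", "").replace("/", "-")
    # ISO-style values such as "2024-02-27T10:30:00+08:00" keep only the date part
    return value.split("T")[0] if len(value) > 11 else value

print(normalize_publish_time("2024年02月27日 10:30"))        # -> 2024-02-27
print(normalize_publish_time("2024/02/27 10:30:00"))         # -> 2024-02-27
print(normalize_publish_time("2024-02-27T10:30:00+08:00"))   # -> 2024-02-27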

+ 37 - 25
news_list.py

@@ -4,13 +4,16 @@ import time
 import uuid
 
 import httpx
+import requests
 from loguru import logger
 from lxml.html import fromstring
 
-from tools import client, news_list, get_proxy, r, redis_key, sha1, ua
+from tools import dedup
+from tools import get_proxy, ua
+from tools import news_keyword_coll, news_list_coll
 
 
-def analysis_info(site, page, searchword, response):
+def analysis_info(site, page, search_word, response):
     data_count = 0
 
     select_lst = []
@@ -21,29 +24,28 @@ def analysis_info(site, page, searchword, response):
     for elem in select_lst:
         title = str(elem.xpath("./@aria-label")[0]).replace("标题:", "")
         url = elem.xpath("./@href")[0]
-        href_sign = sha1(url)
-        if not r.hexists(redis_key, href_sign):
-            items = dict(
+        if not dedup.get(url):
+            item = dict(
                 _id=str(uuid.uuid4()),
                 url=url,
                 list_title=title,
                 searchengine="baidu",
-                searchwords=searchword,
+                searchwords=search_word,
                 site=site,
             )
-            news_list.insert_one(items)
-            r.hset(redis_key, href_sign, 1)
+            news_list_coll.insert_one(item)
+            dedup.add(url)
             data_count += 1
 
     tips = [
-        f"第{page}页--{searchword}",
+        f"第{page}页--{search_word}",
         f"采集量:{len(select_lst)}",
         f"入库量:{data_count}"
     ]
     logger.info(",".join(tips))
 
 
-def get_url(key, page_num, follow_redirects=False):
+def get_list_response(key, page_num, follow_redirects=False):
     url = "https://www.baidu.com/s"
     headers = {
         "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
@@ -65,37 +67,47 @@ def get_url(key, page_num, follow_redirects=False):
         "rsv_dl": "news_b_pn",
         "pn": page_num * 20
     }
+
+    try:
+        proxy = get_proxy()
+    except requests.RequestException as e:
+        logger.error(f"failed to fetch proxy: {e}")
+        raise e
+
     request_kwargs = dict(
         headers=headers,
         params=params,
         timeout=10,
-        follow_redirects=follow_redirects
+        proxy=proxy,
+        follow_redirects=follow_redirects  # Baidu CAPTCHA pages arrive as 302 redirects
     )
     try:
-        proxies = get_proxy()
-        return httpx.get(url, proxies=proxies, **request_kwargs)
-    except Exception as e:
-        logger.exception(e)
+        response = httpx.get(url, **request_kwargs)
+        response.raise_for_status()
+        return response
+    except httpx.HTTPError as exc:
+        logger.error(f"HTTP Exception for {exc.request.url} - {exc}")
 
 
 def baidu_search(document):
-    searchwords = document["key"]
+    titles = document["key"]
     site = document["site"]
-
     for pn in range(0, 4):
-        response = get_url(f"intitle:{searchwords}", pn)
-        analysis_info(site, pn + 1, searchwords, response)
+        try:
+            response = get_list_response(f"intitle:{titles}", pn)
+            analysis_info(site, pn + 1, titles, response)
+        except Exception:
+            break
 
-    client.update_one({"_id": document["_id"]}, {"$set": {"down": 1}})
+    news_keyword_coll.update_one({"_id": document["_id"]}, {"$set": {"down": 1}})
 
 
 def start():
-    client.update_many({}, {"$unset": {"down": ""}})  # 重置全部 down 字段
-    search_items = [item for item in client.find({"down": {"$exists": 0}})]
-
+    news_keyword_coll.update_many({}, {"$unset": {"down": ""}})  # reset the "down" flag on every keyword
+    search_items = [item for item in news_keyword_coll.find({"down": {"$exists": 0}})]
     while search_items:
-        items = search_items.pop(0)
-        baidu_search(items)
+        item = search_items.pop(0)
+        baidu_search(item)
 
 
 if __name__ == '__main__':

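When get_list_response() catches an httpx.HTTPError it only logs it and returns None; baidu_search() then relies on analysis_info() failing on that None response and on the bare "except Exception: break" to stop paging the keyword. The same per-keyword flow restated with the failure path made explicit (a sketch, not code from the repo):

# Sketch: equivalent of baidu_search() with explicit handling of a failed page fetch.
import requests

from news_list import analysis_info, get_list_response   # only needed outside news_list.py
from tools import news_keyword_coll

def crawl_keyword(document):
    keyword, site = document["key"], document["site"]
    for pn in range(0, 4):                                    # first 4 result pages
        try:
            response = get_list_response(f"intitle:{keyword}", pn)
        except requests.RequestException:                     # proxy fetch failed (re-raised)
            break
        if response is None:                                  # HTTP error already logged
            break                                             # stop paging this keyword
        analysis_info(site, pn + 1, keyword, response)
    news_keyword_coll.update_one({"_id": document["_id"]}, {"$set": {"down": 1}})
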
+ 11 - 26
tools.py

@@ -1,45 +1,30 @@
 # coding: utf-8
 
-import hashlib
 import logging
 
-import redis
 import requests
 from fake_useragent import UserAgent
 from pymongo import MongoClient
 
+from db import RedisFilter
+
+# UA
 ua = UserAgent()
 
+# Redis-backed dedup filter
+dedup = RedisFilter('redis://:k5ZJR5KV4q7DRZ92DQ@172.17.189.142:7361/4')
 
+# MongoDB collections
 dbm = MongoClient("172.17.4.87", 27080).hp_news
-client = dbm.news_Keywords
-news_list = dbm.news_list
-news_detail = dbm.news_detail
-
-r = redis.Redis(
-    host='172.17.162.28',
-    password='k5ZJR5KV4q7DRZ92DQ',
-    port=7361,
-    db=19
-)
-redis_key = "news"
-
-
-def sha1(text: str):
-    """
-    十六进制数字字符串形式摘要值
-    @param text: 字符串文本
-    @return: 摘要值
-    """
-    _sha1 = hashlib.sha1()
-    _sha1.update(text.encode("utf-8"))
-    return _sha1.hexdigest()
+news_keyword_coll = dbm.news_Keywords
+news_list_coll = dbm.news_list
+news_detail_coll = dbm.news_detail
 
 
 def get_proxy():
-    headers = {"authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"}
+    headers = {"Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"}
     url = "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch"
-    proxy = requests.get(url, headers=headers, timeout=5).json()
+    proxy = requests.get(url, headers=headers, timeout=3).json()
     proxy = proxy.get("data")
     logging.info("切换代理:{}".format(proxy))
     if not proxy: