# coding:utf-8
import re
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, wait
from urllib.parse import urlparse

import httpx
import requests
import urllib3
from gne import GeneralNewsExtractor
from loguru import logger

from tools import news_list_coll, news_detail_coll
from tools import ua

warnings.simplefilter("ignore", UserWarning)
urllib3.disable_warnings()
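
# Pipeline:
#   spider()  -> pull pending tasks from news_list_coll, fetch each detail page concurrently,
#                extract title/content/publish_time with GNE and store the result in news_detail_coll
#   Isvalid() -> post-check the stored items (garbled titles, publish-date window) and flag them with `isvalid`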

# shared GNE extractor instance
extractor = GeneralNewsExtractor()


def extract_chinese(text):
    # True if the text contains at least one CJK character (Unicode range U+4E00..U+9FFF)
    pattern = re.compile(r'[\u4e00-\u9fff]+')
    return bool(pattern.search(text))


def date_to_timestamp(publish_time):
    # "YYYY-MM-DD" -> unix timestamp in seconds
    time_array = time.strptime(publish_time, "%Y-%m-%d")
    return int(time.mktime(time_array))


def get_detail_by_httpx(info):
    # force https, only upgrading the scheme (not every "http" substring in the URL)
    url = str(info["url"])
    if "https" not in url:
        url = url.replace("http", "https", 1)
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "User-Agent": ua.random,
    }
    req = httpx.get(url, timeout=10, headers=headers)
    if req.status_code == 200:
        html = req.content.decode()
        result = extractor.extract(html, with_body_html=False)
        url_parser = urlparse(url)
        item = {}
        item["title"] = result["title"]
        if "list_title" in info:
            item["list_title"] = info["list_title"]
        item["detail"] = result["content"]
        item["contenthtml"] = html
        # normalize the publish time to "YYYY-MM-DD": drop the time part, Chinese date units and "/" separators
        publish_time = str(result["publish_time"]).split(" ")[0].replace("年", "-").replace("月", "-").replace("日", "").replace("/", "-")
        item["pubulishtime"] = publish_time.split("T")[0] if len(publish_time) > 11 else publish_time
        item["infourl"] = url
        item["domain"] = url_parser.netloc
        item["searchwords"] = info["searchwords"]
        item["searchengine"] = "baidu"
        item["comeintime"] = int(time.time())
        item["type"] = True
        news_detail_coll.insert_one(item)
        logger.info(f"Downloaded: {item['title']}")


def get_detail_by_requests(info):
    url = info["url"]
    headers = {
        "Accept": "application/json",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "User-Agent": ua.random,
    }
    try:
        req = requests.get(url, headers=headers, timeout=10, verify=False)
        req.encoding = req.apparent_encoding
        if req.status_code == 200:
            url_parser = urlparse(url)
            html = req.text  # decoded with the apparent_encoding set above
            result = extractor.extract(html, with_body_html=False)
            item = {}
            item["title"] = result["title"]
            if "list_title" in info:
                item["list_title"] = info["list_title"]
            item["detail"] = result["content"]
            item["contenthtml"] = html
            # normalize the publish time to "YYYY-MM-DD": drop the time part, Chinese date units and "/" separators
            publish_time = str(result["publish_time"]).split(" ")[0].replace("年", "-").replace("月", "-").replace("日", "").replace("/", "-")
            item["pubulishtime"] = publish_time.split("T")[0] if len(publish_time) > 11 else publish_time
            item["infourl"] = url
            item["domain"] = url_parser.netloc
            item["searchwords"] = info["searchwords"]
            item["searchengine"] = "baidu"
            item["comeintime"] = int(time.time())
            item["site"] = info["site"]
            item["type"] = True
            news_detail_coll.insert_one(item)
            logger.info(f"Downloaded: {item['title']}")
    except Exception:
        logger.error(f"Download failed: {info.get('list_title', url)}")


def run(task):
    # baijiahao pages go through the httpx fetcher (https), everything else through requests
    if "baijiahao" in task["url"]:
        get_detail_by_httpx(task)
    else:
        get_detail_by_requests(task)
    # drop the processed task from the list queue
    news_list_coll.delete_one({"_id": task["_id"]})
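

# Fetch every queued list entry with a thread pool; wait() blocks until all
# submitted tasks have finished before returning.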
def spider(workers):
    with ThreadPoolExecutor(max_workers=workers) as p:
        fs = [p.submit(run, task) for task in news_list_coll.find()]
        wait(fs)


def Isvalid():
    # post-validate stored detail records that have not been flagged yet
    q = {"isvalid": {"$exists": 0}}
    f = {"contenthtml": 0, "detail": 0}
    with news_detail_coll.find(q, projection=f, no_cursor_timeout=True) as cursor:
        for info in cursor:
            # garbled or truncated title: fall back to the title captured on the list page
            seqs = ["...", "…"]
            title = info["title"]
            if any(seq in title for seq in seqs) or not extract_chinese(title):
                if info.get("list_title"):
                    news_detail_coll.update_one({"_id": info["_id"]}, {"$set": {"title": info["list_title"]}})
            # publish time must fall between 2023-07-01 (timestamp 1688140800) and now
            isvalid = False
            try:
                if 1688140800 <= date_to_timestamp(info["pubulishtime"]) <= int(time.time()):
                    isvalid = True
            except ValueError:
                pass
            news_detail_coll.update_one({"_id": info["_id"]}, {"$set": {"isvalid": isvalid}})
            logger.info(f"Validated: {info['title']}")


if __name__ == '__main__':
    while True:
        spider(workers=10)
        Isvalid()
        logger.info("Round finished; sleeping for 5 minutes before the next run.")
        time.sleep(300)