news_detail.py

# coding:utf-8
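"""Download the detail page for every URL queued in ``news_list_coll``,
extract the title, body and publish time with GNE, store the result in
``news_detail_coll``, then flag each stored document as valid or invalid."""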
import re
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, wait
from urllib.parse import urlparse

import httpx
import requests
import urllib3
from gne import GeneralNewsExtractor
from loguru import logger

from tools import news_list_coll, news_detail_coll
from tools import ua
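
# NOTE: ``tools`` is a local module that is not shown here. A minimal sketch of
# what it is assumed to provide (the Mongo URI and collection names below are
# hypothetical, not taken from the original project):
#
#     from fake_useragent import UserAgent
#     from pymongo import MongoClient
#
#     _client = MongoClient("mongodb://localhost:27017")
#     news_list_coll = _client["news"]["news_list"]      # queued list-page results
#     news_detail_coll = _client["news"]["news_detail"]  # extracted detail pages
#     ua = UserAgent()                                    # random User-Agent strings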

warnings.simplefilter("ignore", UserWarning)
urllib3.disable_warnings()

# GNE general-purpose news extractor
extractor = GeneralNewsExtractor()


def extract_chinese(text):
    """Return True if ``text`` contains at least one Chinese character."""
    pattern = re.compile(r'[\u4e00-\u9fff]+')  # CJK Unified Ideographs range
    return bool(pattern.search(text))


def date_to_timestamp(publish_time):
    time_array = time.strptime(publish_time, "%Y-%m-%d")
    return int(time.mktime(time_array))


def get_detail_by_httpx(info):
    # Force the URL to HTTPS; an already-https URL is left unchanged.
    url = info["url"] if str(info["url"]).count("https") else str(info["url"]).replace("http", "https")
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "User-Agent": ua.random,
    }
    req = httpx.get(str(url), timeout=10, headers=headers)
    if req.status_code == 200:
        html = req.content.decode()
        result = extractor.extract(html, with_body_html=False)
        url_parser = urlparse(url)
        item = {}
        item["title"] = result["title"]
        if "list_title" in info:
            item["list_title"] = info["list_title"]
        item["detail"] = result["content"]
        item["contenthtml"] = html
        # Normalize the publish date to "YYYY-MM-DD".
        new_pubulishtime = str(result["publish_time"]).split(" ")[0].replace("年", "-").replace("月", "-").replace("日", "").replace("/", "-")
        item["pubulishtime"] = new_pubulishtime.split("T")[0] if len(new_pubulishtime) > 11 else new_pubulishtime
        item["infourl"] = url
        item["domain"] = url_parser.netloc
        item["searchwords"] = info["searchwords"]
        item["searchengine"] = "baidu"
        item["comeintime"] = int(time.time())
        item["type"] = True
        news_detail_coll.insert_one(item)
        logger.info(f"Downloaded: {item['title']}")


def get_detail_by_requests(info):
    url = info["url"]
    headers = {
        "Accept": "application/json",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "User-Agent": ua.random,
    }
    try:
        req = requests.get(url, headers=headers, timeout=10, verify=False)
        req.encoding = req.apparent_encoding
        if req.status_code == 200:
            url_parser = urlparse(url)
            # Use req.text so the detected (apparent) encoding is honoured;
            # req.content.decode() would always assume UTF-8.
            html = req.text
            result = extractor.extract(html, with_body_html=False)
            item = {}
            item["title"] = result["title"]
            if "list_title" in info:
                item["list_title"] = info["list_title"]
            item["detail"] = result["content"]
            item["contenthtml"] = html
            # Normalize the publish date to "YYYY-MM-DD".
            new_pubulishtime = str(result["publish_time"]).split(" ")[0].replace("年", "-").replace("月", "-").replace("日", "").replace("/", "-")
            item["pubulishtime"] = new_pubulishtime.split("T")[0] if len(new_pubulishtime) > 11 else new_pubulishtime
            item["infourl"] = url
            item["domain"] = url_parser.netloc
            item["searchwords"] = info["searchwords"]
            item["searchengine"] = "baidu"
            item["comeintime"] = int(time.time())
            item["site"] = info["site"]
            item["type"] = True
            news_detail_coll.insert_one(item)
            logger.info(f"Downloaded: {item['title']}")
    except Exception:
        logger.error(f"Download failed: {info['list_title']}")


def run(task):
    # baijiahao URLs go through the httpx path (which forces HTTPS);
    # everything else goes through requests.
    if "baijiahao" in task["url"]:
        get_detail_by_httpx(task)
    else:
        get_detail_by_requests(task)
    news_list_coll.delete_one({"_id": task["_id"]})


def spider(workers):
    with ThreadPoolExecutor(max_workers=workers) as p:
        fs = [p.submit(run, task) for task in news_list_coll.find()]
        wait(fs)


def Isvalid():
    q = {"isvalid": {"$exists": 0}}
    f = {"contenthtml": 0, "detail": 0}
    with news_detail_coll.find(q, projection=f, no_cursor_timeout=True) as cursor:
        for info in cursor:
            # Garbled or truncated title: fall back to the list-page title.
            seqs = ["...", "…"]
            title = info["title"]
            if any(seq in title for seq in seqs) or not extract_chinese(title):
                news_detail_coll.update_one({"_id": info["_id"]}, {"$set": {"title": info["list_title"]}})
            # Valid only if the publish time is between 2023-07-01
            # (1688140800 = 2023-07-01 00:00 UTC+8) and now.
            isvalid = False
            try:
                if 1688140800 <= date_to_timestamp(info["pubulishtime"]) <= int(time.time()):
                    isvalid = True
            except ValueError:
                pass
            news_detail_coll.update_one({"_id": info["_id"]}, {"$set": {"isvalid": isvalid}})
            logger.info(f"Validated: {info['title']}")


if __name__ == '__main__':
    while 1:
        spider(workers=10)
        Isvalid()
        logger.info("Round finished, sleeping for 5 minutes before the next run.")
        time.sleep(300)