# -*- coding: utf-8 -*-
"""
Created on 2024-04-09
---------
@summary: 主题爬虫 工具类
---------
@author: Lzz
"""
import calendar
import datetime
import functools
import hashlib
import random
import re
import time
from collections import namedtuple
import bson
import execjs
import redis
import requests
from loguru import logger
from pymongo import MongoClient
from utils.clean_html import cleaner
try:
from pymongo.errors import DuplicateKeyError
from hashlib import md5
except ImportError as e:
raise e
# Lightweight result wrapper used by text_search(); ``total`` is the count of
# Chinese characters found.
SearchText = namedtuple('SearchText', ['total'])
def nsssjss():
    """
    Build the ``nsssjss`` request token for the freecms notice API.

    Runs an RSA (JSEncrypt) encryption of the API path plus the current
    millisecond timestamp inside a Node.js context.
    Requires node with the ``jsdom`` and ``jsencrypt`` packages installed.

    :return: the encrypted token string
    """
    ex_js = '''
    const jsdom = require("jsdom");
    const {JSDOM} = jsdom;
    const dom = new JSDOM(`
    Hello world
    `);
    window = dom.window;
    document = window.document;
    JSEncrypt = require('jsencrypt')
    function encryptByRSA(value) {
        var encrypt = new JSEncrypt;
        var RSAPublicKey = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCS2TZDs5+orLYCL5SsJ54+bPCVs1ZQQwP2RoPkFQF2jcT0HnNNT8ZoQgJTrGwNi5QNTBDoHC4oJesAVYe6DoxXS9Nls8WbGE8ZNgOC5tVv1WVjyBw7k2x72C/qjPoyo/kO7TYl6Qnu4jqW/ImLoup/nsJppUznF0YgbyU/dFFNBQIDAQAB";
        encrypt.setPublicKey('-----BEGIN PUBLIC KEY-----' + RSAPublicKey + '-----END PUBLIC KEY-----')
        return encrypt.encrypt(value)
    }
    function get_njs(){
        nsssjss = encryptByRSA('/freecms' + '/rest/v1/notice/selectInfoMoreChannel.do' + '$$' + new Date().getTime())
        return nsssjss
    }
    '''
    # Compile once, call the entry function, return its result directly.
    return execjs.compile(ex_js).call('get_njs')
def get_QGIP():
    """Return the hard-coded QG tunnel proxy mapping for requests (http + https)."""
    endpoint = "http://6278CF0D:41D9C796172D@tun-vdpzuj.qg.net:15254"
    return {"http": endpoint, "https": endpoint}
def get_proxy(scheme=None, default=None, socks5h=False):
    """
    Fetch a socks5 proxy from the internal proxy service; blocks (retrying
    every 3s) until one is available.

    :param scheme: if given, return only that scheme's URL ('http'/'https')
    :param default: fallback value when *scheme* is missing from the mapping
    :param socks5h: rewrite the proxy scheme to socks5h (remote DNS)
    :return: proxies dict, or a single URL when *scheme* is set
    """
    headers = {
        "Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    }
    proxies = None
    while not proxies:
        resp = requests.get("http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch", headers=headers).json()
        proxies = resp.get("data")
        if not proxies:
            logger.warning("暂无代理...")
            time.sleep(3)
    if socks5h:
        # both entries derive from the service's "http" address
        addr = proxies.get("http").replace("socks5", "socks5h")
        proxies = {"http": addr, "https": addr}
    logger.info(f"切换代理: {proxies}")
    if scheme:
        return proxies.get(scheme, default)
    return proxies
def Mongo_client():
    """Create a MongoClient bound to the crawler MongoDB instance."""
    # alternate environment: MongoClient("172.20.45.130", 27017)
    return MongoClient(host="172.17.4.87", port=27080)
def Redis_client():
    """Create a Redis client (db 1) backed by a connection pool."""
    # alternate environment: host='172.20.45.129', port=3379, password='jianyu@python'
    pool = redis.ConnectionPool(
        host='172.17.162.28',
        port=7361,
        password='k5ZJR5KV4q7DRZ92DQ',
        db=1,
    )
    # NOTE(review): decode_responses passed to Redis() may be ignored when an
    # explicit connection_pool is supplied (it is a connection-level option
    # in redis-py) — confirm responses are actually decoded.
    return redis.Redis(connection_pool=pool, decode_responses=True)
def int2long(param: int):
    """Convert a Python int to a BSON Int64 (MongoDB 'long')."""
    return bson.int64.Int64(param)
def get_current_date(date_format="%Y-%m-%d %H:%M:%S"):
    """Current local time formatted with *date_format*."""
    now = datetime.datetime.now()
    return now.strftime(date_format)
def date_to_timestamp(date, time_format="%Y-%m-%d %H:%M:%S"):
    """
    @summary: convert a date string such as "2011-09-28 10:00:00" to a unix
        timestamp (local time).
    ---------
    @param date: the date string; when it has no ':' it is parsed as "%Y-%m-%d"
    @param time_format: format used when the string contains a time part
    ---------
    @result: integer timestamp
    """
    fmt = time_format if ":" in date else "%Y-%m-%d"
    return int(time.mktime(time.strptime(date, fmt)))
def timestamp_to_date(timestamp, time_format="%Y-%m-%d %H:%M:%S"):
    """
    @summary: convert a unix timestamp to a local-time date string.
    ---------
    @param timestamp: unix timestamp; None raises ValueError
    @param time_format: output format
    ---------
    @result: formatted date string
    """
    if timestamp is None:
        raise ValueError("timestamp is null")
    return time.strftime(time_format, time.localtime(timestamp))
def get_sha1(*args):
    """
    @summary: unique 40-char hex id derived from all arguments.
    ---------
    @param *args: values combined into the hash (each str()-ed then encoded)
    ---------
    @result: e.g. ba4868b3f277c8e387b55d9e3d0be7c045cdd89e
    """
    hasher = hashlib.sha1()
    for value in args:
        hasher.update(str(value).encode())
    return hasher.hexdigest()  # 40 hex chars
def get_sha256(*args):
    """
    @summary: unique 64-char hex id derived from all arguments.
    ---------
    @param *args: values combined into the hash (each str()-ed then encoded)
    ---------
    @result: e.g. 5580c91ea29bf5bd963f4c08dfcacd983566e44ecea1735102bc380576fd6f30
    """
    hasher = hashlib.sha256()
    for value in args:
        hasher.update(str(value).encode())
    return hasher.hexdigest()  # 64 hex chars
def md5value(val):
    """
    MD5 hex digest of *val*.

    str input is encoded as UTF-8; bytes are hashed as-is.
    BUG FIX: the original bytes branch hashed ``str(val)`` — i.e. the
    ``b'...'`` repr — instead of the raw bytes, so ``md5value(b"x")`` never
    matched ``md5value("x")``.
    Any other type yields the digest of empty input (unchanged behavior).
    """
    md5 = hashlib.md5()
    if isinstance(val, bytes):
        md5.update(val)
    elif isinstance(val, str):
        md5.update(val.encode("utf-8"))
    return md5.hexdigest()
def ensure_int64(n):
    """
    Coerce *n* to a BSON Int64; falsy values (None, False, 0, "") become 0.

    >>> ensure_int64(None)
    0
    >>> ensure_int64(False)
    0
    >>> ensure_int64(12)
    12
    >>> ensure_int64("72")
    72
    """
    if not n:
        return bson.int64.Int64(0)
    return bson.int64.Int64(n)
def get_today_of_day(day_offset=0):
    """Date string ('YYYY-MM-DD') *day_offset* days from today."""
    target = datetime.date.today() + datetime.timedelta(days=day_offset)
    return str(target)
def get_current_timestamp():
    """Current unix timestamp, truncated to whole seconds."""
    now = time.time()
    return int(now)
def add_zero(n):
    """Zero-pad *n* to at least two digits, e.g. 7 -> '07'."""
    return str(int(n)).zfill(2)
def sup_zero(indate):
    """
    Zero-pad the date part of a 'Y-M-D[ time]' string, keeping any time
    suffix verbatim, e.g. '2021-3-5 10:00:00' -> '2021-03-05 10:00:00'.
    """
    parts = indate.split(' ')
    year, month, day = (int(v) for v in parts[0].split('-'))
    suffix = " " + parts[1] if len(parts) == 2 else ""
    return datetime.datetime(year=year, month=month, day=day).strftime("%Y-%m-%d") + suffix
def get_days_of_month(year, month):
    """Number of days in the given month."""
    _, days = calendar.monthrange(year, month)
    return days
def get_year_month_and_days(month_offset=0):
    """
    @summary: resolve (year, month, days-in-month) for the month that is
        ``month_offset`` months away from the current one.
    ---------
    @param month_offset: month offset (positive = forward, negative = back)
    ---------
    @result: tuple of strings, e.g. ('2019', '04', '30')

    FIX: the original returned the year as an int in the two "same year"
    branches but as a str elsewhere, contradicting its own documented result;
    it also duplicated the carry logic across four branches.  The divmod
    normalization below is equivalent for every offset sign.
    """
    today = datetime.datetime.now()
    total_month = today.month + month_offset
    # Normalize total_month into 1..12, carrying whole years (works for
    # negative values too: divmod(-3, 12) == (-1, 9)).
    carry, month = divmod(total_month, 12)
    if month == 0:
        carry -= 1
        month = 12
    year = today.year + carry
    days = str(get_days_of_month(year, month))
    return (str(year), add_zero(month), days)
def get_month(month_offset=0):
    """
    Date string ("YYYY-MM-DD") for the month ``month_offset`` months from now.

    The offset is *added* to the current month (positive = later month,
    negative = earlier month).  The day is today's day, clamped to the last
    day of the target month when today's day does not exist there.
    """
    today = datetime.datetime.now()
    day = add_zero(today.day)
    (y, m, d) = get_year_month_and_days(month_offset)
    arr = (y, m, d)
    # d is the target month's last day; prefer today's day when it fits
    if int(day) < int(d):
        arr = (y, m, day)
    return "-".join("%s" % i for i in arr)
def extract_file_type(file_name="附件名", file_url="附件地址", file_type_list=None):
    """
    Extract the attachment file type.

    Args:
        file_name: attachment name
        file_url: attachment URL
        file_type_list: extra accepted extensions (lower-cased before use)

    Returns: the extension string, or None when nothing matches.

    BUG FIX: the original fallback loop contained ``else: return file_type``
    (i.e. ``return None``) inside the loop body, so it gave up after testing
    only the FIRST known extension; every candidate is now tried.
    """
    if file_type_list is None:
        file_type_list = []
    if not file_name or not file_url:
        return None
    file_name = file_name.strip()
    file_types = ['zip', 'docx', 'ftp', 'pdf', 'doc', 'rar', 'gzzb', 'hzzbs',
                  'jpg', 'png', 'zbid', 'xls', 'xlsx', 'swp', 'dwg']
    if file_type_list:
        file_types.extend(ext.lower() for ext in file_type_list)
    # First guess: extension of the URL path, then of the query string.
    file_type = file_url.split('?')[0].split('.')[-1].lower()
    if file_type not in file_types:
        file_type = file_url.split('?')[-1].split('.')[-1].lower()
    if file_type in file_types:
        return file_type
    # Fallback: scan the attachment name (bare) / URL (dotted) for any
    # known extension; 'docx'/'xlsx' precede 'doc'/'xls' in the list so the
    # longer form wins.
    for ext in file_types:
        match = re.search(ext, file_name) or re.search(r"\." + ext, file_url)
        if match:
            return match.group(0).replace('.', '')
    return None
def remove_htmldata(remove_info_list: list, html: str, response):
    """
    Strip unwanted fragments from a detail page.

    Args:
        remove_info_list: items to remove — xpath expressions (starting with
            '//'), regex patterns, or literal strings, e.g. ['data:image/(.*?)"']
        html: text to clean
        response: original response object (must provide .xpath())

    Returns: the cleaned text.

    NOTE(review): ``re.search('^.*', extra_item)`` matches ANY string, so the
    regex branch consumes every non-xpath item and the final ``else`` branch
    is unreachable; ``extra_item.replace('', '')`` is a no-op.  This looks
    like delimiters were lost from the original source — confirm the intended
    split between "regex" and "plain string" items before changing it.
    """
    if html and remove_info_list:
        for extra_item in remove_info_list:
            # xpath items: remove each extracted fragment verbatim
            if re.search('^//.*', extra_item):
                extra_html_list = response.xpath(extra_item).extract()
                for extra_html in extra_html_list:
                    if extra_html:
                        html = html.replace(extra_html, '')
            elif re.search('^.*', extra_item):  # always true (see NOTE above)
                extra_item = extra_item.replace('', '')
                extra_html_list = re.findall(f'{extra_item}', html, re.S | re.I | re.M)
                if extra_html_list:
                    for exhtml in extra_html_list:
                        html = html.replace(exhtml, '')
            else:
                # unreachable: literal-string removal
                extra_html = extra_item
                if extra_html:
                    html = html.replace(extra_html, '')
    return html
def text_search(content: str) -> SearchText:
    """
    Count Chinese characters in *content*.

    :param content: text to scan (falsy input counts as zero)
    :return: SearchText whose ``total`` is the number of Chinese characters
    """
    if not content:
        return SearchText(0)
    # length of the match list == number of Chinese characters
    return SearchText(len(re.findall('[\u4e00-\u9fa5]', content, re.S)))
def clean_title(title):
    """Strip noise tokens from a title: '(digits)' plus bracketed short
    Chinese tags like '[xx]' / '【xx】'.  Falsy input is returned unchanged."""
    if not title:
        return title
    for pattern in (r'\(\d{1,20}\)',
                    r'\[[\u4e00-\u9fa5]{1,9}\]',
                    r'【[\u4e00-\u9fa5]{1,9}】'):
        title = re.sub(pattern, '', title)
    return title
def substitute(html_str, special=None, completely=False):
    """Clean raw HTML via the shared ``cleaner`` helper and return the result."""
    return cleaner(html=html_str, special=special, completely=completely)
def handle_publish_time(publishtime):
    """
    Normalize a publish-time string and derive its Int64 timestamp.

    A date with no time part, or an exact '00:00:00' time, gets the current
    wall-clock time appended; future times are clamped to now by
    handle_publish_time_overdue().

    :return: (publishtime string, Int64 timestamp)
    :raises EOFError: on any parse failure (type kept — existing callers
        catch EOFError).

    FIX: the bare ``except:`` (which also swallowed SystemExit /
    KeyboardInterrupt) is narrowed to Exception, and the original error is
    chained onto the raised EOFError for debuggability.
    """
    try:
        time_str = get_current_date().split(' ')[-1]
        if ':' not in publishtime:
            publishtime = publishtime + ' ' + time_str
        elif '00:00:00' in publishtime:
            publishtime = publishtime.split(' ')[0] + ' ' + time_str
        l_np_publishtime = int2long(date_to_timestamp(publishtime))
        return handle_publish_time_overdue(publishtime, l_np_publishtime)
    except Exception as exc:
        raise EOFError("publishtime 格式错误!") from exc
def handle_publish_time_overdue(publishtime, l_np_publishtime):
    """Clamp a publish time that lies in the future down to 'now'."""
    if not l_np_publishtime or l_np_publishtime <= get_current_timestamp():
        return publishtime, l_np_publishtime
    logger.warning("发布时间大于当前时间,已设置当前时间为发布时间!")
    now_str = get_current_date()
    return now_str, ensure_int64(date_to_timestamp(now_str))
def handle_page_html(item):
    """Validate detail-page content: raise on an empty page source; mark the
    item as not-to-be-sent when the cleaned detail contains no Chinese text."""
    title = item.get('title')
    publishtime = item.get('publishtime')
    href = item.get('href')
    if href == "#":
        href = item.get('competehref')
    if not item.get('contenthtml'):
        logger.warning(f"页面源码不能为空!\n 发布地址:{href}\n 发布时间:{publishtime}\n 标题:{title}")
        raise ValueError("无效正文!")
    if text_search(item.get('detail')).total == 0:
        logger.warning("无内容数据,数据不入保存服务!")
        item['sendflag'] = "true"
def check_data_validity(item):
    """Ensure title, publish time and href are all present; raise ValueError
    otherwise (href '#' falls back to competehref)."""
    title = item.get('title')
    publishtime = item.get('publishtime')
    href = item.get('href')
    if href == "#":
        href = item.get('competehref')
    if title and publishtime and href:
        return
    logger.error(f"基础数据不能为空!\n 发布地址:{href}\n 发布时间:{publishtime}\n 标题:{title}")
    raise ValueError("基础数据异常")
# Whitelist of keys allowed into the bidding collection; clean_fields() and
# join_fields() fall back to this set when no explicit field set is supplied.
_fields = {
    'title', 'publishtime', 'spidercode', 'infoformat', 'site',
    'channel', 'area', 'city', 'jsondata', 'district', 'href',
    'is_mixed', 'comeintime', 's_title', 'l_np_publishtime',
    'contenthtml', 'competehref', 'detail', 'iscompete', 'sendflag',
    '_d', 'publishdept', 'type', 'T', 'projectinfo', 'is_theme'
}
def clean_fields(item, special_fields=None):
    """Drop every key of *item* not in the allowed field set (in place).
    Falls back to the module-level ``_fields`` whitelist when
    *special_fields* is not given."""
    allowed = special_fields or _fields
    for key in [k for k in item if k not in allowed]:
        del item[key]
def join_fields(item, special_fields=None, **kwargs):
    """Merge *kwargs* into *item*, accepting only keys declared in the field
    set (default: the module-level ``_fields``); undeclared keys are logged."""
    allowed = special_fields or _fields
    for k, v in kwargs.items():
        if k in allowed:
            item[k] = v
        else:
            logger.error(f"{k} 入库字段未定义!")
def format_fields(item, callback=handle_publish_time, **kwargs):
    """
    Format an item for storage (bidding collection).

    Drops undeclared fields, normalizes the publish time via *callback*,
    fills mandatory defaults, merges extra *kwargs*, then validates the
    content and base fields.  Returns the mutated item.
    """
    clean_fields(item)  # drop any key not in _fields
    if callable(callback):
        time_str, timestamp = callback(item.get('publishtime'))
        item['publishtime'] = time_str
        item['l_np_publishtime'] = timestamp
    item['detail'] = substitute(item.get('contenthtml'))  # cleaned HTML body
    item['s_title'] = item.get('s_title') or item.get('title')
    item['infoformat'] = 1
    item['iscompete'] = True
    item['sendflag'] = 'false'
    item['_d'] = 'comeintime'
    item['publishdept'] = ''
    item['type'] = ''
    item['T'] = 'bidding'
    join_fields(item, **kwargs)
    handle_page_html(item)  # may flip sendflag to 'true' when detail has no Chinese text
    check_data_validity(item)  # raises ValueError on missing base fields
    item['comeintime'] = int2long(int(time.time()))
    return item
def format_fields_njpc(item, callback=handle_publish_time, **kwargs):
    """
    Format an item for storage (proposed-construction / "njpc" spiders).

    Same pipeline as format_fields() but with the njpc field whitelist.

    NOTE(review): ``item['publishtime']`` is overwritten with the *numeric*
    timestamp (the string form is discarded), and ``T`` is set to 'bidding'
    even though this is the njpc formatter — confirm both are intended.
    """
    # Whitelist of keys allowed into the njpc collection.
    req_fields = {
        'site', 'approvenumber', 'method', 'project_scale', 'area', 'is_mixed',
        'competehref',
        'air_conditioner', 'funds', 'scale', 'construction_area', 'channel',
        'contenthtml', 'elevator',
        'building_floors', 'ownertel', 'parking', 'building', 'spidercode',
        'title',
        'detail', 'projectinfo', 'exterior', 'constructionunit', 'owner_info',
        'approvetime',
        'project_startdate', 'investment', 'heating', 'district',
        'constructionunitperson',
        'designunitperson', 'publishtime', 'system', 'pace', 'total',
        'project_scale_info', 'passive',
        'phone', 'construction', 'parking_pace', 'floors', 'freshair_system',
        'other_project_scale',
        'conditioner', 'wall', 'designunit', 'owneraddr',
        'prefabricated_building', 'materials',
        'constructionunitaddr', 'constructionunit_info', 'project_person',
        'approvecontent',
        'constructionunittel', 'floor', 'person', 'city', 'floor_area',
        'project', 'approvestatus',
        'project_completedate', 'completedate', 'ownerperson', 'sendflag',
        'comeintime',
        'steel_structure', 'projectaddr', 'freshair', 'T', 'startdate', 'house',
        'projectname',
        'exterior_wall_materials', 'other', 'passive_house', 'jsondata', 'air',
        'prefabricated',
        'designunit_info', 'approvedept', 'total_investment', 'infoformat',
        'project_phone',
        'owner', 'designunittel', 'projecttype', 'approvecode', 'steel',
        'is_theme', 'designunitaddr',
        'heating_method', 'href', 'projectperiod', 'structure'
    }
    clean_fields(item, special_fields=req_fields)
    if callable(callback):
        _, timestamp = callback(item.get('publishtime'))
        item['publishtime'] = timestamp
    item['detail'] = substitute(item.get('contenthtml'))  # cleaned HTML body
    item['title'] = item.get('title') or item.get('projectname')
    item['infoformat'] = 2
    item['sendflag'] = "false"
    item['T'] = "bidding"
    join_fields(item, special_fields=req_fields, **kwargs)
    handle_page_html(item)  # may flip sendflag to 'true' when detail has no Chinese text
    check_data_validity(item)  # raises ValueError on missing base fields
    item['comeintime'] = int2long(int(time.time()))
    return item
def search(pattern, string):
    """Return the first capture group of the first match, or None."""
    match = re.search(pattern, string)
    return match.groups()[0] if match else None
def sleep_time(start_time: int, end_time=0, step=-1):
    """Sleep with a visible countdown: first a random sub-second jitter, then
    one second per step counting from ``start_time`` toward ``end_time``
    (exclusive, default step -1)."""
    time.sleep(random.random())
    for i in range(start_time, end_time, step):
        print(f"\r *** 休眠中... {i} 秒 *** ", end='')
        time.sleep(1)
    print("\r <* 休眠结束 *> ", end='')
# decorator
class Singleton(object):
    """Class decorator caching a single instance per decorated class.

    The first call constructs the instance; every later call returns the
    cached one and ignores its arguments.
    """

    def __init__(self, cls):
        self._cls = cls
        self._instance = {}

    def __call__(self, *args, **kwargs):
        cache = self._instance
        if self._cls not in cache:
            cache[self._cls] = self._cls(*args, **kwargs)
        return cache[self._cls]
def down_load_image(proxy=None):
    """
    Fetch the Guangdong gov-procurement captcha image and run it through the
    internal recognition service.

    :param proxy: optional requests proxies mapping
    :return: the 4-character code, or None on failure
    """
    img_url = 'https://gdgpo.czt.gd.gov.cn/freecms/verify/verifyCode.do?createTypeFlag=n'
    header = {
        "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Referer": "https://gdgpo.czt.gd.gov.cn/cms-gd/site/guangdong/qwjsy/index.html?",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    }
    # verify=False: site TLS verification is deliberately disabled here
    res = requests.get(img_url, headers=header, proxies=proxy, timeout=30, verify=False)
    upload_address = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify"
    json_resp = requests.post(upload_address,
                              headers={'accept': 'application/json'},
                              files={'file': res.content},
                              stream=True).json()
    if json_resp.get("msg") == "success":
        code = json_resp["r"]["code"]
        if len(code) == 4:
            return code
    return None
def _pack_file(file):
"""包装验证码格式"""
if isinstance(file, str) and file.startswith("data:image"):
img_file = {"file": file}
elif isinstance(file, bytes):
img_file = {"file": file}
else:
with open(file, "rb") as f:
img_bytes = f.read()
img_file = {"file": img_bytes}
return img_file
def simple_captcha(file):
    """
    Solve an ordinary captcha via the internal recognition service.

    @param file: captcha image — file path, raw bytes, or base64 data URI
    @return: recognized code string, or None on failure
    """
    url = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify"
    resp = requests.post(url,
                         headers={"accept": "application/json"},
                         files=_pack_file(file),
                         stream=True,
                         timeout=10)
    payload = resp.json()
    if payload.get("msg") == "success":
        return str(payload["r"]["code"])
    return None
def retry_on_exception(retries=1, timeout=1):
    """
    Decorator: retry the wrapped callable up to *retries* times, sleeping
    *timeout* seconds after each failure; raise RuntimeError when all
    attempts fail.

    FIX: the original discarded the underlying exception entirely; it is now
    chained onto the RuntimeError (``from last_exc``) so the real cause shows
    up in tracebacks.  Note the bare ``except Exception`` is intentionally
    broad: any failure triggers a retry.
    """
    def decorate(func):
        @functools.wraps(func)
        def warp(*args, **kwargs):
            last_exc = None
            for _ in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exc = e
                    print(f"执行[{func.__name__}]失败, args:{args}, kwargs:{kwargs} 异常:{e}")
                    time.sleep(timeout)
            raise RuntimeError(f"执行[{func.__name__}]达到最大重试次数") from last_exc
        return warp
    return decorate
class PySpiderError(Exception):
    """Base spider error exposing every keyword argument as an attribute
    (e.g. ``e.code`` / ``e.reason``).

    NOTE(review): defaults are applied only when *both* 'code' and 'reason'
    are absent, and the kwargs dict is passed to Exception.__init__ as a
    single positional argument (it appears inside ``e.args``) — confirm both
    behaviors are intended before changing them.
    """
    def __init__(self, *args, **kwargs):
        if 'code' not in kwargs and 'reason' not in kwargs:
            kwargs['code'] = 10000
            kwargs['reason'] = '未知爬虫错误,请手动处理'
        # expose every kwarg as an instance attribute
        for key, val in kwargs.items():
            setattr(self, key, val)
        super(PySpiderError, self).__init__(*args, kwargs)
class AttachmentNullError(PySpiderError):
    """Raised when an attachment download fails (default code 10004)."""

    def __init__(self, code: int = 10004, reason: str = '附件下载异常'):
        super().__init__(code=code, reason=reason)
class CustomError(Exception):
    """Exception whose string form is exactly the supplied message."""
    def __init__(self, ErrorInfo):
        # kept as a public attribute — external code may read exc.ErrorInfo
        self.ErrorInfo = ErrorInfo
    def __str__(self):
        return self.ErrorInfo
# Backward-compatible aliases keeping the original (misspelled) public names.
format_fileds = format_fields
format_fileds_njpc = format_fields_njpc