12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-04-09
- ---------
- @summary: 主题爬虫 工具类
- ---------
- @author: Lzz
- """
- import calendar
- import datetime
- import functools
- import hashlib
- import random
- import re
- import time
- from collections import namedtuple
- import bson
- import execjs
- import redis
- import requests
- from loguru import logger
- from pymongo import MongoClient
- from utils.clean_html import cleaner
- try:
- from pymongo.errors import DuplicateKeyError
- from hashlib import md5
- except ImportError as e:
- raise e
# Lightweight result wrapper for text_search(); `total` holds the number of
# Chinese characters found in the inspected text.
SearchText = namedtuple('SearchText', ['total'])
def nsssjss():
    """Build the `nsssjss` anti-crawl token for the freecms notice endpoint.

    Runs a small JS snippet through execjs (node.js): JSEncrypt RSA-encrypts
    the request path plus the current millisecond timestamp with a fixed
    public key.  Requires node with the `jsdom` and `jsencrypt` packages
    installed — TODO confirm the runtime environment provides them.

    @result: RSA ciphertext string sent as the request token
    """
    ex_js = '''
const jsdom = require("jsdom");
const {JSDOM} = jsdom;
const dom = new JSDOM(`<!DOCTYPE html><p>Hello world</p>`);
window = dom.window;
document = window.document;
JSEncrypt = require('jsencrypt')
function encryptByRSA(value) {
var encrypt = new JSEncrypt;
var RSAPublicKey = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCS2TZDs5+orLYCL5SsJ54+bPCVs1ZQQwP2RoPkFQF2jcT0HnNNT8ZoQgJTrGwNi5QNTBDoHC4oJesAVYe6DoxXS9Nls8WbGE8ZNgOC5tVv1WVjyBw7k2x72C/qjPoyo/kO7TYl6Qnu4jqW/ImLoup/nsJppUznF0YgbyU/dFFNBQIDAQAB";
encrypt.setPublicKey('-----BEGIN PUBLIC KEY-----' + RSAPublicKey + '-----END PUBLIC KEY-----')
return encrypt.encrypt(value)
}
function get_njs(){
nsssjss = encryptByRSA('/freecms' + '/rest/v1/notice/selectInfoMoreChannel.do' + '$$' + new Date().getTime())
return nsssjss
}
'''
    ctx = execjs.compile(ex_js)
    njs = ctx.call('get_njs')
    return njs
def get_QGIP():
    """Return a requests-style proxies mapping for the QG tunnel proxy."""
    endpoint = "http://6278CF0D:41D9C796172D@tun-vdpzuj.qg.net:15254"
    return {"http": endpoint, "https": endpoint}
def get_proxy(scheme=None, default=None, socks5h=False):
    """Fetch a socks5 proxy from the internal proxy pool, blocking until one is available.

    @param scheme: when given ("http"/"https"), return only that scheme's URL
    @param default: fallback value when *scheme* is absent from the mapping
    @param socks5h: rewrite socks5:// to socks5h:// so DNS is resolved through the proxy
    @result: proxies dict, or a single proxy URL when *scheme* is given
    """
    headers = {
        "Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    }
    # Poll the pool until it returns data; sleep 3s between attempts.
    while True:
        proxy = requests.get("http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch", headers=headers).json()
        proxies = proxy.get("data")
        if proxies:
            break
        else:
            logger.warning("暂无代理...")
            time.sleep(3)
    if socks5h:
        # NOTE(review): both entries reuse proxies["http"] — presumably the service
        # returns one endpoint for both schemes; confirm it never returns distinct URLs.
        proxyh = {
            "http": proxies.get("http").replace("socks5", "socks5h"),
            "https": proxies.get("http").replace("socks5", "socks5h")
        }
        proxies = proxyh
    logger.info(f"切换代理: {proxies}")
    if not scheme:
        return proxies
    else:
        return proxies.get(scheme, default)
def Mongo_client(host="172.17.4.87", port=27080):
    """Create a MongoDB client.

    Generalized: host/port are now parameters whose defaults preserve the
    original hard-coded production address, so existing callers are unchanged.

    @param host: MongoDB server host
    @param port: MongoDB server port
    @result: pymongo.MongoClient connected to host:port
    """
    return MongoClient(host, port)
def Redis_client():
    """Create a Redis client backed by a connection pool.

    BUG FIX: `decode_responses` must be configured on the ConnectionPool.
    Passing it to redis.Redis() together with `connection_pool` has no
    effect (connection settings come from the pool), so responses were
    silently returned as bytes despite the flag.

    @result: redis.Redis client (db 1) with str-decoded responses
    """
    _pool = redis.ConnectionPool(
        host='172.17.162.28',
        port=7361,
        password='k5ZJR5KV4q7DRZ92DQ',
        db=1,
        decode_responses=True,  # set on the pool so it actually applies
    )
    return redis.Redis(connection_pool=_pool)
def int2long(param: int):
    """Coerce a Python int into a BSON Int64 for MongoDB storage."""
    value = bson.int64.Int64(param)
    return value
def get_current_date(date_format="%Y-%m-%d %H:%M:%S"):
    """Format the current local time; defaults to 'YYYY-mm-dd HH:MM:SS'."""
    now = datetime.datetime.now()
    return now.strftime(date_format)
def date_to_timestamp(date, time_format="%Y-%m-%d %H:%M:%S"):
    """
    @summary: Convert a date string to a unix timestamp (local time).
    ---------
    @param date: e.g. "2011-09-28 10:00:00"; a bare "YYYY-MM-DD" is also accepted
    @param time_format: strptime format used when the string contains a time part
    ---------
    @result: int unix timestamp
    """
    fmt = time_format if ":" in date else "%Y-%m-%d"
    parsed = time.strptime(date, fmt)
    return int(time.mktime(parsed))
def timestamp_to_date(timestamp, time_format="%Y-%m-%d %H:%M:%S"):
    """
    @summary: Convert a unix timestamp into a formatted local-time string.
    ---------
    @param timestamp: unix timestamp (seconds); must not be None
    @param time_format: strftime output format
    ---------
    @result: formatted date string
    """
    if timestamp is None:
        raise ValueError("timestamp is null")
    return time.strftime(time_format, time.localtime(timestamp))
def get_sha1(*args):
    """
    @summary: Build a 40-char sha1 hex digest over all args, used as a dedup id.
    ---------
    @param *args: values combined (in order) into the digest
    ---------
    @result: e.g. ba4868b3f277c8e387b55d9e3d0be7c045cdd89e
    """
    digest = hashlib.sha1()
    for piece in args:
        digest.update(str(piece).encode())
    return digest.hexdigest()
def get_sha256(*args):
    """
    @summary: Build a 64-char sha256 hex digest over all args, used as a dedup id.
    ---------
    @param *args: values combined (in order) into the digest
    ---------
    @result: e.g. 5580c91ea29bf5bd963f4c08dfcacd983566e44ecea1735102bc380576fd6f30
    """
    digest = hashlib.sha256()
    for piece in args:
        digest.update(str(piece).encode())
    return digest.hexdigest()
def md5value(val):
    """Return the md5 hex digest of *val* (str or bytes).

    BUG FIX: the previous implementation hashed `str(val)` for bytes input,
    i.e. the repr "b'x'" rather than the bytes themselves, and silently
    returned the empty-input digest for any other type.  Bytes are now
    hashed directly and other types fall back to their str() form.
    """
    md5 = hashlib.md5()
    if isinstance(val, bytes):
        md5.update(val)
    elif isinstance(val, str):
        md5.update(val.encode("utf-8"))
    else:
        # Sensible fallback instead of hashing nothing.
        md5.update(str(val).encode("utf-8"))
    return md5.hexdigest()
def ensure_int64(n):
    """Coerce *n* to a BSON Int64; falsy values (None, False, 0, "") become Int64(0).

    Examples (previous docstring referenced a non-existent `ensure_float`)::

        ensure_int64(None)  -> Int64(0)
        ensure_int64(False) -> Int64(0)
        ensure_int64(12)    -> Int64(12)
        ensure_int64("72")  -> Int64(72)
    """
    if not n:
        return bson.int64.Int64(0)
    return bson.int64.Int64(n)
def get_today_of_day(day_offset=0):
    """Return today's date shifted by *day_offset* days, formatted 'YYYY-MM-DD'."""
    target = datetime.date.today() + datetime.timedelta(days=day_offset)
    return str(target)
def get_current_timestamp():
    """Current unix time, truncated to whole seconds."""
    now = time.time()
    return int(now)
def add_zero(n):
    """Zero-pad *n* to at least two digits: 5 -> '05', 12 -> '12'."""
    return "%02d" % n
def sup_zero(indate):
    """Normalise a loosely formatted date such as '2021-1-5[ HH:MM:SS]' to a
    zero-padded 'YYYY-MM-DD'; any time part is appended back unchanged."""
    pieces = indate.split(' ')
    tail = " " + pieces[1] if len(pieces) == 2 else ""
    ymd = pieces[0].split('-')
    normalized = datetime.datetime(
        year=int(ymd[0]), month=int(ymd[1]), day=int(ymd[2])
    ).strftime("%Y-%m-%d")
    return normalized + tail
def get_days_of_month(year, month):
    """Number of days in the given month of the given year."""
    _, day_count = calendar.monthrange(year, month)
    return day_count
def get_year_month_and_days(month_offset=0):
    """
    @summary: Resolve the year/month shifted by *month_offset* from the current
        month, plus the number of days in that month.
    ---------
    @param month_offset: month offset, may be negative
    ---------
    @result: ('2019', '04', '30') — always a tuple of strings

    FIX: the original returned an *int* year on the no-rollover path and a
    *str* year everywhere else (its own docstring promised strings); the four
    duplicated branches are unified with a single divmod normalisation,
    which yields identical year/month values on all paths (verified for
    offsets crossing year boundaries in both directions).
    """
    today = datetime.datetime.now()
    # Months are 1-based: shift into 0-based space, divmod, shift back.
    year_delta, month_index = divmod(today.month + month_offset - 1, 12)
    year = today.year + year_delta
    month = month_index + 1
    days = get_days_of_month(year, month)
    return str(year), add_zero(month), str(days)
def get_month(month_offset=0):
    """Date *month_offset* months from today, as 'YYYY-MM-DD'.

    Keeps today's day-of-month, clamped to the target month's last day
    (e.g. Jan 31 shifted one month forward becomes Feb 28/29).
    """
    today = datetime.datetime.now()
    day = add_zero(today.day)
    year, month, last_day = get_year_month_and_days(month_offset)
    chosen_day = day if int(day) < int(last_day) else last_day
    return "-".join("%s" % part for part in (year, month, chosen_day))
def extract_file_type(file_name="附件名", file_url="附件地址", file_type_list=None):
    """
    Extract the attachment type (file extension).
    Args:
        file_name: attachment name
        file_url: attachment URL
        file_type_list: extra extensions to recognise (case-insensitive)
    Returns: extension string, or None when nothing is recognised

    BUG FIX: the fallback loop used to `return` on its FIRST iteration
    (an `else: return` inside the loop body), so only the first extension
    ('zip') was ever checked against the name/URL.  The URL regex is also
    now properly escaped (r"\\." instead of "\\." in a non-raw string was
    fine by accident; made explicit with a raw string).
    """
    if file_type_list is None:
        file_type_list = []
    if not (file_name and file_url):
        return None
    file_name = file_name.strip()
    file_types = ['zip', 'docx', 'ftp', 'pdf', 'doc', 'rar', 'gzzb', 'hzzbs',
                  'jpg', 'png', 'zbid', 'xls', 'xlsx', 'swp', 'dwg']
    if file_type_list:
        file_types.extend(x.lower() for x in file_type_list)
    # Primary: take the extension from the URL path, then from the query string.
    file_type = file_url.split('?')[0].split('.')[-1].lower()
    if file_type not in file_types:
        file_type = file_url.split('?')[-1].split('.')[-1].lower()
    if file_type in file_types:
        return file_type
    # Fallback: look for any known extension in the name, or ".ext" in the URL.
    for ftp in file_types:
        match = re.search(ftp, file_name) or re.search(r"\." + ftp, file_url)
        if match:
            return match.group(0).replace('.', '')
    return None
def remove_htmldata(remove_info_list: list, html: str, response):
    """
    Strip unwanted fragments from detail-page HTML.
    Args:
        remove_info_list: rules, each one of: an xpath ('//...'), a regex
            prefixed with '<re>' (e.g. '<re>data:image/(.*?)"'), or a
            literal substring
        html: text to clean
        response: original response object (used only for xpath rules)
    Returns: cleaned text
    """
    if not html or not remove_info_list:
        return html
    for rule in remove_info_list:
        if re.search('^//.*', rule):
            # xpath rule: drop every extracted fragment
            for fragment in response.xpath(rule).extract():
                if fragment:
                    html = html.replace(fragment, '')
        elif re.search('^<re>.*', rule):
            # regex rule: remove each match (group text when the pattern captures)
            pattern = rule.replace('<re>', '')
            for fragment in re.findall(f'{pattern}', html, re.S | re.I | re.M):
                html = html.replace(fragment, '')
        else:
            # literal substring rule
            if rule:
                html = html.replace(rule, '')
    return html
def text_search(content: str) -> SearchText:
    """
    Count Chinese characters in *content*.
    :param content: text to inspect (falsy input counts as zero)
    :return: SearchText whose `total` is the number of CJK chars found
    """
    if not content:
        return SearchText(0)
    chinese_chars = re.findall('[\u4e00-\u9fa5]', content, re.S)
    return SearchText(len(chinese_chars))
def clean_title(title):
    """Strip noise markers from a title: '(digits)', '[中文]', '【中文】'."""
    if title:
        for pattern in (r'\(\d{1,20}\)',
                        r'\[[\u4e00-\u9fa5]{1,9}\]',
                        r'【[\u4e00-\u9fa5]{1,9}】'):
            title = re.sub(pattern, '', title)
    return title
def substitute(html_str, special=None, completely=False):
    """Thin wrapper over utils.clean_html.cleaner for HTML sanitising."""
    return cleaner(html=html_str, special=special, completely=completely)
def handle_publish_time(publishtime):
    """Normalise a publish-time string and return (publishtime, Int64 timestamp).

    A date-only value gets the current clock time appended; a placeholder
    midnight ('00:00:00') is replaced with the current clock time.  Future
    timestamps are clamped to "now" by handle_publish_time_overdue().

    Raises:
        EOFError: when the input cannot be parsed (historical exception type,
            kept unchanged for caller compatibility).

    FIX: the bare `except:` also swallowed KeyboardInterrupt/SystemExit;
    narrowed to Exception and chained with `from` to keep the root cause.
    """
    try:
        time_str = get_current_date().split(' ')[-1]
        if ':' not in publishtime:
            # date-only: append current clock time
            publishtime = publishtime + ' ' + time_str
        elif '00:00:00' in publishtime:
            # placeholder midnight: replace with current clock time
            publishtime = publishtime.split(' ')[0] + ' ' + time_str
        l_np_publishtime = int2long(date_to_timestamp(publishtime))
        publishtime, l_np_publishtime = handle_publish_time_overdue(publishtime, l_np_publishtime)
        return publishtime, l_np_publishtime
    except Exception as exc:
        raise EOFError("publishtime 格式错误!") from exc
def handle_publish_time_overdue(publishtime, l_np_publishtime):
    """Clamp a future publish time to now; past times pass through untouched."""
    if not l_np_publishtime or l_np_publishtime <= get_current_timestamp():
        return publishtime, l_np_publishtime
    logger.warning("发布时间大于当前时间,已设置当前时间为发布时间!")
    now_str = get_current_date()
    return now_str, ensure_int64(date_to_timestamp(now_str))
def handle_page_html(item):
    """Validate detail-page content.

    Raises ValueError when contenthtml is empty; when the cleaned detail
    contains no Chinese text, marks the item so it is not pushed downstream.
    """
    title = item.get('title')
    publishtime = item.get('publishtime')
    href = item.get('competehref') if item.get('href') == "#" else item.get('href')
    contenthtml = item.get('contenthtml')
    if not contenthtml:
        logger.warning(f"页面源码不能为空!\n 发布地址:{href}\n 发布时间:{publishtime}\n 标题:{title}")
        raise ValueError("无效正文!")
    if text_search(item.get('detail')).total == 0:
        logger.warning("无内容数据,数据不入保存服务!")
        item['sendflag'] = "true"
def check_data_validity(item):
    """Ensure the base fields (title, publishtime, href) are all present.

    Raises ValueError when any of them is missing/empty.
    """
    title = item.get('title')
    publishtime = item.get('publishtime')
    href = item.get('competehref') if item.get('href') == "#" else item.get('href')
    if not (title and publishtime and href):
        logger.error(f"基础数据不能为空!\n 发布地址:{href}\n 发布时间:{publishtime}\n 标题:{title}")
        raise ValueError("基础数据异常")
# Whitelist of document fields for the bidding collection: clean_fields()
# strips anything not listed here and join_fields() refuses to add it.
_fields = {
    'title', 'publishtime', 'spidercode', 'infoformat', 'site',
    'channel', 'area', 'city', 'jsondata', 'district', 'href',
    'is_mixed', 'comeintime', 's_title', 'l_np_publishtime',
    'contenthtml', 'competehref', 'detail', 'iscompete', 'sendflag',
    '_d', 'publishdept', 'type', 'T', 'projectinfo', 'is_theme'
}
def clean_fields(item, special_fields=None):
    """Remove, in place, every key of *item* not in the field whitelist
    (defaults to the module-level `_fields`)."""
    allowed = special_fields or _fields
    for key in [k for k in item if k not in allowed]:
        del item[key]
def join_fields(item, special_fields=None, **kwargs):
    """Merge kwargs into *item*, accepting only whitelisted keys
    (defaults to the module-level `_fields`); log any undefined key."""
    allowed = special_fields or _fields
    for key, value in kwargs.items():
        if key not in allowed:
            logger.error(f"{key} 入库字段未定义!")
        else:
            item[key] = value
def format_fields(item, callback=handle_publish_time, **kwargs):
    """Format an item for storage in the bidding collection (standard spiders).

    Pipeline: strip non-whitelisted keys -> normalise publish time via
    *callback* -> derive detail/s_title -> fill constant bidding fields ->
    merge extra kwargs -> validate content and base fields -> stamp comeintime.
    """
    clean_fields(item)
    if callable(callback):
        time_str, timestamp = callback(item.get('publishtime'))
        item['publishtime'] = time_str
        item['l_np_publishtime'] = timestamp
    item['detail'] = substitute(item.get('contenthtml'))
    item['s_title'] = item.get('s_title') or item.get('title')
    item.update({
        'infoformat': 1,
        'iscompete': True,
        'sendflag': 'false',
        '_d': 'comeintime',
        'publishdept': '',
        'type': '',
        'T': 'bidding',
    })
    join_fields(item, **kwargs)
    handle_page_html(item)
    check_data_validity(item)
    item['comeintime'] = int2long(int(time.time()))
    return item
def format_fields_njpc(item, callback=handle_publish_time, **kwargs):
    """Format an item for storage (proposed-construction / 拟建 spiders).

    Same pipeline as format_fields() but with the njpc field whitelist,
    infoformat=2, and — note — `publishtime` is overwritten with the
    numeric timestamp rather than the normalised string.
    """
    # Field whitelist for the njpc (proposed-construction) collection.
    req_fields = {
        'site', 'approvenumber', 'method', 'project_scale', 'area', 'is_mixed',
        'competehref',
        'air_conditioner', 'funds', 'scale', 'construction_area', 'channel',
        'contenthtml', 'elevator',
        'building_floors', 'ownertel', 'parking', 'building', 'spidercode',
        'title',
        'detail', 'projectinfo', 'exterior', 'constructionunit', 'owner_info',
        'approvetime',
        'project_startdate', 'investment', 'heating', 'district',
        'constructionunitperson',
        'designunitperson', 'publishtime', 'system', 'pace', 'total',
        'project_scale_info', 'passive',
        'phone', 'construction', 'parking_pace', 'floors', 'freshair_system',
        'other_project_scale',
        'conditioner', 'wall', 'designunit', 'owneraddr',
        'prefabricated_building', 'materials',
        'constructionunitaddr', 'constructionunit_info', 'project_person',
        'approvecontent',
        'constructionunittel', 'floor', 'person', 'city', 'floor_area',
        'project', 'approvestatus',
        'project_completedate', 'completedate', 'ownerperson', 'sendflag',
        'comeintime',
        'steel_structure', 'projectaddr', 'freshair', 'T', 'startdate', 'house',
        'projectname',
        'exterior_wall_materials', 'other', 'passive_house', 'jsondata', 'air',
        'prefabricated',
        'designunit_info', 'approvedept', 'total_investment', 'infoformat',
        'project_phone',
        'owner', 'designunittel', 'projecttype', 'approvecode', 'steel',
        'is_theme', 'designunitaddr',
        'heating_method', 'href', 'projectperiod', 'structure'
    }
    clean_fields(item, special_fields=req_fields)
    if callable(callback):
        # NOTE(review): only the timestamp is kept — the string form is
        # discarded, unlike format_fields(); confirm this is intentional.
        _, timestamp = callback(item.get('publishtime'))
        item['publishtime'] = timestamp
    item['detail'] = substitute(item.get('contenthtml'))
    item['title'] = item.get('title') or item.get('projectname')
    item['infoformat'] = 2
    item['sendflag'] = "false"
    item['T'] = "bidding"
    join_fields(item, special_fields=req_fields, **kwargs)
    handle_page_html(item)
    check_data_validity(item)
    item['comeintime'] = int2long(int(time.time()))
    return item
def search(pattern, string):
    """Return the first capture group of the first regex match, or None."""
    match = re.search(pattern, string)
    return match.groups()[0] if match else None
def sleep_time(start_time: int, end_time=0, step=-1):
    """Countdown sleep from start_time to end_time seconds (plus a random
    sub-second jitter), printing progress in place."""
    time.sleep(random.random())
    for remaining in range(start_time, end_time, step):
        print(f"\r *** 休眠中... {remaining} 秒 *** ", end='')
        time.sleep(1)
    print("\r <* 休眠结束 *> ", end='')
# --- decorators ---
class Singleton(object):
    """Class decorator that caches one shared instance per decorated class."""

    def __init__(self, cls):
        self._cls = cls
        self._instance = {}

    def __call__(self, *args, **kwargs):
        cached = self._instance.get(self._cls)
        if cached is None:
            cached = self._cls(*args, **kwargs)
            self._instance[self._cls] = cached
        return cached
def down_load_image(proxy=None):
    """Fetch a CAPTCHA image from gdgpo.czt.gd.gov.cn and solve it via the
    internal recognition service.

    @param proxy: optional requests-style proxies mapping
    @result: the 4-character code string, or None when recognition fails
    """
    img_url = 'https://gdgpo.czt.gd.gov.cn/freecms/verify/verifyCode.do?createTypeFlag=n'
    header = {
        "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Referer": "https://gdgpo.czt.gd.gov.cn/cms-gd/site/guangdong/qwjsy/index.html?",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    }
    # verify=False: the target site's TLS chain is not trusted here — accepted
    # risk for this crawler, but keep in mind it disables certificate checks.
    res = requests.get(img_url, headers=header, proxies=proxy, timeout=30, verify=False)
    upload_address = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify"
    content = {'file': res.content}
    headers = {'accept': 'application/json'}
    # Service contract (observed below): {"msg": "success", "r": {"code": "<4 chars>"}}
    json_resp = requests.post(upload_address, headers=headers, files=content, stream=True).json()
    if "msg" in json_resp and "success" == json_resp["msg"]:
        code = json_resp["r"]["code"]
        if len(code) == 4:
            return code
    return None
- def _pack_file(file):
- """包装验证码格式"""
- if isinstance(file, str) and file.startswith("data:image"):
- img_file = {"file": file}
- elif isinstance(file, bytes):
- img_file = {"file": file}
- else:
- with open(file, "rb") as f:
- img_bytes = f.read()
- img_file = {"file": img_bytes}
- return img_file
def simple_captcha(file):
    """
    Solve an ordinary image captcha via the internal recognition service.
    @param file: the captcha — an image file path, raw bytes, or base64 data-URI
    @return: recognised code string, or None on failure
    """
    url = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify"
    files = _pack_file(file)
    r = requests.post(url, headers={"accept": "application/json"}, files=files, stream=True, timeout=10)
    rp_json = r.json()
    # Service contract: {"msg": "success", "r": {"code": ...}}
    if "msg" in rp_json and "success" == rp_json["msg"]:
        return str(rp_json["r"]["code"])
    return None
def retry_on_exception(retries=1, timeout=1):
    """Decorator factory: retry the wrapped call up to *retries* times,
    sleeping *timeout* seconds after each failure; raise RuntimeError once
    all attempts are exhausted."""
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while attempt < retries:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"执行[{func.__name__}]失败, args:{args}, kwargs:{kwargs} 异常:{e}")
                    time.sleep(timeout)
            raise RuntimeError(f"执行[{func.__name__}]达到最大重试次数")
        return wrapper
    return decorate
class PySpiderError(Exception):
    """Base spider error: arbitrary keyword attributes are attached to the
    instance; when neither `code` nor `reason` is supplied, defaults of
    code=10000 / reason='未知爬虫错误,请手动处理' are applied."""

    def __init__(self, *args, **kwargs):
        if 'code' not in kwargs and 'reason' not in kwargs:
            kwargs.update(code=10000, reason='未知爬虫错误,请手动处理')
        for key, val in kwargs.items():
            setattr(self, key, val)
        super().__init__(*args, kwargs)
class AttachmentNullError(PySpiderError):
    """Raised when an attachment download fails (default code 10004)."""

    def __init__(self, code: int = 10004, reason: str = '附件下载异常'):
        super().__init__(code=code, reason=reason)
class CustomError(Exception):
    """Generic exception carrying a free-form message.

    The message is exposed both as the `ErrorInfo` attribute and via str(exc).
    """
    def __init__(self, ErrorInfo):
        self.ErrorInfo = ErrorInfo

    def __str__(self):
        return self.ErrorInfo
# Misspelled aliases ("fileds") kept for backward compatibility with older spiders.
format_fileds = format_fields
format_fileds_njpc = format_fields_njpc
|