# -*- coding: utf-8 -*- """ Created on 2024-04-09 --------- @summary: 主题爬虫 工具类 --------- @author: Lzz """ import calendar import datetime import functools import hashlib import random import re import time from collections import namedtuple import bson import execjs import redis import requests from loguru import logger from pymongo import MongoClient from utils.clean_html import cleaner try: from pymongo.errors import DuplicateKeyError from hashlib import md5 except ImportError as e: raise e SearchText = namedtuple('SearchText', ['total']) def nsssjss(): ex_js = ''' const jsdom = require("jsdom"); const {JSDOM} = jsdom; const dom = new JSDOM(`

Hello world

`); window = dom.window; document = window.document; JSEncrypt = require('jsencrypt') function encryptByRSA(value) { var encrypt = new JSEncrypt; var RSAPublicKey = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCS2TZDs5+orLYCL5SsJ54+bPCVs1ZQQwP2RoPkFQF2jcT0HnNNT8ZoQgJTrGwNi5QNTBDoHC4oJesAVYe6DoxXS9Nls8WbGE8ZNgOC5tVv1WVjyBw7k2x72C/qjPoyo/kO7TYl6Qnu4jqW/ImLoup/nsJppUznF0YgbyU/dFFNBQIDAQAB"; encrypt.setPublicKey('-----BEGIN PUBLIC KEY-----' + RSAPublicKey + '-----END PUBLIC KEY-----') return encrypt.encrypt(value) } function get_njs(){ nsssjss = encryptByRSA('/freecms' + '/rest/v1/notice/selectInfoMoreChannel.do' + '$$' + new Date().getTime()) return nsssjss } ''' ctx = execjs.compile(ex_js) njs = ctx.call('get_njs') return njs def get_QGIP(): proxy = "http://6278CF0D:41D9C796172D@tun-vdpzuj.qg.net:15254" proxies = { "http": proxy, "https": proxy, } return proxies def get_proxy(scheme=None, default=None, socks5h=False): headers = { "Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB" } while True: proxy = requests.get("http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch", headers=headers).json() # proxy = requests.get("http://39.106.157.58:1405/crawl/proxy/socks5/fetch", headers=headers).json() proxies = proxy.get("data") if proxies: break else: logger.warning("暂无代理...") time.sleep(3) if socks5h: proxyh = { "http": proxies.get("http").replace("socks5", "socks5h"), "https": proxies.get("http").replace("socks5", "socks5h") } proxies = proxyh logger.info(f"切换代理: {proxies}") if not scheme: return proxies else: return proxies.get(scheme, default) def Mongo_client(): client = MongoClient("172.17.4.87", 27080) # client = MongoClient("172.20.45.130", 27017) return client def Redis_client(): _pool = redis.ConnectionPool( host='172.17.162.28', port=7361, password='k5ZJR5KV4q7DRZ92DQ', db=1 ) # _pool = redis.ConnectionPool( # host='172.20.45.129', # password='jianyu@python', # port=3379, # db=1 # ) r = redis.Redis(connection_pool=_pool, decode_responses=True) return r def int2long(param: int): """int 转换成 long """ return bson.int64.Int64(param) def get_current_date(date_format="%Y-%m-%d %H:%M:%S"): return datetime.datetime.now().strftime(date_format) def date_to_timestamp(date, time_format="%Y-%m-%d %H:%M:%S"): """ @summary: --------- @param date:将"2011-09-28 10:00:00"时间格式转化为时间戳 @param time_format:时间格式 --------- @result: 返回时间戳 """ if ":" in date: timestamp = time.mktime(time.strptime(date, time_format)) else: timestamp = time.mktime(time.strptime(date, "%Y-%m-%d")) return int(timestamp) def timestamp_to_date(timestamp, time_format="%Y-%m-%d %H:%M:%S"): """ @summary: --------- @param timestamp: 将时间戳转化为日期 @param time_format: 日期格式 --------- @result: 返回日期 """ if timestamp is None: raise ValueError("timestamp is null") date = time.localtime(timestamp) return time.strftime(time_format, date) def get_sha1(*args): """ @summary: 获取唯一的40位值, 用于获取唯一的id --------- @param *args: 参与联合去重的值 --------- @result: ba4868b3f277c8e387b55d9e3d0be7c045cdd89e """ sha1 = hashlib.sha1() for arg in args: sha1.update(str(arg).encode()) return sha1.hexdigest() # 40位 def get_sha256(*args): """ @summary: 获取唯一的64位值, 用于获取唯一的id --------- @param *args: 参与联合去重的值 --------- @result: 5580c91ea29bf5bd963f4c08dfcacd983566e44ecea1735102bc380576fd6f30 """ sha256 = hashlib.sha256() for arg in args: sha256.update(str(arg).encode()) return sha256.hexdigest() # 64位 def md5value(val): md5 = hashlib.md5() if isinstance(val, bytes): md5.update(str(val).encode("utf-8")) elif isinstance(val, str): md5.update(val.encode("utf-8")) return md5.hexdigest() def ensure_int64(n): """ >>> ensure_int64(None) 0 >>> ensure_float(False) 0 >>> ensure_float(12) 12 >>> ensure_float("72") 72 """ if not n: return bson.int64.Int64(0) return bson.int64.Int64(n) def get_today_of_day(day_offset=0): return str(datetime.date.today() + datetime.timedelta(days=day_offset)) def get_current_timestamp(): return int(time.time()) def add_zero(n): return "%02d" % n def sup_zero(indate): deal = indate.split(' ') head = deal[0].split('-') tail = "" if len(deal) == 2: tail = " " + deal[1] year = int(head[0]) month = int(head[1]) day = int(head[2]) fdate = datetime.datetime(year=year, month=month, day=day) formatted_date = fdate.strftime("%Y-%m-%d") + tail return formatted_date def get_days_of_month(year, month): """ 返回天数 """ return calendar.monthrange(year, month)[1] def get_year_month_and_days(month_offset=0): """ @summary: --------- @param month_offset: 月份偏移量 --------- @result: ('2019', '04', '30') """ today = datetime.datetime.now() year, month = today.year, today.month this_year = int(year) this_month = int(month) total_month = this_month + month_offset if month_offset >= 0: if total_month <= 12: days = str(get_days_of_month(this_year, total_month)) total_month = add_zero(total_month) return (year, total_month, days) else: i = total_month // 12 j = total_month % 12 if j == 0: i -= 1 j = 12 this_year += i days = str(get_days_of_month(this_year, j)) j = add_zero(j) return (str(this_year), str(j), days) else: if (total_month > 0) and (total_month < 12): days = str(get_days_of_month(this_year, total_month)) total_month = add_zero(total_month) return (year, total_month, days) else: i = total_month // 12 j = total_month % 12 if j == 0: i -= 1 j = 12 this_year += i days = str(get_days_of_month(this_year, j)) j = add_zero(j) return (str(this_year), str(j), days) def get_month(month_offset=0): """'' 获取当前日期前后N月的日期 if month_offset>0, 获取当前日期前N月的日期 if month_offset<0, 获取当前日期后N月的日期 date format = "YYYY-MM-DD" """ today = datetime.datetime.now() day = add_zero(today.day) (y, m, d) = get_year_month_and_days(month_offset) arr = (y, m, d) if int(day) < int(d): arr = (y, m, day) return "-".join("%s" % i for i in arr) def extract_file_type(file_name="附件名", file_url="附件地址", file_type_list=None): """ 抽取附件类型 Args: file_name: 附件名 file_url: 附件地址 file_type_list: 其他附件后缀 Returns: 附件类型 """ if file_type_list is None: file_type_list = [] if file_name and file_url: file_name = file_name.strip() file_types = ['zip', 'docx', 'ftp', 'pdf', 'doc', 'rar', 'gzzb', 'hzzbs', 'jpg', 'png', 'zbid', 'xls', 'xlsx', 'swp', 'dwg'] if file_type_list: ftp_list = list(map(lambda x: x.lower(), file_type_list)) file_types.extend(ftp_list) file_type = file_url.split('?')[0].split('.')[-1].lower() if file_type not in file_types: file_type = file_url.split('?')[-1].split('.')[-1].lower() if file_type in file_types: return file_type else: for ftp in file_types: file_type = re.search(ftp, file_name) or re.search("\." + ftp, file_url) if file_type: return file_type.group(0).replace('.', '') else: return file_type return None def remove_htmldata(remove_info_list: list, html: str, response): """ 过滤详情页无效数据 Args: remove_info_list: 需删除内容的xpath或文本 -> list [xpath,re,str] eg:['data:image/(.*?)"',] html: 待清洗文本 response: 原文响应体 Returns: 清洗后的文本 """ if html and remove_info_list: for extra_item in remove_info_list: if re.search('^//.*', extra_item): extra_html_list = response.xpath(extra_item).extract() for extra_html in extra_html_list: if extra_html: html = html.replace(extra_html, '') elif re.search('^.*', extra_item): extra_item = extra_item.replace('', '') extra_html_list = re.findall(f'{extra_item}', html, re.S | re.I | re.M) if extra_html_list: for exhtml in extra_html_list: html = html.replace(exhtml, '') else: extra_html = extra_item if extra_html: html = html.replace(extra_html, '') return html def text_search(content: str) -> SearchText: """ 中文检索 :param content: 文本 :return: 中文数量 """ if not content: return SearchText(0) results = re.findall('[\u4e00-\u9fa5]', content, re.S) # 列表长度即是中文的字数 return SearchText(len(results)) def clean_title(title): '''清洗标题''' if title: rule_list = [ '\(\d{1,20}\)', '\[[\u4e00-\u9fa5]{1,9}\]', '【[\u4e00-\u9fa5]{1,9}】', ] for rule in rule_list: title = re.sub(rule, '', title) return title def substitute(html_str, special=None, completely=False): """HTML 替换""" html_str = cleaner(html=html_str, special=special, completely=completely) return html_str def handle_publish_time(publishtime): '''处理发布时间''' try: time_str = get_current_date().split(' ')[-1] if ':' not in publishtime: publishtime = publishtime + ' ' + time_str else: if '00:00:00' in publishtime: publishtime = publishtime.split(' ')[0] + ' ' + time_str l_np_publishtime = int2long(date_to_timestamp(publishtime)) publishtime, l_np_publishtime = handle_publish_time_overdue(publishtime, l_np_publishtime) return publishtime, l_np_publishtime except: raise EOFError("publishtime 格式错误!") def handle_publish_time_overdue(publishtime, l_np_publishtime): """处理超期发布时间""" if l_np_publishtime and l_np_publishtime > get_current_timestamp(): logger.warning("发布时间大于当前时间,已设置当前时间为发布时间!") publishtime = get_current_date() l_np_publishtime = ensure_int64(date_to_timestamp(publishtime)) return publishtime, l_np_publishtime def handle_page_html(item): '''检测正文''' title = item.get('title') publishtime = item.get('publishtime') href = item.get('href') if href == "#": href = item.get('competehref') contenthtml = item.get('contenthtml') detail = item.get('detail') if not contenthtml: logger.warning(f"页面源码不能为空!\n 发布地址:{href}\n 发布时间:{publishtime}\n 标题:{title}") raise ValueError("无效正文!") else: if text_search(detail).total == 0: logger.warning("无内容数据,数据不入保存服务!") item['sendflag'] = "true" def check_data_validity(item): '''检测基础字段是否完整''' title = item.get('title') publishtime = item.get('publishtime') href = item.get('href') if href == "#": href = item.get('competehref') if not title or not publishtime or not href: logger.error(f"基础数据不能为空!\n 发布地址:{href}\n 发布时间:{publishtime}\n 标题:{title}") raise ValueError("基础数据异常") _fields = { 'title', 'publishtime', 'spidercode', 'infoformat', 'site', 'channel', 'area', 'city', 'jsondata', 'district', 'href', 'is_mixed', 'comeintime', 's_title', 'l_np_publishtime', 'contenthtml', 'competehref', 'detail', 'iscompete', 'sendflag', '_d', 'publishdept', 'type', 'T', 'projectinfo', 'is_theme' } def clean_fields(item, special_fields=None): special_fields = special_fields or _fields rm_fields = [] for key, val in item.items(): # 过滤非必须字段 if key not in special_fields: rm_fields.append(key) for field in rm_fields: del item[field] def join_fields(item, special_fields=None, **kwargs): special_fields = special_fields or _fields for k, v in kwargs.items(): if k in special_fields: item[k] = v else: logger.error(f"{k} 入库字段未定义!") def format_fields(item, callback=handle_publish_time, **kwargs): """ 格式化入库字段(bidding) """ clean_fields(item) if callable(callback): time_str, timestamp = callback(item.get('publishtime')) item['publishtime'] = time_str item['l_np_publishtime'] = timestamp item['detail'] = substitute(item.get('contenthtml')) item['s_title'] = item.get('s_title') or item.get('title') item['infoformat'] = 1 item['iscompete'] = True item['sendflag'] = 'false' item['_d'] = 'comeintime' item['publishdept'] = '' item['type'] = '' item['T'] = 'bidding' join_fields(item, **kwargs) handle_page_html(item) check_data_validity(item) item['comeintime'] = int2long(int(time.time())) return item def format_fields_njpc(item, callback=handle_publish_time, **kwargs): """ 格式化入库字段(拟建爬虫) """ req_fields = { 'site', 'approvenumber', 'method', 'project_scale', 'area', 'is_mixed', 'competehref', 'air_conditioner', 'funds', 'scale', 'construction_area', 'channel', 'contenthtml', 'elevator', 'building_floors', 'ownertel', 'parking', 'building', 'spidercode', 'title', 'detail', 'projectinfo', 'exterior', 'constructionunit', 'owner_info', 'approvetime', 'project_startdate', 'investment', 'heating', 'district', 'constructionunitperson', 'designunitperson', 'publishtime', 'system', 'pace', 'total', 'project_scale_info', 'passive', 'phone', 'construction', 'parking_pace', 'floors', 'freshair_system', 'other_project_scale', 'conditioner', 'wall', 'designunit', 'owneraddr', 'prefabricated_building', 'materials', 'constructionunitaddr', 'constructionunit_info', 'project_person', 'approvecontent', 'constructionunittel', 'floor', 'person', 'city', 'floor_area', 'project', 'approvestatus', 'project_completedate', 'completedate', 'ownerperson', 'sendflag', 'comeintime', 'steel_structure', 'projectaddr', 'freshair', 'T', 'startdate', 'house', 'projectname', 'exterior_wall_materials', 'other', 'passive_house', 'jsondata', 'air', 'prefabricated', 'designunit_info', 'approvedept', 'total_investment', 'infoformat', 'project_phone', 'owner', 'designunittel', 'projecttype', 'approvecode', 'steel', 'is_theme', 'designunitaddr', 'heating_method', 'href', 'projectperiod', 'structure' } clean_fields(item, special_fields=req_fields) if callable(callback): _, timestamp = callback(item.get('publishtime')) item['publishtime'] = timestamp item['detail'] = substitute(item.get('contenthtml')) item['title'] = item.get('title') or item.get('projectname') item['infoformat'] = 2 item['sendflag'] = "false" item['T'] = "bidding" join_fields(item, special_fields=req_fields, **kwargs) handle_page_html(item) check_data_validity(item) item['comeintime'] = int2long(int(time.time())) return item def search(pattern, string): result = re.search(pattern, string) if result: return result.groups()[0] def sleep_time(start_time: int, end_time=0, step=-1): time.sleep(random.random()) for i in range(start_time, end_time, step): print(f"\r *** 休眠中... {i} 秒 *** ", end='') time.sleep(1) print("\r <* 休眠结束 *> ", end='') # 装饰器 class Singleton(object): def __init__(self, cls): self._cls = cls self._instance = {} def __call__(self, *args, **kwargs): if self._cls not in self._instance: self._instance[self._cls] = self._cls(*args, **kwargs) return self._instance[self._cls] def down_load_image(proxy=None): img_url = 'https://gdgpo.czt.gd.gov.cn/freecms/verify/verifyCode.do?createTypeFlag=n' header = { "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9", "Connection": "keep-alive", "Referer": "https://gdgpo.czt.gd.gov.cn/cms-gd/site/guangdong/qwjsy/index.html?", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", } res = requests.get(img_url, headers=header, proxies=proxy, timeout=30, verify=False) upload_address = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify" content = {'file': res.content} # with open('image.jpg', 'wb+') as f: # f.write(res.content) headers = {'accept': 'application/json'} json_resp = requests.post(upload_address, headers=headers, files=content, stream=True).json() if "msg" in json_resp and "success" == json_resp["msg"]: code = json_resp["r"]["code"] if len(code) == 4: return code return None def _pack_file(file): """包装验证码格式""" if isinstance(file, str) and file.startswith("data:image"): img_file = {"file": file} elif isinstance(file, bytes): img_file = {"file": file} else: with open(file, "rb") as f: img_bytes = f.read() img_file = {"file": img_bytes} return img_file def simple_captcha(file): """ 普通验证码 @param file: 验证码 - 可以是图片或者图片base64编码 @return: """ url = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify" files = _pack_file(file) r = requests.post(url, headers={"accept": "application/json"}, files=files, stream=True, timeout=10) rp_json = r.json() if "msg" in rp_json and "success" == rp_json["msg"]: return str(rp_json["r"]["code"]) return None def retry_on_exception(retries=1, timeout=1): def decorate(func): @functools.wraps(func) def warp(*args, **kwargs): for _ in range(retries): try: return func(*args, **kwargs) except Exception as e: print(f"执行[{func.__name__}]失败, args:{args}, kwargs:{kwargs} 异常:{e}") time.sleep(timeout) raise RuntimeError(f"执行[{func.__name__}]达到最大重试次数") return warp return decorate class PySpiderError(Exception): def __init__(self, *args, **kwargs): if 'code' not in kwargs and 'reason' not in kwargs: kwargs['code'] = 10000 kwargs['reason'] = '未知爬虫错误,请手动处理' for key, val in kwargs.items(): setattr(self, key, val) super(PySpiderError, self).__init__(*args, kwargs) class AttachmentNullError(PySpiderError): def __init__(self, code: int = 10004, reason: str = '附件下载异常'): super(AttachmentNullError, self).__init__(code=code, reason=reason) class CustomError(Exception): def __init__(self, ErrorInfo): self.ErrorInfo = ErrorInfo def __str__(self): return self.ErrorInfo format_fileds = format_fields format_fileds_njpc = format_fields_njpc