"""Shared helpers for the crawler: hashing, text checks, alerts, proxies, CLI args."""
import hashlib
import json
import re
import sys
from collections import namedtuple

import bson
import requests
from feapder.db.mongodb import MongoDB
from feapder.utils.log import log

from setting import (
    WARNING_INTERVAL,
    WECHAT_WARNING_ALL,
    WECHAT_WARNING_PHONE,
    WECHAT_WARNING_URL,
)
from .cleaner import cleaner

SearchText = namedtuple('SearchText', ['total'])

# Matches one character in the range U+4E00..U+9FA5; compiled once at import.
_CHINESE_CHAR_RE = re.compile('[\u4e00-\u9fa5]')

# Seconds to wait on outbound HTTP calls so a dead endpoint cannot hang the caller.
_REQUEST_TIMEOUT = 10


def substitute(html_str, special=None, completely=False):
    """Clean an HTML string by delegating to ``cleaner``.

    :param html_str: HTML text to clean
    :param special: forwarded to ``cleaner`` unchanged
    :param completely: forwarded to ``cleaner`` unchanged
    :return: whatever ``cleaner`` returns
    """
    # The original hard-coded special=None / completely=False here, silently
    # discarding the caller's arguments.
    return cleaner(html=html_str, special=special, completely=completely)


def get_signature(content: str) -> str:
    """Return the SHA-1 digest of ``content`` as a hexadecimal string.

    :param content: text to hash (encoded as UTF-8 before hashing)
    :return: hex digest
    """
    return hashlib.sha1(content.encode("utf-8")).hexdigest()


def text_search(content: str) -> SearchText:
    """Count the Chinese characters (U+4E00..U+9FA5) in ``content``.

    :param content: text to scan; empty or ``None`` counts as zero
    :return: ``SearchText`` whose ``total`` is the number of matching characters
    """
    if not content:
        return SearchText(0)
    # Each match is exactly one character, so the match count is the total.
    return SearchText(len(_CHINESE_CHAR_RE.findall(content)))


def int2long(param: int):
    """Wrap ``param`` in a ``bson.int64.Int64``."""
    return bson.int64.Int64(param)


def get_spiders(menus):
    """Remove from ``menus``, in place, every menu whose spider state is not 11.

    Each menu's ``code`` is looked up in the ``luaconfig`` collection of the
    ``editor`` database. Menus with no matching record are kept.

    :param menus: mutable list of objects exposing a ``code`` attribute
    :return: ``None``; ``menus`` is modified in place
    """
    db = MongoDB(db="editor")
    # Iterate over a snapshot: removing from the list being iterated makes the
    # loop skip the element that follows each removal.
    for menu in list(menus):
        spider_info = db.find('luaconfig', {"code": menu.code})
        # NOTE(review): the meaning of state 11 is not visible in this file.
        if spider_info and spider_info[0].get("state") != 11:
            menus.remove(menu)


def wechat_warning(
    message,
    message_prefix=None,
    rate_limit=None,
    url=None,
    user_phone=None,
    all_users: bool = None,
):
    """Send a text alert to a WeChat Work webhook.

    Arguments left unset fall back to the values imported from ``setting``.

    :param message: alert text; nothing is sent when it is empty
    :param message_prefix: accepted for interface compatibility; not used here
    :param rate_limit: accepted for interface compatibility; not used here
        (NOTE(review): ``WARNING_INTERVAL`` suggests rate limiting was intended)
    :param url: webhook URL, defaults to ``WECHAT_WARNING_URL``
    :param user_phone: phone number or list of numbers to mention, defaults to
        ``WECHAT_WARNING_PHONE``
    :param all_users: mention everyone when ``True``, defaults to
        ``WECHAT_WARNING_ALL``
    :return: ``True`` if the webhook answered with ``errcode == 0``, ``False``
        if sending failed, ``None`` if ``url`` or ``message`` is empty
    """
    url = url or WECHAT_WARNING_URL
    user_phone = user_phone or WECHAT_WARNING_PHONE
    all_users = all_users if all_users is not None else WECHAT_WARNING_ALL

    if isinstance(user_phone, str):
        user_phone = [user_phone] if user_phone else []

    # With nobody specific to mention, mention everyone.
    if all_users is True or not user_phone:
        user_phone = ["@all"]

    if not all([url, message]):
        return

    data = {
        "msgtype": "text",
        "text": {"content": message, "mentioned_mobile_list": user_phone},
    }
    headers = {"Content-Type": "application/json"}

    # Alerting is best-effort: any failure is logged and reported as False
    # rather than propagated to the caller.
    try:
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(data).encode("utf8"),
            timeout=_REQUEST_TIMEOUT,
        )
        try:
            result = response.json()
        finally:
            # Release the connection even when the body is not valid JSON.
            response.close()

        if result.get("errcode") != 0:
            raise Exception(result.get("errmsg"))
        return True
    except Exception as e:
        log.error("报警发送失败。 报警内容 {}, error: {}".format(message, e))
        return False


class JyBasicException(Exception):
    """Base error carrying a numeric ``code``, a ``reason`` and extra details.

    Every extra keyword argument is stored in ``err_details`` and also set as
    an attribute of the same name on the instance.
    """

    def __init__(self, code: int, reason: str, **kwargs):
        self.code = code
        self.reason = reason
        self.err_details = kwargs
        for key, val in kwargs.items():
            setattr(self, key, val)


class CustomCheckError(JyBasicException):
    """Error for a failed feature/condition check (default code 10002)."""

    def __init__(self, code: int = 10002, reason: str = '特征条件检查异常', **kwargs):
        super().__init__(code, reason, **kwargs)


class HtmlEmptyError(JyBasicException):
    """Error for an empty page body (default code 10002)."""

    # NOTE(review): shares default code 10002 with CustomCheckError - confirm
    # that this is intentional.
    def __init__(self, code: int = 10002, reason: str = '正文获取异常,正文为空', **kwargs):
        super().__init__(code, reason, **kwargs)


class CheckPrePareRequest:
    """Callable pre-request check that tests a row's title for crawl keywords."""

    def __init__(self):
        # A title must contain at least one of these keywords to pass.
        self.crawl_keywords = {
            '招标', '流标', '评标', '询价', '中标候选人', '抽签', '谈判', '中选', '意见征询',
            '更正公告', '废标', '补遗', '议价', '邀请', '资格预审', '竞标', '变更', '遴选',
            '磋商', '项目', '评审', '询比', '开标', '澄清', '比选', '中止', '采购', '竟价',
            '招投标', '拟建', '成交', '中标', '竞争性谈判', '工程', '验收公告', '更正',
            '单一来源', '变更公告', '合同', '违规', '评判', '监理', '竞价', '答疑', '终止',
            '系统'
        }

    def check_crawl_title(self, title: str):
        """Check whether ``title`` contains any crawl keyword.

        :param title: title text to inspect
        :return: ``(200, 'ok')`` when a keyword is found, otherwise
            ``(10106, '标题未检索到采集关键词')``
        """
        if any(re.search(keyword, title) for keyword in self.crawl_keywords):
            return 200, 'ok'
        return 10106, '标题未检索到采集关键词'

    def __check(self, rows: dict):
        """Run the title check on ``rows`` and return its ``(code, reason)``.

        Both ``rows['title']`` and ``rows['l_np_publishtime']`` must be present
        (``KeyError`` otherwise); the publish time is read but not validated.
        """
        title, _publish_time = rows['title'], rows['l_np_publishtime']
        # The original dropped this result, which made the whole check a no-op.
        return self.check_crawl_title(title)

    def __call__(self, rows: dict, *args, **kwargs):
        """Check ``rows`` and return the ``(code, reason)`` tuple."""
        return self.__check(rows)


def get_proxy():
    """Fetch a proxy from the proxy service and return its ``http`` entry.

    :return: value of ``data.http`` in the JSON response, or ``None`` when the
        response carries no ``data``
    """
    # NOTE(review): credential is hard-coded in source; consider moving it to
    # configuration.
    headers = {
        "Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    }
    proxy = requests.get(
        "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
        headers=headers,
        timeout=_REQUEST_TIMEOUT,
    ).json()
    print(f"切换代理:{proxy.get('data')}")
    # A response without "data" used to raise AttributeError on None.
    data = proxy.get("data") or {}
    return data.get("http")


class Obj(object):
    """Expose the keys of a dict as attributes of an object."""

    def __init__(self, dict_):
        self.__dict__.update(dict_)


def _parse_argv_value(raw):
    """Convert a raw ``--key=value`` string to int, bool, or leave it as str."""
    try:
        return int(raw)
    except ValueError:
        pass
    lowered = raw.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    return raw


def get_argvs():
    """Parse ``--key=value`` command-line arguments into an ``Obj``.

    Starts from the defaults ``next_page=False`` and ``max_page=10``. Arguments
    that do not start with ``--`` are ignored. A bare ``--key`` is stored as
    ``True``; values are stored as ``int`` when possible, as ``bool`` for
    ``true``/``false``, and as the raw string otherwise.

    :return: ``Obj`` with one attribute per parsed key
    """
    argvs = {"next_page": False, "max_page": 10}
    for item in sys.argv[1:]:
        print(item)
        if not item.startswith("--"):
            continue
        key, has_value, raw = item[2:].partition("=")
        # The original called int() unconditionally and crashed on bare flags
        # and on non-integer values.
        argvs[key] = _parse_argv_value(raw) if has_value else True
    return Obj(argvs)


def search(pattern, string):
    """Return the first capture group of the first match of ``pattern``.

    :param pattern: regular expression
    :param string: text to search
    :return: first capture group, the whole match when ``pattern`` defines no
        group, or ``None`` when there is no match
    """
    result = re.search(pattern, string)
    if result is None:
        return None
    # groups()[0] raised IndexError for patterns without a capture group.
    return result.group(1) if result.groups() else result.group(0)


def search_construction(string):
    """Search ``string`` with the construction pattern.

    TODO(review): 'pattern' is a literal placeholder regex; the real expression
    is not present in this file.
    """
    return search('pattern', string)


def search_floor(string):
    """Search ``string`` with the floor pattern.

    TODO(review): 'pattern' is a literal placeholder regex; the real expression
    is not present in this file.
    """
    return search('pattern', string)