"""Crawler helpers: HTML cleanup, hashing, Chinese text counting, WeChat Work
alerting and pre-request title checks."""
import hashlib
import json
import re
from collections import namedtuple

import requests
from setting import WECHAT_WARNING_URL, WECHAT_WARNING_PHONE, WARNING_INTERVAL, WECHAT_WARNING_ALL
import bson
from feapder.utils.log import log
from feapder.db.mongodb import MongoDB

SearchText = namedtuple('SearchText', ['total'])

# Matches a single character in the U+4E00..U+9FA5 range.
_CHINESE_CHAR = re.compile('[\u4e00-\u9fa5]', re.S)

# Seconds to wait for the alert webhook before giving up, so that a stalled
# endpoint cannot block the caller indefinitely.
_WECHAT_REQUEST_TIMEOUT = 10


def substitute(html_str):
    """Clean an HTML string by applying a fixed list of regex replacements.

    Every key of the pattern table is used as a regular expression and every
    match is replaced by the associated value. Each pattern is applied twice:
    a plain pass first, then a pass with ``re.S | re.M``, so text that only
    becomes matchable after the first pass (for example an outer ``{...}``
    once the inner one is gone) is removed as well.

    :param html_str: HTML text to clean
    :return: the cleaned text
    """
    # NOTE(review): many keys below are empty or newline-only and repeat, so
    # later duplicates overwrite the value of the first occurrence. This looks
    # like markup patterns that were lost from the table - confirm against the
    # intended pattern list and restore them.
    patterns = {
        '': '',
        '"': "'",
        '\n': '',
        '\xa0': "",
        '': '',
        ' ': '',
        '': '',
        '': '',
        '': '\n',
        '\n\n': '\n',
        '\n': '\n',
        '\n': '\n',
        '\n': '\n',
        '': '\n',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '\n',
        '': '',
        'style=".*?"': '',
        "style='.*?'": '',
        'class=".*?"': '',
        "class='.*?'": '',
        "align='.*?'": '',
        'align=".*?"': '',
        'border=".*?"': '',
        "border='.*?'": '',
        'cellpadding=".*?"': '',
        "cellpadding='.*?'": '',
        'cellspacing=".*?"': '',
        "cellspacing='.*?'": '',
        'center=".*?"': '',
        "center='.*?'": '',
        'width=".*?"': '',
        "width='.*?'": '',
        "bordercolor='.*?'": '',
        'bgcolor=".*?"': '',
        'BORDERCOLOR=".*?"': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '': '',
        '【关闭】': '',
        '【打印】': '',
        'function .*?() ': '',
        'var .*?;': '',
        r'if .*?\)': '',
        '{[^{}]+}': '',
        '{.*?}': '',
    }

    # The flags must be passed by keyword: the fourth positional argument of
    # re.sub is ``count``, not ``flags``.
    flags = re.S | re.M
    for pattern, replacement in patterns.items():
        first_pass = re.sub(pattern, replacement, html_str)
        html_str = re.sub(pattern, replacement, first_pass, flags=flags)
    return html_str


def get_signature(content: str) -> str:
    """Return the SHA-1 digest of ``content`` as a hexadecimal string.

    :param content: text to hash; it is encoded as UTF-8 first
    :return: hexadecimal digest
    """
    sha1 = hashlib.sha1()
    sha1.update(content.encode("utf-8"))
    return sha1.hexdigest()


def text_search(content: str) -> SearchText:
    """Count the Chinese characters in ``content``.

    :param content: text to scan; empty or ``None`` counts as zero
    :return: ``SearchText`` whose ``total`` is the number of characters in
        the U+4E00..U+9FA5 range
    """
    if not content:
        return SearchText(0)

    # Each match is one character, so the list length is the character count.
    return SearchText(len(_CHINESE_CHAR.findall(content)))


def int2long(param: int):
    """Wrap ``param`` in ``bson.int64.Int64``."""
    return bson.int64.Int64(param)


def get_spiders(menus):
    """Remove from ``menus``, in place, the menus whose spider state is not 11.

    For each menu the ``luaconfig`` collection of the ``editor`` database is
    queried by ``menu.code``. A menu is dropped when a record is found and
    the ``state`` of the first record is not 11; menus without a record are
    kept. NOTE(review): the meaning of state 11 is not visible here - confirm.

    :param menus: mutable list of objects exposing a ``code`` attribute
    """
    db = MongoDB(db="editor")
    # Iterate over a snapshot: removing from the list being iterated would
    # skip the element that follows each removed one.
    for menu in list(menus):
        spider_info = db.find('luaconfig', {"code": menu.code})
        if spider_info and spider_info[0].get("state") not in (11,):
            menus.remove(menu)


def wechat_warning(
    message,
    message_prefix=None,
    rate_limit=None,
    url=None,
    user_phone=None,
    all_users: bool = None,
):
    """Send a text alert through a WeChat Work webhook.

    :param message: alert text; nothing is sent when it is empty
    :param message_prefix: accepted for interface compatibility; not used here
    :param rate_limit: accepted for interface compatibility; not used here
    :param url: webhook URL, defaults to ``WECHAT_WARNING_URL``
    :param user_phone: phone number or list of numbers to mention, defaults
        to ``WECHAT_WARNING_PHONE``
    :param all_users: mention everyone when ``True``; defaults to
        ``WECHAT_WARNING_ALL``
    :return: ``True`` when the webhook answers with ``errcode`` 0, ``False``
        when sending fails, ``None`` when ``url`` or ``message`` is missing
    """
    # Settings are resolved at call time so the latest configuration is used.
    url = url or WECHAT_WARNING_URL
    user_phone = user_phone or WECHAT_WARNING_PHONE
    all_users = all_users if all_users is not None else WECHAT_WARNING_ALL

    if isinstance(user_phone, str):
        user_phone = [user_phone] if user_phone else []

    if all_users is True or not user_phone:
        user_phone = ["@all"]

    if not all([url, message]):
        return None

    data = {
        "msgtype": "text",
        "text": {"content": message, "mentioned_mobile_list": user_phone},
    }
    headers = {"Content-Type": "application/json"}

    try:
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(data).encode("utf8"),
            timeout=_WECHAT_REQUEST_TIMEOUT,
        )
        try:
            result = response.json()
        finally:
            # Release the connection even when the body is not valid JSON.
            response.close()

        if result.get("errcode") == 0:
            return True
        error = result.get("errmsg")
    except Exception as e:
        # Alerting is best-effort: a failure is logged, never propagated.
        error = e

    log.error("报警发送失败。 报警内容 {}, error: {}".format(message, error))
    return False


class JyBasicException(Exception):
    """Base exception carrying an error code, a reason and extra details.

    Extra keyword arguments are stored in ``err_details`` and also set as
    attributes on the instance.
    """

    def __init__(self, code: int, reason: str, **kwargs):
        # Populate Exception.args so str()/repr() of the error carry the
        # code and reason instead of being empty.
        super().__init__(code, reason)
        self.code = code
        self.reason = reason
        self.err_details = kwargs
        for key, val in kwargs.items():
            setattr(self, key, val)


class CustomCheckError(JyBasicException):
    """Raised when a pre-request check fails (default code 10002)."""

    def __init__(self, code: int = 10002, reason: str = '特征条件检查异常', **kwargs):
        super().__init__(code, reason, **kwargs)


class CheckPrePareRequest:
    """Callable that validates a crawled row before it is requested."""

    def __init__(self):
        # Keywords searched for in the title; a title that contains none of
        # them is rejected by check_crawl_title.
        self.crawl_keywords = {
            '招标', '流标', '评标', '询价', '中标候选人', '抽签', '谈判', '中选', '意见征询',
            '更正公告', '废标', '补遗', '议价', '邀请', '资格预审', '竞标', '变更', '遴选',
            '磋商', '项目', '评审', '询比', '开标', '澄清', '比选', '中止', '采购', '竟价',
            '招投标', '拟建', '成交', '中标', '竞争性谈判', '工程', '验收公告', '更正',
            '单一来源', '变更公告', '合同', '违规', '评判', '监理', '竞价', '答疑', '终止',
            '系统'
        }

    @staticmethod
    def check_es_cache(title: str, publish_time: int, rows: dict):
        """Placeholder for the ES duplicate check; currently does nothing.

        :param title: title
        :param publish_time: publish timestamp (l_np_publishtime)
        :param rows: crawled content
        """
        pass
        # Disabled ES lookup, kept for reference:
        # retrieved_result = es_query(title, publish_time)
        # if retrieved_result != 0:
        #     rows['count'] = retrieved_result
        #     raise CustomCheckError(code=10105, reason='标题内容已存在es')

    def check_crawl_title(self, title: str):
        """Raise ``CustomCheckError`` (code 10106) when no keyword matches.

        Each keyword is applied to ``title`` with ``re.search``.

        :param title: title to check
        """
        if not any(re.search(keyword, title) for keyword in self.crawl_keywords):
            raise CustomCheckError(code=10106, reason='标题未检索到采集关键词', title=title)

    def __check(self, rows: dict):
        """Run every check against ``rows``, in order."""
        title, publish_time = rows['title'], rows['l_np_publishtime']
        self.check_crawl_title(title)
        self.check_es_cache(title, publish_time, rows)

    def __call__(self, rows: dict, *args, **kwargs):
        """Validate ``rows``; raises ``CustomCheckError`` when a check fails."""
        self.__check(rows)