123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- import hashlib
- import json
- import re
- from collections import namedtuple
- import requests
- from setting import WECHAT_WARNING_URL,WECHAT_WARNING_PHONE,WARNING_INTERVAL,WECHAT_WARNING_ALL
- import bson
- from feapder.utils.log import log
- from feapder.db.mongodb import MongoDB
- from .cleaner import cleaner
- import sys
# Lightweight result record with a single field, ``total``.
SearchText = namedtuple("SearchText", ("total",))
def substitute(html_str, special=None, completely=False):
    """Clean an HTML string by delegating to ``cleaner``.

    :param html_str: HTML text to clean
    :param special: forwarded to ``cleaner`` unchanged (its meaning is
        defined by ``cleaner``)
    :param completely: forwarded to ``cleaner`` unchanged (its meaning is
        defined by ``cleaner``)
    :return: whatever ``cleaner`` returns for the given input
    """
    # Forward the caller's options; previously they were accepted but
    # replaced by hard-coded None / False, so they had no effect.
    return cleaner(html=html_str, special=special, completely=completely)
def get_signature(content: str) -> str:
    """Return the SHA-1 digest of a text as a hexadecimal string.

    @param content: text to hash; it is encoded as UTF-8 before hashing
    @return: hexadecimal digest string
    """
    encoded = content.encode("utf-8")
    return hashlib.sha1(encoded).hexdigest()
def text_search(content: str) -> SearchText:
    """Count the Chinese characters contained in a text.

    :param content: text to scan
    :return: ``SearchText`` whose ``total`` is the number of characters in
        the range U+4E00..U+9FA5 found in ``content`` (0 for empty input)
    """
    # Every regex hit is exactly one character, so the number of hits is
    # the number of Chinese characters.
    hits = re.findall('[\u4e00-\u9fa5]', content, re.S) if content else []
    return SearchText(len(hits))
def int2long(param: int):
    """Wrap an integer in a BSON ``Int64``."""
    long_value = bson.int64.Int64(param)
    return long_value
def get_spiders(menus):
    """Filter ``menus`` in place according to the spider state in MongoDB.

    For each menu the ``luaconfig`` collection of the ``editor`` database is
    queried by ``menu.code``. A menu is dropped when a record is found and
    the first record's ``state`` is not 11; menus with no record are kept.

    :param menus: mutable list of objects exposing a ``code`` attribute;
        it is modified in place
    :return: None
    """
    # NOTE(review): the meaning of state 11 is not visible in this file —
    # presumably "enabled/online"; confirm against the luaconfig schema.
    db = MongoDB(db="editor")
    kept = []
    for menu in menus:
        spider_info = db.find('luaconfig', {"code": menu.code})
        if spider_info and spider_info[0].get("state") != 11:
            continue
        kept.append(menu)
    # Replace the contents in one step. Removing items from the list while
    # iterating over it (as before) skipped the element after every removal.
    menus[:] = kept
def wechat_warning(
    message,
    message_prefix=None,
    rate_limit=None,
    url=None,
    user_phone=None,
    all_users: bool = None,
):
    """Send a text alert to a WeCom (Enterprise WeChat) webhook.

    :param message: alert text; nothing is sent when it is empty
    :param message_prefix: accepted for interface compatibility; not used
        by this implementation
    :param rate_limit: accepted for interface compatibility; not used by
        this implementation (no rate limiting is applied here)
    :param url: webhook URL, defaults to ``WECHAT_WARNING_URL``
    :param user_phone: phone number (str) or list of phone numbers to
        mention, defaults to ``WECHAT_WARNING_PHONE``
    :param all_users: mention everyone when True, defaults to
        ``WECHAT_WARNING_ALL``
    :return: True when the webhook answers with ``errcode`` 0, False when
        sending fails, None when ``url`` or ``message`` is empty
    """
    # Fall back to the configured defaults for anything the caller omitted.
    url = url or WECHAT_WARNING_URL
    user_phone = user_phone or WECHAT_WARNING_PHONE
    all_users = all_users if all_users is not None else WECHAT_WARNING_ALL

    if isinstance(user_phone, str):
        user_phone = [user_phone] if user_phone else []

    # With no explicit recipients (or when requested) mention everybody.
    if all_users is True or not user_phone:
        user_phone = ["@all"]

    if not all([url, message]):
        return

    data = {
        "msgtype": "text",
        "text": {"content": message, "mentioned_mobile_list": user_phone},
    }
    headers = {"Content-Type": "application/json"}

    try:
        # The timeout keeps a stalled webhook from blocking the caller
        # forever; the context manager closes the response even when the
        # body is not valid JSON.
        with requests.post(
            url,
            headers=headers,
            data=json.dumps(data).encode("utf8"),
            timeout=10,
        ) as response:
            result = response.json()

        if result.get("errcode") == 0:
            return True
        raise Exception(result.get("errmsg"))
    except Exception as e:
        # Best effort: an alerting failure is logged, never propagated.
        log.error("报警发送失败。 报警内容 {}, error: {}".format(message, e))
        return False
class JyBasicException(Exception):
    """Base exception carrying a numeric code, a reason and extra details.

    Attributes set on the instance:
        code: numeric error code
        reason: human-readable reason
        err_details: dict of all extra keyword arguments
    Every extra keyword argument is additionally exposed as an attribute
    of the same name.
    """

    def __init__(self, code: int, reason: str, **kwargs):
        # Initialise the Exception base so ``args`` / ``str()`` carry the
        # code and reason even when they were passed as keyword arguments.
        super().__init__(code, reason)
        self.code = code
        self.reason = reason
        self.err_details = kwargs
        for key, val in kwargs.items():
            setattr(self, key, val)


class CustomCheckError(JyBasicException):
    """Raised when a feature/condition check fails (default code 10002)."""

    def __init__(self, code: int = 10002, reason: str = '特征条件检查异常', **kwargs):
        super().__init__(code, reason, **kwargs)


class HtmlEmptyError(JyBasicException):
    """Raised when the extracted page body is empty (default code 10002)."""

    # NOTE(review): shares default code 10002 with CustomCheckError —
    # confirm whether that is intentional.
    def __init__(self, code: int = 10002, reason: str = '正文获取异常,正文为空', **kwargs):
        super().__init__(code, reason, **kwargs)
class CheckPrePareRequest:
    """Pre-request check that a title contains at least one crawl keyword.

    Calling an instance with a row dict returns a ``(code, message)`` tuple:
    ``(200, 'ok')`` when the title matches, otherwise
    ``(10106, '标题未检索到采集关键词')``.
    """

    def __init__(self):
        # Keywords are used as regular-expression patterns by
        # check_crawl_title.
        self.crawl_keywords = {
            '招标', '流标', '评标', '询价', '中标候选人', '抽签', '谈判', '中选', '意见征询',
            '更正公告', '废标', '补遗', '议价', '邀请', '资格预审', '竞标', '变更', '遴选',
            '磋商', '项目', '评审', '询比', '开标', '澄清', '比选', '中止', '采购', '竟价',
            '招投标', '拟建', '成交', '中标', '竞争性谈判', '工程', '验收公告', '更正',
            '单一来源', '变更公告', '合同', '违规', '评判', '监理', '竞价', '答疑',
            '终止', '系统'
        }

    def check_crawl_title(self, title: str):
        """Return ``(200, 'ok')`` if any keyword occurs in ``title``.

        A status tuple ``(10106, ...)`` is returned instead of raising when
        no keyword is found.
        """
        if any(re.search(keyword, title) is not None
               for keyword in self.crawl_keywords):
            return 200, 'ok'
        return 10106, '标题未检索到采集关键词'

    def __check(self, rows: dict):
        """Run all checks on ``rows`` and return the resulting status tuple.

        :raises KeyError: when ``title`` or ``l_np_publishtime`` is missing
        """
        # publish_time is read but not checked yet; the lookup is kept so a
        # row without the key still fails loudly as before.
        title, publish_time = rows['title'], rows['l_np_publishtime']
        # Propagate the result; it used to be discarded, which made the
        # whole check a no-op.
        return self.check_crawl_title(title)

    def __call__(self, rows: dict, *args, **kwargs):
        """Check ``rows`` and return the ``(code, message)`` status tuple."""
        return self.__check(rows)
def get_proxy():
    """Fetch a proxy from the proxy service and return its ``http`` entry.

    :return: value of ``data.http`` in the service's JSON response
    :raises AttributeError: when the response carries no ``data`` object
    """
    # NOTE(review): the Basic-auth credential is hard-coded in source;
    # consider moving it to configuration / a secret store.
    headers = {
        "Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    }
    # The timeout keeps an unresponsive proxy service from hanging the
    # crawler; the context manager releases the connection afterwards.
    with requests.get(
        "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
        headers=headers,
        timeout=10,
    ) as response:
        proxy = response.json()
    print(f"切换代理:{proxy.get('data')}")
    return proxy.get("data").get("http")
- import json
class Obj:
    """Plain object whose attributes are populated from a mapping."""

    def __init__(self, dict_):
        # Each key of ``dict_`` becomes an instance attribute.
        vars(self).update(dict_)
def get_argvs():
    """Build a settings object from ``--key=value`` command-line arguments.

    Starts from the defaults ``next_page=False`` and ``max_page=10``. Every
    argument is printed; arguments starting with ``--`` override or add a
    key. The text after the last ``=`` is converted with ``int()``, so a
    non-integer value raises ``ValueError``.

    :return: ``Obj`` exposing each option as an attribute
    """
    options = {"next_page": False, "max_page": 10}
    for arg in sys.argv[1:]:
        print(arg)
        if not arg.startswith("--"):
            continue
        # Key: the text before the first "=" with every "--" removed.
        name = arg.split('=')[0].replace("--", "")
        options[name] = int(arg.rpartition('=')[2])
    # The JSON round trip turns the dict into an Obj via object_hook.
    serialized = json.dumps(options)
    return json.loads(serialized, object_hook=Obj)
def search(pattern, string):
    """Return the first captured group of the first match, or None.

    :param pattern: regular expression; when it defines no capture group
        the whole match is returned instead
    :param string: text to search
    :return: the first group's text (None if that group did not
        participate in the match), the whole match for a group-less
        pattern, or None when nothing matches
    """
    match = re.search(pattern, string)
    if match is None:
        return None
    # A pattern without groups used to raise IndexError on every match.
    return match.group(1) if match.re.groups else match.group(0)
def search_construction(string, pattern='pattern'):
    """Search ``string`` and return the first captured group, or None.

    :param string: text to search
    :param pattern: regular expression to apply. The default is the literal
        text ``'pattern'`` that was hard-coded here.
    :return: the first group's text, the whole match when the pattern has
        no capture group, or None when nothing matches
    """
    # NOTE(review): the default 'pattern' looks like a placeholder for the
    # real regex — confirm and replace it.
    match = re.search(pattern, string)
    if match is None:
        return None
    # The group-less default used to raise IndexError on every match.
    return match.group(1) if match.re.groups else match.group(0)
def search_floor(string, pattern='pattern'):
    """Search ``string`` and return the first captured group, or None.

    :param string: text to search
    :param pattern: regular expression to apply. The default is the literal
        text ``'pattern'`` that was hard-coded here.
    :return: the first group's text, the whole match when the pattern has
        no capture group, or None when nothing matches
    """
    # NOTE(review): the default 'pattern' looks like a placeholder for the
    # real regex — confirm and replace it.
    match = re.search(pattern, string)
    if match is None:
        return None
    # The group-less default used to raise IndexError on every match.
    return match.group(1) if match.re.groups else match.group(0)
|