import io
import operator
import re
import zlib
from html import unescape
from urllib.parse import urlencode, urljoin

import tldextract
from bs4 import BeautifulSoup
from lxml.html import etree, HtmlElement, fromstring, tostring
from urllib3 import get_host

from common.log import logger
from crawler.defaults import (
    USELESS_TAG,
    USELESS_ATTR,
    TAGS_CAN_BE_REMOVE_IF_EMPTY,
    VALID_WORDS,
    VOID_WORDS,
    PAGE_TEXT_CHECK_WORDS,
    PAGE_TEXT_FILTER_WORDS,
)
from predict_bidding_model import exists_ztb

def err_details(worker):
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker returned an exception: {}".format(worker_exception))
    return worker

def split_domain(val: str):
    # Values starting with a digit (e.g. an IPv4 host) are returned as-is;
    # anything else is split on '.', ':' and '\'.
    if re.match(r'\d+', val) is None:
        return re.split(r'[\\.:]', val)
    return [val]
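
# Illustrative doctest-style examples for split_domain (inputs are made up;
# outputs follow from the behaviour above):
# >>> split_domain('forums.bbc.co.uk')
# ['forums', 'bbc', 'co', 'uk']
# >>> split_domain('192.168.3.207')
# ['192.168.3.207']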

def extract_host(url):
    """
    >>> extract_host('http://192.168.3.207:8080/list')
    'http://192.168.3.207:8080/'
    """
    _s, _h, _p = get_host(url)
    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"

def extract_domain(url):
    """
    Extract the registered (first-level) domain, joining the domain and
    suffix fields with a dot. If the given host is an IPv4 address, the
    IPv4 address is returned instead.

    >>> extract_domain('http://192.168.3.207:8080/')
    '192.168.3.207'
    >>> extract_domain('http://forums.bbc.co.uk')
    'bbc.co.uk'
    """
    ext = tldextract.extract(url)
    return ext.registered_domain or ext.ipv4

def extract_fqdn(url):
    """Return the fully qualified domain name (or the IPv4 address)."""
    ext = tldextract.extract(url)
    return ext.fqdn or ext.ipv4
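
# Illustrative example for extract_fqdn (input is made up):
# >>> extract_fqdn('http://forums.bbc.co.uk/path')
# 'forums.bbc.co.uk'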

def extract_page_title(source):
    nodes = []
    try:
        element = html2element(source)
        nodes = element.xpath('/html/head/title/text()|//title/text()')
    except etree.ParserError:
        pass
    if len(nodes) > 1:
        # Multiple title nodes: join them with ';' and drop all whitespace.
        return "".join(";".join(nodes).split())
    return "".join("".join(nodes).split())

def is_url(url):
    """Check whether *url* looks like a well-formed URL."""
    _regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    return re.match(_regex, url) is not None
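
# Illustrative examples for is_url (inputs are made up):
# >>> is_url('https://example.com/path?q=1')
# True
# >>> is_url('example.com')
# False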

def is_contains(val: str, feature: str):
    return operator.contains(val, feature)

def is_domain(domain):
    ext = tldextract.extract(domain)
    return bool(ext.domain)

def label_split(val):
    # '~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
    # The '-' between '|' and '「' is escaped: unescaped it would create an
    # unintended character range inside the class.
    result = re.split(r'[- _,,\\.|\-「」【】??!!/、] *', val)
    result = [v for v in result if len(v) > 0]
    return result
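
# Illustrative example for label_split (input is made up):
# >>> label_split('招标-公告_2024')
# ['招标', '公告', '2024']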

def join_url(url: str, parameters: dict):
    """
    Join a URL with its query parameters.

    :param url: the base URL
    :param parameters: the query parameters
    :return: the joined URL
    """
    _data = '?' + urlencode(parameters)
    return urljoin(url, _data)
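
# Illustrative example for join_url (input is made up):
# >>> join_url('http://example.com/list', {'page': 1, 'size': 20})
# 'http://example.com/list?page=1&size=20'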

def extract_text(source: str):
    soup = BeautifulSoup(source, "lxml")
    return soup.get_text()

def verify_text(val: str, length=50):
    """Count digits, letters and Chinese characters to validate the text."""
    if val is None:
        return False
    sub_pattern = [r'<[^>]+>', r'[^0-9a-zA-Z\u4e00-\u9fa5]+']
    for pattern in sub_pattern:
        val = re.sub(pattern, '', val)
    # If the remaining text is shorter than *length*, the page carries no
    # meaningful detail content.
    if len(val) < length:
        # invalid text
        return False
    # valid text
    return True
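
# Illustrative example for verify_text (input is made up; 'short' is far
# below the default length of 50):
# >>> verify_text('<p>short</p>')
# False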

def clean_whitespace(text: str):
    r"""Strip whitespace characters: \n (newline), \r (carriage return),
    \v (vertical tab) and \f (form feed)."""
    obj = io.StringIO()
    for i in text:
        # Keep ' ' and '\t' so that tag names stay separated from attributes.
        if i not in '\n\r\v\f':
            obj.write(i)
    return obj.getvalue()
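
# Illustrative example for clean_whitespace (input is made up; the tab
# survives, the newline does not):
# >>> clean_whitespace('a\nb\tc')
# 'ab\tc'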

def clean_html(source):
    html_str = re.sub('\ufeff|\xa0|\u3000|\x00', '', source)
    html_str = re.sub(r'<!--[\s\S]*?-->', '', html_str)  # strip comments
    html_str = re.sub(r'<style[^<>]*>[\s\S]*?</style>', '', html_str)  # strip styles
    html_str = re.sub(r'<script[^<>]*>[\s\S]*?</script>', '', html_str)  # strip scripts
    html_str = re.sub(r'</?br.*?>', '', html_str)
    html_str = re.sub(r'<\?xml.*?>', '', html_str)
    html_str = re.sub(r'<[!]DOCTYPE.*?>', '', html_str)
    html_str = clean_whitespace(html_str)
    return html_str
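
# Illustrative example for clean_html (input is made up; the comment and the
# trailing newline are removed):
# >>> clean_html('<div><!-- ad -->text</div>\n')
# '<div>text</div>'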

def html2element(source: str, base_url=None) -> HtmlElement:
    html_str = clean_html(source)
    if len(html_str) == 0:
        # Cleaning junk tags may leave the page empty, which would make
        # fromstring raise lxml.etree.ParserError, so fall back to a stub.
        html_str = '''<html lang="en"></html>'''
    return fromstring(html_str, base_url=base_url)

def element2html(element: HtmlElement) -> str:
    return unescape(tostring(element, encoding="utf-8").decode())
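
# Illustrative round trip between html2element and element2html (input is
# made up):
# >>> element2html(html2element('<div><p>text</p></div>'))
# '<div><p>text</p></div>'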

def iter_node(element: HtmlElement, depth=1):
    """Depth-first traversal yielding (node, depth) pairs."""
    yield element, depth
    depth += 1
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element, depth)
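
# Illustrative traversal with iter_node (input is made up):
# >>> [(n.tag, d) for n, d in iter_node(html2element('<div><p>a</p></div>'))]
# [('div', 1), ('p', 2)]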

def remove_node(node: HtmlElement):
    """
    Remove the node and its subtree. This is an in-place operation;
    nothing needs to be returned.
    :param node:
    :return:
    """
    parent = node.getparent()
    if parent is not None:
        # drop_tree preserves the tail text; parent.remove(node) would not.
        node.drop_tree()

def drop_tag(node: HtmlElement):
    """
    Delete only the tag itself, merging its text into the parent.
    :param node:
    :return:
    """
    parent = node.getparent()
    if parent is not None:
        node.drop_tag()

def is_empty_element(node: HtmlElement):
    return not node.getchildren() and not node.text

def normalize_node(element: HtmlElement):
    etree.strip_elements(element, *USELESS_TAG, with_tail=False)
    # Node pre-processing. When removing a node and updating a node happen in
    # the same loop, the update does not take effect (reason unclear), so two
    # passes are used.
    # First pass: merge empty nodes and drop noise nodes.
    for node, _ in iter_node(element):
        if node.tag.lower() in TAGS_CAN_BE_REMOVE_IF_EMPTY and is_empty_element(node):
            remove_node(node)
        if node.tag.lower() == 'p':
            etree.strip_tags(node, 'span')
            etree.strip_tags(node, 'strong')
        # If a div tag does not contain any sub node, it can be converted
        # to a p node.
        if node.tag.lower() == 'div' and not node.getchildren():
            node.tag = 'p'
        if node.tag.lower() == 'span' and not node.getchildren():
            node.tag = 'p'
        # Remove empty p tags.
        if node.tag.lower() == 'p' and not node.xpath('.//img'):
            if not (node.text and node.text.strip()):
                drop_tag(node)
        # Delete inline styles.
        style = node.get('style')
        if style:
            del node.attrib['style']
        # Obsolete scrolling element.
        if node.tag.lower() == 'marquee':
            remove_node(node)
    # Second pass: remove nodes whose id/class exactly matches a noise
    # attribute value.
    for node, _ in iter_node(element):
        attr = (node.get('id') or node.get('class'))
        if attr and attr.lower() in USELESS_ATTR:
            remove_node(node)

def pre_parse(element):
    normalize_node(element)
    return element

def check_text_by_words(val: str):
    """Reject text containing a VOID word; accept text containing a VALID word."""
    for word in VOID_WORDS:
        if re.search(word, val) is not None:
            return False
    for keyword in VALID_WORDS:
        if re.search(keyword, val) is not None:
            return True
    return False

def check_page_by_words(val: str):
    """Check an 8-99 character snippet against the page filter/check word lists."""
    if 7 < len(val) < 100:
        for word in PAGE_TEXT_FILTER_WORDS:
            if re.search(word, val) is not None:
                return False
        for keyword in PAGE_TEXT_CHECK_WORDS:
            if re.search(keyword, val) is not None:
                return True
    return False

def predict_bidding_model(item: dict):
    result = {**item}
    predict_result = exists_ztb(item)
    predict = any(predict_result.values())
    result['predict'] = int(predict)
    return result

def compress_str(content, level=9):
    """Compress a string with zlib at the given compression level."""
    return zlib.compress(content.encode(encoding='utf-8'), level=level)
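
# Illustrative round trip for compress_str (decompression shown directly
# with zlib; input is made up):
# >>> zlib.decompress(compress_str('hello')).decode('utf-8')
# 'hello'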