import re
from html import unescape
from urllib.parse import urlencode, urljoin
from bs4 import BeautifulSoup
from lxml.html import etree, HtmlElement, fromstring, tostring
from urllib3 import get_host
from common.log import logger
from crawler.defaults import (
USELESS_TAG,
USELESS_ATTR,
TAGS_CAN_BE_REMOVE_IF_EMPTY,
KEYWORDS,
)
def err_details(worker):
worker_exception = worker.exception()
if worker_exception:
        logger.exception("Worker returned exception: {}".format(worker_exception))
return worker
def extract_host(url):
"""
# >>> base_url = extract_host('http://192.168.3.207:8080/')
"""
_s, _h, _p = get_host(url)
return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"
def split_domain(val: str):
    # Values starting with a digit (e.g. IP addresses) are kept whole;
    # anything else is split on dots and colons.
    if re.match(r'\d+', val) is None:
        return re.split(r'[.:]', val)
    return [val]
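# Example: values that do not start with a digit are split into labels,
# while IP-like values are kept whole.
# >>> split_domain('www.example.com:8080')
# ['www', 'example', 'com', '8080']
# >>> split_domain('192.168.3.207')
# ['192.168.3.207']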
def extract_domain(url):
"""
# >>> base_url = extract_domain('http://192.168.3.207:8080/')
"""
_, host, port = get_host(url)
return f"{host}" if port is None else f"{host}:{port}"
def extract_page_title(source):
    """Extract the <title> text; multiple titles are joined with ';' and all whitespace removed."""
    element = html2element(source)
nodes = element.xpath('/html/head/title/text()|//title/text()')
if len(nodes) > 1:
return "".join(";".join(nodes).split())
return "".join("".join(nodes).split())
def is_url(url):
"""判断url格式畸形与否"""
_regex = re.compile(
r'^(?:http|ftp)s?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
return re.match(_regex, url) is not None
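# Example: a scheme and host are required; bare domains are rejected.
# >>> is_url('https://example.com/path?q=1')
# True
# >>> is_url('http://localhost:8080')
# True
# >>> is_url('example.com')
# False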
def is_domain(domain):
    """Check whether a string looks like a domain name or IP address, optionally with a port."""
    _regex = re.compile(
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?', re.IGNORECASE)
return re.match(_regex, domain) is not None
def label_split(val):
    # Separators considered: '~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
    # '-' is kept at the head of the character class so it is literal, not a range.
    result = re.split(r'[- _,,\\.|「」【】??!!/、] *', val)
result = [v for v in result if len(v) > 0]
return result
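# Example: labels split on the mixed ASCII/CJK separators above, empties dropped.
# >>> label_split('招标公告-2024年/设备采购')
# ['招标公告', '2024年', '设备采购']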
def get_url(url: str, parameters: dict):
"""
拼接url与所带参数
:param url: 链接
:param parameters: 参数
:return: 拼接后的url
"""
_data = '?' + urlencode(parameters)
return urljoin(url, _data)
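# Example: urljoin resolves the bare query string against the base URL.
# >>> get_url('http://example.com/list', {'page': 2, 'size': 10})
# 'http://example.com/list?page=2&size=10'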
def clean_html(source: str):
    # Strip non-content markup from raw HTML.
    # NOTE: the exact tag patterns here are assumed (the usual noise:
    # comments, scripts, styles, CDATA sections, and <br> tags).
    html_str = re.sub(r'<!--[\s\S]*?-->', '', source)                    # HTML comments
    html_str = re.sub(r'<script[^>]*>[\s\S]*?</script>', '', html_str)   # script blocks
    html_str = re.sub(r'</script>|<script[^>]*>|<script>', '', html_str) # stray script tags
    html_str = re.sub(r'<style[^>]*>[\s\S]*?</style>', '', html_str)     # style blocks
    html_str = re.sub(r'<style>|</style>', '', html_str)                 # stray style tags
    html_str = re.sub(r'<!\[CDATA\[[\s\S]*?\]\]>', '', html_str)         # CDATA sections
    html_str = re.sub(r'</?br[^>]*>', '', html_str)                      # <br> tags
    return html_str
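# Example with the patterns above: content tags survive, noise does not.
# >>> clean_html('<div><script>alert(1)</script><p>hi<br/></p><!-- x --></div>')
# '<div><p>hi</p></div>'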
def extract_text(source: str):
soup = BeautifulSoup(source, "lxml")
return soup.get_text()
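# Example: BeautifulSoup flattens the document to its text content.
# >>> extract_text('<html><body><p>Hello</p><p>World</p></body></html>')
# 'HelloWorld'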
def verify_text(val: str, length=50):
    """Check the amount of digits, letters, and Chinese characters in the text."""
    if val is None:
        return False
    sub_pattern = ['<[^>]+>', '[^0-9a-zA-Z\u4e00-\u9fa5]+']
    for pattern in sub_pattern:
        val = re.sub(pattern, '', val)
    # If the remaining text is shorter than `length`, the page has no real detail content.
    if len(val) < length:
        # invalid text
        return False
    # valid text
    return True
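# Example: tags and punctuation are stripped before the length check.
# >>> verify_text('<p>short</p>')
# False
# >>> verify_text('x' * 50)
# True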
def element2html(element: HtmlElement) -> str:
return unescape(tostring(element, encoding="utf-8").decode())
def html2element(source: str, base_url=None) -> HtmlElement:
    html_str = re.sub('\ufeff|\xa0|\u3000|\x00', '', source)
    html_str = re.sub(r'<!--[\s\S]*?-->', '', html_str)  # strip comments
    html_str = re.sub(r'<style[\s\S]*?</style>', '', html_str)  # strip styles
    html_str = re.sub(r'<script[\s\S]*?</script>', '', html_str)  # strip js
    html_str = re.sub(r'</?br.*?>', '', html_str)
    html_str = re.sub(r'<\?xml.*?>', '', html_str)
    html_str = re.sub(r'<[!]DOCTYPE.*?>', '', html_str)
    return fromstring(html_str, base_url=base_url)
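# Example: the cleaned source parses directly into an lxml HtmlElement.
# >>> element = html2element('<html><body><p>hi</p></body></html>')
# >>> element.xpath('//p/text()')
# ['hi']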
def iter_node(element: HtmlElement, depth=1):
yield element, depth
depth += 1
for sub_element in element:
if isinstance(sub_element, HtmlElement):
yield from iter_node(sub_element, depth)
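# Example: a depth-first walk yielding (node, depth) pairs, the root at depth 1
# (lxml's fromstring returns the fragment root for a single-element snippet):
# >>> root = html2element('<div><p><span>x</span></p></div>')
# >>> [(n.tag, d) for n, d in iter_node(root)]
# [('div', 1), ('p', 2), ('span', 3)]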
def remove_node(node: HtmlElement):
"""
this is a in-place operation, not necessary to return
:param node:
:return:
"""
parent = node.getparent()
if parent is not None:
node.drop_tree()
# parent.remove(node)
def drop_tag(node: HtmlElement):
"""
only delete the tag, but merge its text to parent.
:param node:
:return:
"""
parent = node.getparent()
if parent is not None:
node.drop_tag()
def is_empty_element(node: HtmlElement):
return not node.getchildren() and not node.text
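# Example contrasting the helpers: drop_tag() keeps the children and text and
# removes only the wrapper, while remove_node() would drop the whole subtree.
# >>> root = fromstring('<div><b>bold</b><i></i></div>')
# >>> is_empty_element(root.find('i'))
# True
# >>> drop_tag(root.find('b'))
# >>> element2html(root)
# '<div>bold<i></i></div>'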
def normalize_node(element: HtmlElement):
etree.strip_elements(element, *USELESS_TAG, with_tail=False)
    # Node preprocessing. When node removal and node updates happen in the same
    # loop pass, the updates may not take effect (reason unclear).
    # Merge empty nodes and strip noise nodes.
for node, _ in iter_node(element):
if node.tag.lower() in TAGS_CAN_BE_REMOVE_IF_EMPTY and is_empty_element(node):
remove_node(node)
if node.tag.lower() == 'p':
etree.strip_tags(node, 'span')
etree.strip_tags(node, 'strong')
        # a div tag without any child nodes can be converted to a p node.
if node.tag.lower() == 'div' and not node.getchildren():
node.tag = 'p'
if node.tag.lower() == 'span' and not node.getchildren():
node.tag = 'p'
# remove empty p tag
if node.tag.lower() == 'p' and not node.xpath('.//img'):
if not (node.text and node.text.strip()):
drop_tag(node)
# Delete inline styles
style = node.get('style')
if style:
del node.attrib['style']
    # Remove nodes whose id/class exactly matches a noise attribute.
    # Collect first, then remove, so the iterator never walks a mutated tree.
    noise_nodes = []
    for node, _ in iter_node(element):
        attr = (node.get('id') or node.get('class'))
        if attr and attr.lower() in USELESS_ATTR:
            noise_nodes.append(node)
    for node in noise_nodes:
        remove_node(node)
    # # Remove invalid nodes (fuzzy match)
# for node, _ in iter_node(element):
# attrib = (node.get('id') or node.get('class'))
# if attrib:
# for attr in USELESS_ATTR:
# if re.match(attr, attrib.lower()) is not None:
# remove_node(node)
# break
def pre_parse(element):
normalize_node(element)
return element
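# Typical pipeline (raw_html is a placeholder for a fetched page source):
# >>> element = html2element(raw_html)
# >>> element = pre_parse(element)  # normalizes the tree in place
# >>> cleaned = element2html(element)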
def is_title(val: str):
    """Return True if the text matches any configured title keyword."""
for keyword in KEYWORDS:
search = re.search(keyword, val)
if search is not None:
return True
return False
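# Hedged example: KEYWORDS comes from crawler.defaults and is project-specific;
# assuming it contains a pattern such as '招标':
# >>> is_title('某单位设备招标公告')
# True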