import random
import time
import re
import requests
from lxml.html import fromstring, HtmlElement, tostring
from lxml.html.clean import Cleaner
from crawler.check_utils import CheckText, CheckTask
from crawler.clean_html import cleaner
from crawler.crawl_scheduler import Scheduler
from crawler.login import login, load_login_cookies, login_check
from utils.databases import mongo_table, int2long
from utils.execptions import YbwCrawlError
from utils.log import logger
def iter_node(element: HtmlElement):
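    """Yield the element itself, then recursively yield every HtmlElement descendant."""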
yield element
for sub_element in element:
if isinstance(sub_element, HtmlElement):
yield from iter_node(sub_element)
def pre_parse(element: HtmlElement):
"""对 HTML 进行预处理可能会破坏 HTML 原有的结构,导致根据原始 HTML 编写的 XPath 不可用"""
pre_remove = {
'log_col2', 'log_col1', 'cz', 'iconfont closei', 'p2 p1', 'cnxh_b',
'msg_error', 'r_gg TB-focus', 'april', 'cont2', 'to_login', 'regtxt',
'shouchang an_n sc', 'april_title red', 'cn_lt', 'dayin an_n',
'dl_zc vip_t free_member', 'rmbq', 'login-form cl', 'dian_g fr',
'di_n', 'd_fx', 'd_tub', 'd_dy', 'anniu1', 'cnxh_list', 'btns cl',
'active', 'close', 'd_an fr', 'avatar', 'toolbar', 'deng_l',
'cen_right fr', 'log_col5', 'agreement', 'log_col3',
'shouchang_af an_n sc_after', 'fast_box', 'di_nr fl', 'xgfj', 'dianh',
'cnxh_list tab_b2 city_list', 'contract cl', 'zb_cen_r fr', 'd_zsms',
'sc_after active', 'dl_k', 'ewm_b', 'fl', 'wypj', 'rukou', 'p1',
'dl_zc', 'success', 'daoh h_30', 'bd', 'april_content', 'print',
'foot', 'cnxh zbgg', 'april_first', 'fastlog', 'tx_mc user_name',
'tab_h2', 'fanding an_n', 'toux', 'log_col4 cl', 'hangy rem_1', 'red',
'regshadow', 'bottom', 'dl_zc vip_t fee_member', 'xszn fl', 'no-print',
'cnxh_b zbgg_b', 'rem rem_1', 'logshadowz', 'd_pj fl', 'tjgjc',
'spdujaiwlohh', 'di_ewm fr', 'dian_h fl',
'tab_h2 zbgg_b_gray', 'fanshou an_n fs', 'login-btn', 'fl gjc',
'agreeshadow', 'guang_db', 'footer_1', 'log_p', 'cnxh_list tab_b2',
'd_sw', 'april_close', 'd_sc', 'erweima no-print', 'qgzx', 'p2', 'sc',
'hd', 'log_col6', 'dh_b', 'dian_guang', 'zhu_c', 'ck cai_k', 'april_box',
'display:none'
}
for node in iter_node(element):
id_attr = node.attrib.get('id')
class_attr = node.attrib.get('class')
style_attr = node.attrib.get('style')
        # Drop any node whose id, class or style attribute value is in the removal set.
        if any(attr in pre_remove for attr in (id_attr, class_attr, style_attr)):
node.drop_tree()
return element
def page_source(element: HtmlElement):
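    """Serialize the element back to an HTML string with <style> blocks removed (forms are kept)."""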
clear = Cleaner(
forms=False,
style=True
)
return clear.clean_html(tostring(element, encoding="utf-8").decode())
class DetailSpider:
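    """Detail-page spider for www.chinabidding.cn (元博网): reads tasks from the crawl table, downloads and cleans each announcement, and writes the result to the save table."""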
def __init__(
self,
db: str,
crawl_tab: str,
save_tab: str,
):
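        # crawl_tab holds pending tasks, save_tab receives cleaned results, and "editor.source_url" records pages whose content is only reachable through an external link.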
self.crawl_tab = mongo_table(db, crawl_tab)
self.save_tab = mongo_table(db, save_tab)
self.save_url = mongo_table("editor", "source_url")
self.user = None
def _update_crawl_task(self, tid, **kwargs):
self.crawl_tab.update_one({'_id': tid}, {'$set': kwargs})
def _lock_task(self, task: dict):
update = {'crawl': True}
self._update_crawl_task(task['_id'], **update)
def _release_task(self, task: dict):
update = {'crawl': False}
self._update_crawl_task(task['_id'], **update)
def json_request(self, fid, request_params):
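        """Request the JSON detail endpoint (agency.info.Detail/show) for the given fid."""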
url = "https://www.chinabidding.cn/agency.info.Detail/show"
params = {
"fid": f"{fid}"
}
res = requests.get(url, params=params, **request_params)
return res
def crawl_request(self, item: dict):
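        """Fetch the detail page for a task, re-logging in when the session has expired; returns a Response, or None if all retries fail."""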
url = item['competehref']
headers = {
'Host': 'www.chinabidding.cn',
'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="97", "Chromium";v="97"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
request_params = {}
request_params.setdefault('headers', headers)
request_params.setdefault('timeout', 60)
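        # retries: overall attempts (max 3); retries_502: attempts where the expected page node is missing (max 15, after which the task is marked as removed).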
retries = 0
retries_502, max_retries_502 = 0, 15
        proxies = None
while retries < 3:
if retries_502 > max_retries_502:
                # The site has removed this record.
self._update_crawl_task(item['_id'], crawl_status='remove')
break
login_cookies = load_login_cookies(self.user.phone)
            if login_cookies is None:
                login(*self.user)
                continue
            # Attach (or refresh) the saved login cookies on the request parameters.
            request_params['cookies'] = login_cookies
fid = "".join(re.findall('\?fid=(.*)',url))
if fid:
try:
r = self.json_request(fid, request_params)
                    # Check whether the account is still logged in.
retry_login = login_check(self.user.phone, url, False)
if retry_login:
logger.info(f"[重新登录]{self.user.phone}")
_, code = login(*self.user, proxies=proxies)
if code == 200:
retries += 1
else:
time.sleep(1800)
retries += 1
continue
logger.info(f'[采集正文] fid_{fid}')
return r
                except Exception:
retries += 1
continue
else:
try:
r = requests.get(url, **request_params)
                    # Check whether the account is still logged in.
retry_login = login_check(self.user.phone, url, False)
if retry_login:
logger.info(f"[重新登录]{self.user.phone}")
_, code = login(*self.user, proxies=proxies)
if code == 200:
retries += 1
else:
time.sleep(1800)
retries += 1
continue
element = fromstring(r.text)
nodes = element.xpath('//*[@id="main_dom"]/div[1]')
if len(nodes) != 1:
retries_502 += 1
logger.debug(f'"main_dom"属性匹配个数:{len(nodes)}, {r.status_code} - {url}')
continue
else:
node = nodes[0]
logger.info(f'[采集正文] id={node.attrib.get("id")}')
return r
except requests.RequestException:
retries += 1
continue
return None
def crawl_response(self, response, item):
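        """Extract the announcement body from a JSON or HTML response, run the content checks, clean it, and insert the record into save_tab."""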
        if re.match(r'^\{', response.text):
html = response.json().get('c_info').get('content')
else:
element: HtmlElement = fromstring(response.text)
node = element.xpath('//*[@id="infoDescription"]')[0]
node = pre_parse(node)
            # Candidate XPath expressions for the announcement body, tried in order.
            features = (
                './div[@class="ckgys_cont"]',
                './/div[@class="detail-title ng-scope"]',
                './/table[@class="detail_Table"]',
            )
for feature in features:
extract_node = node.xpath(feature)
if len(extract_node) > 0:
valid_node = extract_node[0]
break
else:
valid_node = node
html = page_source(valid_node)
        # Check the raw page content for a "点击查看内容" (click to view content) marker and record such pages.
        source_url = re.search(r'(.*)点击查看内容', html)
if source_url:
self.save_url.insert_one({
"site": "元博网",
"title": item['title'],
"source_url": source_url.group(1),
"comeintime": int2long(int(time.time()))
})
CheckText(html)
item["contenthtml"] = html
special = {
'若附件无法下载,你可以尝试使用360极速浏览器进行下载!': '',
# 'DD000E;|EE000F;|FF000E;': '',
            r'[(]?[)]?[A-Z]{2}000[A-Z]{1};[(]?[\d{1,4}]*[;]?[)]?[;]?': '',
}
item["detail"] = cleaner(html, special)
item["comeintime"] = int2long(int(time.time()))
        # Repeat the check on the cleaned detail text.
        source_url = re.search(r'(.*)点击查看内容', item["detail"])
if source_url:
self.save_url.insert_one({
"site": "元博网",
"title": item['title'],
"source_url": source_url.group(1),
"comeintime": int2long(int(time.time()))
})
CheckText(item["detail"])
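        # Persist everything except the crawler bookkeeping fields.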
insert = {}
for key, val in item.items():
if key not in ['crawl_status', 'crawl', 'count', '_id']:
insert[key] = val
self.save_tab.insert_one(insert)
logger.info('[采集成功]{}-{}'.format(item['title'], item['publishtime']))
def crawl_spider(self, sc: Scheduler):
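        """Work through the scheduler's task batch: lock each task, fetch and parse its detail page, then release it and wait before the next one."""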
while True:
next_task_interval = None
logger.info(f"[count:]{str(sc.count)}")
if sc.count >= sc.total:
return True
item = sc.crawl_task
if len(item) == 0:
return False
self._lock_task(item)
            # Record the spider code and source URL for error attribution.
sc.spider_code = item['spidercode']
sc.crawl_url = item['competehref']
try:
                # Validate the crawl task before issuing the request.
CheckTask(item)
response = self.crawl_request(item)
if response is not None:
self.crawl_response(response, item)
self._update_crawl_task(item["_id"], crawl_status='finished')
sc.crawl_counter(1)
next_task_interval = random.choice(range(3,9))
except (YbwCrawlError, Exception) as e:
if getattr(e, 'code', None) is None:
err = YbwCrawlError(unknown_err=e)
sc.err_record(err)
elif e.code == 10105:
                    # On this error, update the task with the duplicate count from the ES query.
self._update_crawl_task(item["_id"], count=item['count'])
logger.info('[重复数据]{}-{}'.format(item['title'], item['publishtime']))
else:
sc.err_record(e)
self._update_crawl_task(item["_id"], crawl_status='error')
logger.info('[问题数据]{}-{}'.format(item['title'], item['publishtime']))
sc.crawl_counter(0)
next_task_interval = 0.1
finally:
self._release_task(item)
sc.wait_for_next_task(next_task_interval)
def start(self):
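        """Open Scheduler sessions in a loop; when a session can start, adopt its login account and crawl the scheduled batch."""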
while True:
with Scheduler(site='元博网', crawl_type='detail') as scheduler:
if scheduler.crawl_start:
self.user = scheduler.user
self.crawl_spider(scheduler)
scheduler.finished(10)
if __name__ == '__main__':
DetailSpider(
db='py_spider',
crawl_tab='ybw_list',
save_tab='data_bak',
).start()