@@ -78,6 +78,17 @@ class DetailSpider:
         self.save_tab = mongo_table(db, save_tab)
         self.user = None
 
+    def _update_crawl_task(self, tid, **kwargs):
+        self.crawl_tab.update_one({'_id': tid}, {'$set': kwargs})
+
+    def _lock_task(self, task: dict):
+        update = {'crawl': True}
+        self._update_crawl_task(task['_id'], **update)
+
+    def _release_task(self, task: dict):
+        update = {'crawl': False}
+        self._update_crawl_task(task['_id'], **update)
+
     def crawl_request(self, url):
         headers = {
             'Host': 'www.chinabidding.cn',
@@ -129,7 +140,7 @@ class DetailSpider:
         element = fromstring(r.text)
         nodes = element.xpath('//*[@id="main_dom"]/div[1]')
         if len(nodes) != 1:
-            raise CrawlError
+            raise CrawlError(code=10021, reason=f'"main_dom" attribute match count: {len(nodes)}')
         else:
             node = nodes[0]
             logger.info(f'[crawl body] id={node.attrib.get("id")}')
@@ -178,54 +189,41 @@ class DetailSpider:
                 pass
         logger.info('[crawl success]{}-{}'.format(item['title'], item['publishtime']))
 
-    def update_crawl_status(self, item: dict, status: bool):
-        self.crawl_tab.update_one(
-            {'_id': item['_id']},
-            {'$set': {'crawl': status}}
-        )
-
     def crawl_spider(self, sc: Scheduler):
         while True:
+            next_task_interval = None
             if sc.count >= sc.total:
                 return True
             item = sc.crawl_task
             if len(item) == 0:
                 return False
-            self.update_crawl_status(item, True)
-            '''Use the scheduler to record crawl content; when an error occurs, write the error to the database'''
+            self._lock_task(item)
+            # Record the spider code and source of crawl errors
             sc.spider_code = item['spidercode']
             sc.crawl_url = item['competehref']
             try:
-                '''Check the requested crawl task'''
+                # Check the requested crawl task
                 CheckTask(item)
                 url = item['competehref']
                 response = self.crawl_request(url)
                 if response is not None:
                     self.crawl_response(response, item)
-                    self.crawl_tab.update_one(
-                        {"_id": item["_id"]},
-                        {'$set': {'crawl_status': 'finished'}}
-                    )
+                    self._update_crawl_task(item["_id"], crawl_status='finished')
                 sc.crawl_counter(1)
             except YbwCrawlError as e:
                 if e.code == 10105:
-                    '''When this error is detected, the program updates the crawl table with the es query result'''
-                    self.crawl_tab.update_one(
-                        {"_id": item["_id"]},
-                        {'$set': {'count': item['count']}}
-                    )
+                    # On this error, update the task with the es query count
+                    self._update_crawl_task(item["_id"], count=item['count'])
                     logger.info('[duplicate data]{}-{}'.format(item['title'], item['publishtime']))
                 else:
                     sc.err_record(e)
-                    self.crawl_tab.update_one(
-                        {"_id": item["_id"]},
-                        {'$set': {'crawl_status': 'error'}}
-                    )
+                    self._update_crawl_task(item["_id"], crawl_status='error')
                     logger.info('[problem data]{}-{}'.format(item['title'], item['publishtime']))
                     sc.crawl_counter(0)
+                next_task_interval = 0.1
             finally:
-                self.update_crawl_status(item, False)
-                sc.wait_for_next_task()
+                self._release_task(item)
+                sc.wait_for_next_task(next_task_interval)
 
     def start(self):
         while True: