|
@@ -77,13 +77,15 @@ class PaserControl(threading.Thread):
|
|
del_request_redis_after_item_to_db = False
|
|
del_request_redis_after_item_to_db = False
|
|
del_request_redis_after_request_to_db = False
|
|
del_request_redis_after_request_to_db = False
|
|
|
|
|
|
- is_sent_heartbeat = False # 心跳标识
|
|
|
|
- heartbeat_lst = [] # 心跳表
|
|
|
|
|
|
+ is_sent_heartbeat = False # 发送心跳的条件
|
|
|
|
+ heartbeat_lst = [] # 待推送的心跳信息列表
|
|
|
|
|
|
for parser in self._parsers:
|
|
for parser in self._parsers:
|
|
|
|
|
|
- now_page = getattr(request, 'page', -1) # 当前访问页码
|
|
|
|
- total, rel_count = 0, 0 # 入库总数量,实际入库量
|
|
|
|
|
|
+ now_page = getattr(request, "page", -1) # 当前访问页码
|
|
|
|
+ extract_count = 0 # 列表抽取总数量
|
|
|
|
+ task_count = 0 # 详情任务总数量
|
|
|
|
+ rel_count = 0 # 实际入库量
|
|
|
|
|
|
if parser.name == request.parser_name:
|
|
if parser.name == request.parser_name:
|
|
used_download_midware_enable = False
|
|
used_download_midware_enable = False
|
|
@@ -155,6 +157,9 @@ class PaserControl(threading.Thread):
|
|
)
|
|
)
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+ # 详情任务数量计数
|
|
|
|
+ task_count += 1
|
|
|
|
+
|
|
if response == None:
|
|
if response == None:
|
|
raise Exception(
|
|
raise Exception(
|
|
"连接超时 url: %s" % (request.url or request_temp.url)
|
|
"连接超时 url: %s" % (request.url or request_temp.url)
|
|
@@ -206,17 +211,14 @@ class PaserControl(threading.Thread):
|
|
del_request_redis_after_request_to_db = True
|
|
del_request_redis_after_request_to_db = True
|
|
|
|
|
|
elif isinstance(result, Item):
|
|
elif isinstance(result, Item):
|
|
- total += 1
|
|
|
|
- if setting.ITEM_FILTER_ENABLE:
|
|
|
|
- if not self._item_buffer.__class__.dedup.get(result.fingerprint):
|
|
|
|
- request.rel_count += 1
|
|
|
|
- rel_count += 1
|
|
|
|
- else:
|
|
|
|
- request.rel_count += 1
|
|
|
|
- rel_count += 1
|
|
|
|
-
|
|
|
|
result_type = 2
|
|
result_type = 2
|
|
|
|
|
|
|
|
+ # 实际入库数量计数
|
|
|
|
+ if not self.is_filter(result):
|
|
|
|
+ rel_count += 1
|
|
|
|
+ # 实际入库数量(无限翻页控制条件)
|
|
|
|
+ request.rel_count = rel_count
|
|
|
|
+
|
|
# 将item入库(异步)
|
|
# 将item入库(异步)
|
|
self._item_buffer.put_item(result)
|
|
self._item_buffer.put_item(result)
|
|
# 需删除正在做的request
|
|
# 需删除正在做的request
|
|
@@ -246,7 +248,12 @@ class PaserControl(threading.Thread):
|
|
f"{function_name} result expect Request、Item or callback, but get type: {type(result)}"
|
|
f"{function_name} result expect Request、Item or callback, but get type: {type(result)}"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+ # 发送心跳的条件
|
|
is_sent_heartbeat = True
|
|
is_sent_heartbeat = True
|
|
|
|
+ # 抽取列表数量计数
|
|
|
|
+ if hasattr(parser, "_extract_count"):
|
|
|
|
+ extract_count += parser.get_extract_count()
|
|
|
|
+ parser.reset_extract_count()
|
|
|
|
|
|
except (Exception, BaseException) as e:
|
|
except (Exception, BaseException) as e:
|
|
exception_type = (
|
|
exception_type = (
|
|
@@ -427,7 +434,8 @@ class PaserControl(threading.Thread):
|
|
heartbeat_lst.append(dict(
|
|
heartbeat_lst.append(dict(
|
|
parser=parser,
|
|
parser=parser,
|
|
now_page=now_page,
|
|
now_page=now_page,
|
|
- total=total,
|
|
|
|
|
|
+ task_count=task_count,
|
|
|
|
+ extract_count=extract_count,
|
|
rel_count=rel_count,
|
|
rel_count=rel_count,
|
|
request=request,
|
|
request=request,
|
|
response=response,
|
|
response=response,
|
|
@@ -494,7 +502,8 @@ class PaserControl(threading.Thread):
|
|
"""爬虫心跳"""
|
|
"""爬虫心跳"""
|
|
parser = kwargs["parser"]
|
|
parser = kwargs["parser"]
|
|
now_page = kwargs["now_page"]
|
|
now_page = kwargs["now_page"]
|
|
- total = kwargs["total"]
|
|
|
|
|
|
+ extract_count = kwargs["extract_count"]
|
|
|
|
+ task_count = kwargs["task_count"]
|
|
rel_count = kwargs["rel_count"]
|
|
rel_count = kwargs["rel_count"]
|
|
filepath = kwargs["filepath"]
|
|
filepath = kwargs["filepath"]
|
|
status_code = getattr(response, "status_code", -1)
|
|
status_code = getattr(response, "status_code", -1)
|
|
@@ -503,7 +512,7 @@ class PaserControl(threading.Thread):
|
|
site = (item.get("site") if isinstance(item, dict) else getattr(item, "site", None)) or getattr(parser, "site", None) or "unknown"
|
|
site = (item.get("site") if isinstance(item, dict) else getattr(item, "site", None)) or getattr(parser, "site", None) or "unknown"
|
|
channel = (item.get("channel") if isinstance(item, dict) else getattr(item, "channel", None)) or getattr(parser, "channel", None) or "unknown"
|
|
channel = (item.get("channel") if isinstance(item, dict) else getattr(item, "channel", None)) or getattr(parser, "channel", None) or "unknown"
|
|
code = (item.get("code") or item.get("spidercode")) if isinstance(item, dict) else getattr(item, "spidercode", "unknown")
|
|
code = (item.get("code") or item.get("spidercode")) if isinstance(item, dict) else getattr(item, "spidercode", "unknown")
|
|
- business_type = parser.__business_type__ # 爬虫业务类型
|
|
|
|
|
|
+ business_type: str = parser.__business_type__ # 爬虫业务类型
|
|
run_time = tools.get_current_date(date_format="%Y-%m-%d") # 运行时间,单位:天
|
|
run_time = tools.get_current_date(date_format="%Y-%m-%d") # 运行时间,单位:天
|
|
spider_id = tools.get_md5(code + business_type + run_time)
|
|
spider_id = tools.get_md5(code + business_type + run_time)
|
|
heartbeat_content = dict(
|
|
heartbeat_content = dict(
|
|
@@ -521,37 +530,45 @@ class PaserControl(threading.Thread):
|
|
create_at=tools.ensure_int64(tools.get_current_timestamp()), # 执行时间, 单位:秒
|
|
create_at=tools.ensure_int64(tools.get_current_timestamp()), # 执行时间, 单位:秒
|
|
)
|
|
)
|
|
|
|
|
|
- feature = {}
|
|
|
|
- if not getattr(request, "error_msg", None): # 正常心跳
|
|
|
|
- if parser.__business_type__.count("List") > 0:
|
|
|
|
|
|
+ if hasattr(request, "error_msg") and status_code != 200:
|
|
|
|
+ error = getattr(request, "error_msg")
|
|
|
|
+ feature = dict(
|
|
|
|
+ err_type=str(error.split(": ")[0]),
|
|
|
|
+ err_msg=getattr(request, "error_msg"),
|
|
|
|
+ )
|
|
|
|
+ feature.setdefault("request_success", False)
|
|
|
|
+ if business_type.endswith("List"):
|
|
|
|
+ feature.update(dict(nowpage=now_page, ))
|
|
|
|
+ else:
|
|
|
|
+ feature.update(dict(count=task_count, ))
|
|
|
|
+ else:
|
|
|
|
+ if business_type.endswith("List"):
|
|
# 列表页
|
|
# 列表页
|
|
list_feature = dict(
|
|
list_feature = dict(
|
|
- nowpage=now_page,
|
|
|
|
- count=getattr(request, "total_count", total), # 入库总数 (入库总数 = 原始数量 - 过滤数量)
|
|
|
|
|
|
+ nowpage=now_page, # 当前页码
|
|
|
|
+ count=extract_count, # 列表提取总数
|
|
rel_count=rel_count, # 实际入库总数
|
|
rel_count=rel_count, # 实际入库总数
|
|
)
|
|
)
|
|
feature = list_feature
|
|
feature = list_feature
|
|
- elif parser.__business_type__.count("Detail") > 0:
|
|
|
|
|
|
+ else:
|
|
# 详情页
|
|
# 详情页
|
|
detail_feature = dict(
|
|
detail_feature = dict(
|
|
- count=total,
|
|
|
|
- rel_count=rel_count,
|
|
|
|
|
|
+ count=task_count, # 发起请求的总数
|
|
|
|
+ rel_count=rel_count, # 实际入库总数
|
|
)
|
|
)
|
|
feature = detail_feature
|
|
feature = detail_feature
|
|
feature.setdefault("request_success", True)
|
|
feature.setdefault("request_success", True)
|
|
- else:
|
|
|
|
- error = getattr(request, "error_msg")
|
|
|
|
- feature = dict(
|
|
|
|
- err_type=str(error.split(": ")[0]),
|
|
|
|
- err_msg=getattr(request, "error_msg"),
|
|
|
|
- )
|
|
|
|
- feature.setdefault("request_success", False)
|
|
|
|
- if parser.__business_type__.count("List") > 0:
|
|
|
|
- feature.update(dict(nowpage=now_page, ))
|
|
|
|
|
|
|
|
heartbeat_content.update(feature)
|
|
heartbeat_content.update(feature)
|
|
return self.sent_heartbeat(heartbeat_content)
|
|
return self.sent_heartbeat(heartbeat_content)
|
|
|
|
|
|
|
|
+ def is_filter(self, item):
|
|
|
|
+ """item入库前是否会被过滤"""
|
|
|
|
+ if setting.ITEM_FILTER_ENABLE:
|
|
|
|
+ if self._item_buffer.__class__.dedup.get(item.fingerprint):
|
|
|
|
+ return True
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
|
|
class AirSpiderParserControl(PaserControl):
|
|
class AirSpiderParserControl(PaserControl):
|
|
is_show_tip = False
|
|
is_show_tip = False
|