|
@@ -17,7 +17,7 @@ import feapder.setting as setting
|
|
|
import feapder.utils.tools as tools
|
|
|
from feapder.buffer.item_buffer import ItemBuffer
|
|
|
from feapder.db.memory_db import MemoryDB
|
|
|
-from feapder.network.item import Item, BaseListItem
|
|
|
+from feapder.network.item import Item
|
|
|
from feapder.network.request import Request
|
|
|
from feapder.utils import metrics
|
|
|
from feapder.utils.log import log
|
|
@@ -35,14 +35,14 @@ class PaserControl(threading.Thread):
|
|
|
_success_task_count = 0
|
|
|
_failed_task_count = 0
|
|
|
|
|
|
- def __init__(self, collector, redis_key, request_buffer, item_buffer, task_buffer):
|
|
|
+ def __init__(self, collector, redis_key, request_buffer, item_buffer):
|
|
|
super(PaserControl, self).__init__()
|
|
|
self._parsers = []
|
|
|
self._collector = collector
|
|
|
self._redis_key = redis_key
|
|
|
self._request_buffer = request_buffer
|
|
|
self._item_buffer = item_buffer
|
|
|
- self._task_buffer = task_buffer
|
|
|
+
|
|
|
self._thread_stop = False
|
|
|
|
|
|
def run(self):
|
|
@@ -203,21 +203,13 @@ class PaserControl(threading.Thread):
|
|
|
elif isinstance(result, Item):
|
|
|
result_type = 2
|
|
|
|
|
|
+ result.is_mixed = parser.is_mix # 添加属性 - 混合采集
|
|
|
counter['extractQuantity'] += 1 # 统计抽取列表数
|
|
|
if not self.is_duplicate(result):
|
|
|
counter['realQuantity'] += 1 # 统计实际列表数
|
|
|
|
|
|
- # 添加属性 - 待采集任务队列名称(仅对采集列表生效,便以发布任务)
|
|
|
- if isinstance(result, BaseListItem):
|
|
|
- result.queue_name = self._task_buffer._tab_items
|
|
|
-
|
|
|
- # 添加属性 - 混合采集
|
|
|
- result.is_mixed = parser.is_mix
|
|
|
-
|
|
|
# 将item入库(异步)
|
|
|
self._item_buffer.put_item(result)
|
|
|
- # 推送任务到待采集队列(异步)
|
|
|
- self._task_buffer.put_task(result)
|
|
|
|
|
|
elif callable(result): # result为可执行的无参函数
|
|
|
if result_type == 2: # item 的 callback,buffer里的item均入库后再执行
|