Sfoglia il codice sorgente

添加账号使用记录表,原账号表仅做账号池提供账号

dongzhaorui@topnet.net.cn 3 anni fa
parent
commit
fa027d0ea5
1 ha cambiato i file con 117 aggiunte e 81 eliminazioni
  1. 117 81
      ybw/crawler/crawl_scheduler.py

+ 117 - 81
ybw/crawler/crawl_scheduler.py

@@ -15,46 +15,31 @@ from utils.tools import get_host_ip
 
 class Scheduler:
 
-    def __init__(self, query: dict):
-        self.query = query
-        self.account_tab = mongo_table('py_spider', 'match_account')
-        self.error_tab = mongo_table('py_spider', 'crawl_error')
+    def __init__(self, site, crawl_type, **kwargs):
+        self.site = site
+        self.crawl_type = crawl_type
         self.crawl_start = False  # 控制调度的状态
         self.count = None  # 日采集数量
         self.total = None  # 日采集上限
-        self.account_id = None
-        self.user = None
+        self.account_id = None  # 账号id
+        self.record_id = None  # 日采集记录id
+        self.user = None    # 账户
         self.spider_code = None
         self.crawl_url = None
         self.crawl_params = None
         self.crawl_exception = None
-        self.crawl_type = None
-
-    @property
-    def today(self):
-        return datetime.datetime.today().strftime('%Y-%m-%d')
+        self.kwargs = kwargs
 
-    @property
-    def yesterday(self):
-        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
+        self.account_tab = mongo_table('py_spider', 'match_account')
+        self.record_tab = mongo_table('py_spider', 'match_account_record')
+        self.error_tab = mongo_table('py_spider', 'crawl_error')
 
-    @property
-    def crawl_task(self):
-        results = {}
-        url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
-        # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
-        try:
-            response = requests.get(url, timeout=10)
-            if response.status_code == 200:
-                data = response.json()['data']
-                if len(data) > 0:
-                    results['_id'] = object_id(data['_id'])
-                    for key, val in data.items():
-                        if key != '_id':
-                            results[key] = val
-            return results
-        except requests.RequestException:
-            return results
+    def crawl_counter(self, number: int):
+        """采集计数器"""
+        records = self.record_tab.find_one({'_id': self.record_id})
+        records['count'] += number
+        self.count = records['count']
+        self._update_tab(self.record_tab, self.record_id, records)
 
     def finished(self, execute_next_time=None):
         logger.info("任务结束")
@@ -74,6 +59,47 @@ class Scheduler:
         }
         self.error_tab.insert_one(rows)
 
+    def _release_account(self):
+        if self.crawl_type == 'detail':
+            rows = dict(crawl_detail=False,)
+        else:
+            rows = dict(crawl_list=False,)
+        if self.account_id is not None:
+            self._update_tab(self.account_tab, self.account_id, rows)
+
+    def _update_tab(self, mgo_coll, _id, item):
+        """
+        更新mongo表
+
+        :param mgo_coll: mongo表
+        :param _id: mongo_id
+        :param item: 数据
+        """
+        item['update_time'] = self.current_time
+        mgo_coll.update_one({'_id': _id}, {'$set': item})
+
+    def __enter__(self):
+        logger.info(f'[开启调度]')
+        '''获取闲置账号'''
+        if self.account is not None:
+            self.account_id = self.account['_id']
+            self.user = User(self.account['account'], self.account['password'])
+            logger.info(f'[启用账号]{self.user.phone}')
+            if self.crawl_type == 'detail':
+                item = {'crawl_detail': True}
+                '''初始化记录表'''
+                records = self.account_records
+                self.total = records['total']
+                self.count = records['count']
+            else:
+                item = {'crawl_list': True}
+            '''初始化采集账号记录'''
+            self._update_tab(self.account_tab, self.account_id, item)
+            self.crawl_start = True
+        else:
+            logger.warning(f'[{self.site}]暂无闲置账号')
+        return self
+
     @staticmethod
     def wait_for_next_task(wait_time=None):
         _sleep = (wait_time or random.choice(range(5, 11)))
@@ -84,61 +110,71 @@ class Scheduler:
         sleep_time = (wait_time or 600)
         time.sleep(sleep_time)
 
-    def _release_account(self):
-        rows = dict(
-            used=False,
-            update_time=datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
+    @property
+    def account_records(self):
+        """账号使用记录"""
+        query = dict(
+            account=self.account['account'],
+            date=self.today,
+            type=self.crawl_type,
+            site=self.site
         )
-        if self.account_id is not None:
-            self._update_data(rows)
+        item = self.record_tab.find_one(query)
+        if item is None:
+            item = dict(
+                site=self.site,
+                account=self.account['account'],
+                type=self.crawl_type,
+                total=self.account['total'],
+                count=0,
+                ip=get_host_ip(),
+                date=self.today,
+                update_time=self.current_time,
+            )
+            result = self.record_tab.insert_one(item)
+            item['_id'] = result.inserted_id
+        self.record_id = item['_id']
+        return item
 
-    def _update_data(self, item):
-        """
-        更新账号所属的采集数据信息
+    @property
+    def account(self):
+        """账号"""
+        query = dict(site=self.site)
+        if self.crawl_type == 'detail':
+            query['crawl_detail'] = False
+        else:
+            query['crawl_list'] = False
+        return self.account_tab.find_one(query, sort=[('update_time', 1)])
 
-        :param item: 最新数据
-        """
-        item['ip'] = get_host_ip()
-        item['update_time'] = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
-        self.account_tab.update_one(
-            {'_id': self.account_id},
-            {'$set': item}
-        )
+    @property
+    def crawl_task(self):
+        results = {}
+        url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
+        # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
+        try:
+            response = requests.get(url, timeout=10)
+            if response.status_code == 200:
+                data = response.json()['data']
+                if len(data) > 0:
+                    results['_id'] = object_id(data['_id'])
+                    for key, val in data.items():
+                        if key != '_id':
+                            results[key] = val
+            return results
+        except requests.RequestException:
+            return results
 
-    def update_count(self, number):
-        rows = self.account_tab.find_one({'_id': self.account_id})
-        records = rows.get('records', {self.today: 0})
-        '''采集记录历史保存7天'''
-        count = records.get(self.today, 0)
-        count += number
-        self.count = count
-        if len(records) > 7:
-            records.clear()
-            records.setdefault(self.today, count)
-        else:
-            records.update({self.today: count})
-        rows.update({'records': records})
-        self._update_data(rows)
+    @property
+    def today(self):
+        return datetime.datetime.today().strftime('%Y-%m-%d')
 
-    def __enter__(self):
-        logger.info(f'[开启调度]')
-        '''获取一个闲置时间较久的账号'''
-        rows = self.account_tab.find_one(self.query, sort=[('update_time', 1)])
-        if rows is not None:
-            self.account_id = rows['_id']
-            self.user = User(rows['account'], rows['password'])
-            logger.info(f'[启用账号]{self.user.phone}')
-            rows['used'] = True
-            '''初始化采集数据记录'''
-            records = rows.get('records', {self.today: 0})
-            rows.update({'records': records})
-            self._update_data(rows)
-            self.total = rows['total']
-            self.count = records.get(self.today, 0)
-            self.crawl_start = True
-        else:
-            logger.warning(f'[{self.query.get("site")}采集]暂无闲置账号')
-        return self
+    @property
+    def current_time(self):
+        return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
+
+    @property
+    def yesterday(self):
+        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         logger.info(f'[关闭调度]')