Browse Source

多线程账号池采集脚本

dongzhaorui 1 năm trước cách đây
mục cha
commit
8f57badaaf
1 tập tin đã thay đổi với 92 bổ sung0 xóa
  1. 92 0
      qlm/source_qianlima_mt.py

+ 92 - 0
qlm/source_qianlima_mt.py

@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+import random
+import time
+from concurrent.futures import Future, ThreadPoolExecutor, wait
+
+from loguru import logger
+
+from source_qianlima import (
+    core as reqeust,
+    disrupt_account_pool
+)
+from utils.config_parms import area_dict, city_dict, province_dict, channel_dict
+from utils.tools import get_today_of_day
+
+
+def spider(account):
+    date = get_today_of_day(-1)
+    follow = account['follow']
+    phone = account['phone']
+
+    try:
+        for category, category_name in channel_dict.items():
+            for area_id in follow:
+                cities = area_dict[area_id]
+                for city in cities:
+                    logger.info(' && '.join([
+                        phone,
+                        date,
+                        category_name,
+                        province_dict[area_id],
+                        city_dict[city]
+                    ]))
+
+                    if len(cities) == 1:
+                        city = area_id
+
+                    reqeust(date, category, city, account, page_size=100)
+
+    except Exception as e:
+        raise e
+
+
+def show_exception(f: Future):
+    error = f.exception()
+    if error:
+        logger.exception(f'工作线程运行错误:{error}')
+
+
+def delay_bar():
+    try:
+        interval = random.randint(300, 900)
+        ts = time.time()
+        start_dt = time.strftime('%Y-%m-%d %H:%M:%S',
+                                 time.localtime(ts + interval))
+        i = 0
+        logger.info(f'采集开始时间:{start_dt}')
+        while time.time() - ts < interval:
+            dots = '.' * i
+            print(f'\r请稍候{dots}', end='', flush=True)
+            time.sleep(.5)
+            if i >= 3:
+                i = 0
+            else:
+                i += 1
+
+    finally:
+        print('')
+
+
+def start():
+    try:
+        delay_bar()
+        logger.info('+++ 采集开始 +++')
+        account_pool = disrupt_account_pool()
+        with ThreadPoolExecutor(max_workers=5) as pool:
+            fs = []
+            for account in account_pool:
+                f = pool.submit(spider, account)
+                f.add_done_callback(show_exception)
+
+            wait(fs)
+
+    except KeyboardInterrupt:
+        pass
+
+    finally:
+        print('')
+        logger.info('+++ 采集结束 +++')
+
+
+if __name__ == '__main__':
+    start()