
update: add log identifiers;

dongzhaorui 2 years ago
parent
commit
b39c242700

+ 8 - 3
A数据处理/sync_data/README.md

@@ -1,3 +1,8 @@
-#### Data sync
-    Collected data is first pushed to RedisDB for temporary storage, then a sync service pushes it from RedisDB to the crawler MongoDB production database every 5 minutes;
-    the data sync script is invoked by crontab
+#### Data services
+    The scripts are scheduled and run by the crontab service
+
+#### Script purposes
+    1. backfill_task.py -> backfills failed tasks into the crawl queue
+    2. monitor_summary.py -> aggregates monitoring information
+    3. send_data.py -> syncs data collected by feapder crawlers into the crawler database (crawlers push data to RedisDB; the sync service pushes it from RedisDB to the crawler MongoDB production database every 5 minutes)
+    4. summary_data.py -> aggregates heartbeat statistics
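
For context, here is a minimal sketch of the Redis-to-MongoDB flow described above, assuming a local Redis/MongoDB and a hypothetical savemongo:{table} key format (the production logic lives in send_data.py below, which derives the key via get_redis_key()):

    # Illustrative only; hostnames, ports and the key format are assumptions.
    import ast

    import redis
    from pymongo import MongoClient

    rcli = redis.StrictRedis(host="127.0.0.1", port=6379)
    mongodb = MongoClient("127.0.0.1", 27017)["py_spider"]

    def drain(table="data_bak"):
        """Pop every queued item for `table` from Redis and insert it into MongoDB."""
        redis_key = f"savemongo:{table}"  # hypothetical key naming
        for _ in range(rcli.llen(redis_key)):
            raw = rcli.lpop(redis_key)
            if raw is not None:
                mongodb[table].insert_one(ast.literal_eval(raw.decode()))

    if __name__ == "__main__":
        drain()  # crontab runs this kind of drain every 5 minutes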

+ 72 - 0
A数据处理/sync_data/backfill_task.py

@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-07
+---------
+@summary:  backfill of crawl tasks
+---------
+@author: dzr
+"""
+import time
+
+from bson.int64 import Int64
+from pymongo import MongoClient
+from pymongo.errors import DuplicateKeyError
+
+from log import logger
+
+# MONGO_HOST = "172.17.4.87"
+# MONGO_PORT = 27080
+
+MONGO_HOST = "127.0.0.1"
+MONGO_PORT = 27001
+client = MongoClient(MONGO_HOST, MONGO_PORT)
+
+
+MONGO_DB = "py_spider"
+mongodb = client[MONGO_DB]
+
+# crawler data table
+listdata_err = mongodb["listdata_err"]
+
+
+def backfill_data():
+    """回填重新采集的任务,并给出错误重试次数"""
+    count = 0  # 计数器
+    # 查询5天内范围内的错误重采次数小于20的错误任务
+    start_at = int(time.time()) - 432000
+    end_at = int(time.time())
+    q = {
+        "failed_times": {"$lt": 20},
+        "create_at": {"$gte": start_at, "$lt": end_at}
+    }
+    cursor = listdata_err.find(q, no_cursor_timeout=True)
+    try:
+        for doc in cursor:
+            count += 1
+            if count % 100 == 0:
+                logger.info(f"[BackFill]放入采集队列{count}条任务")
+
+            coll_name = doc["origin"]
+            document = doc["origin_data"]
+            document["failed_times"] = doc["failed_times"]
+            # strip the error-record fields
+            for field in ["save", "pri", "failed", "error", "code"]:
+                document.pop(field, "")
+
+            if "comeintime" in document:
+                document["comeintime"] = Int64(time.time())
+
+            try:
+                mongodb[coll_name].insert_one(document)
+                listdata_err.delete_one({"_id": doc["_id"]})
+            except DuplicateKeyError as e:
+                logger.warning(f"[BackFill]重复键错误:{e.details.get('errmsg')}")
+
+        logger.info(f"[BackFill]共计放入采集队列{count}条任务")
+    finally:
+        client.close()
+        logger.info("[BackFill]采集任务回填结束")
+
+
+if __name__ == '__main__':
+    backfill_data()
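
backfill_data() filters listdata_err by failed_times and create_at on every run, so a compound index on those two fields keeps the scan cheap as the error collection grows. A minimal sketch, assuming the same connection settings as backfill_task.py (the index name is an arbitrary choice, not part of this commit):

    from pymongo import ASCENDING, MongoClient

    client = MongoClient("127.0.0.1", 27001)
    listdata_err = client["py_spider"]["listdata_err"]

    # Covers the backfill query: failed_times < 20 AND create_at within the last 5 days.
    listdata_err.create_index(
        [("failed_times", ASCENDING), ("create_at", ASCENDING)],
        name="idx_failed_times_create_at",  # assumed name
    )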

+ 1 - 1
A数据处理/sync_data/log.py

@@ -3,7 +3,7 @@ from pathlib import Path
 from loguru import logger
 
 _absolute = Path(__file__).absolute().parent
-_log_path = (_absolute / 'logs/sync_{time:YYYY-MM-DD}.log').resolve()
+_log_path = (_absolute / 'logs/log_{time:YYYY-MM-DD}.log').resolve()
 logger.add(
     _log_path,
     format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}',

+ 38 - 28
A数据处理/sync_data/send_data.py

@@ -8,6 +8,7 @@ Created on 2023-02-21
 """
 
 import ast
+import re
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
 from typing import Dict
@@ -24,13 +25,6 @@ from redis.exceptions import DataError
 
 from log import logger
 
-# mongo
-MONGO_HOST = "172.17.4.87"
-MONGO_PORT = 27080
-MONGO_DB = "py_spider"
-mcli = MongoClient(MONGO_HOST, MONGO_PORT)
-mongodb = mcli[MONGO_DB]
-
 
 # redis
 class Encoder(RedisEncoder):
@@ -79,10 +73,17 @@ pool = redis.ConnectionPool(
 )
 rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
 
+# mongo
+MONGO_HOST = "172.17.4.87"
+MONGO_PORT = 27080
+MONGO_DB = "py_spider"
+mcli = MongoClient(MONGO_HOST, MONGO_PORT)
+mongodb = mcli[MONGO_DB]
+
 # es
-ES_HOST = '172.17.145.178'
+ES_HOST = "172.17.145.178"
 ES_PORT = 9800
-ES_INDEX = 'biddingall'
+ES_INDEX = "biddingall"
 ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
 
 # 延时间隔
@@ -93,7 +94,7 @@ def literal_eval(node_or_string):
     try:
         return ast.literal_eval(node_or_string)
     except ValueError as e:
-        if 'malformed node or string' in e.args[0]:
+        if "malformed node or string" in e.args[0]:
             from bson import Code, ObjectId  # bring names into eval's scope (ObjectId arguments)
             return eval(node_or_string)
         else:
@@ -138,8 +139,7 @@ def es_query(title, publish_time):
         }
     }
     result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
-    # print(result['hits']['total'])
-    total = int(result['hits']['total'])
+    total = int(result["hits"]["total"])
     return total
 
 
@@ -161,34 +161,42 @@ def rpush(name, values, is_redis_cluster=False):
         return rcli.rpush(name, values)
 
 
+def handle_big_document(item):
+    if "contenthtml" in item:
+        item["contenthtml"] = re.sub("<img[^>]*>", "<br>", item["contenthtml"])
+
+
 def insert_one(table, item: Dict):
     """MongoDB 单条入库"""
     if item is not None:
-        item.pop('_id', '')
+        item.pop("_id", "")
         if item.get("comeintime"):
-            item['comeintime'] = int64.Int64(item['comeintime'])
+            item["comeintime"] = int64.Int64(item["comeintime"])
         try:
-            title = item.get('title')
+            title = item.get("title")
             result = mongodb[table].insert_one(item)
-            logger.info(f'{table}-{str(result.inserted_id)}-{title}--upload succeeded')
+            logger.info(f"[Send]{table}-{str(result.inserted_id)}-{title}--upload succeeded")
         except Exception as e:
+            if "BSON document too large" in ''.join(e.args):
+                handle_big_document(item)  # MongoDB caps a single BSON document at 16 MB
+
             rpush(get_redis_key(table), item)
-            logger.error(table + f"--push failed, reason: {''.join(e.args)}")
+            logger.error("[Send]" + table + f"--push failed, reason: {''.join(e.args)}")
 
 
 def sync_data(table):
     redis_key = get_redis_key(table)
     total = rcli.llen(redis_key)
-    logger.info(f"同步表名:{table},推送总数:{total}")
+    logger.info(f"[Send]同步表名:{table},推送总数:{total}")
     for _ in range(total):
         obj = rcli.lpop(redis_key)
         if obj is None:
-            logger.warning(f'{table} invalid data: {obj}')
+            logger.warning(f"[Send]{table} invalid data: {obj}")
             continue
 
         try:
             item = literal_eval(obj)
-            if table != 'mgp_list':
+            if table != "mgp_list":
                 insert_one(table, item)
             else:
                 title = item.get("item").get("title")
@@ -198,7 +206,7 @@ def sync_data(table):
                     t_diff = int(time.time()) - item.get("comeintime")
                     if t_diff <= DELAY:
                         rpush(redis_key, item)
-                        logger.info(f"{site}-{title}-等待{t_diff}秒--延时入库")
+                        logger.info(f"[Send]{site}-{title}-等待{t_diff}秒--延时入库")
                 # es检索流程
                 elif item.get("if_es"):
                     pt = item.get("item").get("publishtime")
@@ -207,22 +215,24 @@ def sync_data(table):
                 else:
                     insert_one(table, item)
         except Exception as e:
-            # print(e)
-            # print(f'{table} {type(obj)} >>>>> {repr(obj)}')
             rpush(redis_key, obj)
+            logger.error("[Send]" + table + f"--推送失败,原因:{''.join(e.args)}")
 
 
 def err_msg(worker):
     err = worker.exception()
     if err:
-        logger.exception("worker err: {}".format(err))
+        logger.exception("[Send]worker err: {}".format(err))
     return worker
 
 
 @func_set_timeout(60 * 20)
 def main():
-    logger.info("数据同步开始")
-    tables = ["mgp_list", "data_bak", "spider_heartbeat", "njpc_list", "data_njpc", "listdata_err"]
+    logger.info("[Send]数据同步开始")
+    tables = [
+        "mgp_list", "data_bak", "njpc_list", "data_njpc",
+        "listdata_err", "spider_heartbeat",
+    ]
     with ThreadPoolExecutor() as pool:
         futures = []
         for table in tables:
@@ -230,11 +240,11 @@ def main():
             f.add_done_callback(err_msg)
             futures.append(f)
         wait(futures)
-    logger.info("数据同步结束")
+    logger.info("[Send]数据同步结束")
 
 
 if __name__ == '__main__':
     try:
         main()
     except FunctionTimedOut:
-        logger.warning("数据同步超时")
+        logger.warning("[Send]数据同步超时")

+ 13 - 6
A数据处理/sync_data/summary_data.py

@@ -1,4 +1,11 @@
-
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-04
+---------
+@summary:  heartbeat data aggregation; currently only list-page crawling is summarized
+---------
+@author: dzr
+"""
 from datetime import datetime, time, timedelta
 
 from bson.int64 import Int64
@@ -42,11 +49,11 @@ def save(documents, collection):
         if len(data_lst) % 100 == 0:
             collection.insert_many(data_lst)
             data_lst.clear()
-            logger.info(f"{collection.name}-批量保存{count}条数据--已完成")
+            logger.info(f"[Summary]{collection.name}-批量保存{count}条数据--已完成")
 
     # flush the remaining data
     collection.insert_many(data_lst)
-    logger.info(f"{collection.name}-批量保存{count}条数据--已完成")
+    logger.info(f"[Summary]{collection.name}-批量保存{count}条数据--已完成")
 
 
 def summary_data(document, runtime, only_count_list_page=False):
@@ -67,7 +74,7 @@ def summary_data(document, runtime, only_count_list_page=False):
             "create_at": Int64(datetime.now().timestamp())
         }
         if len(spider_item) > 1:
-            logger.warning(f"{spidercode} -> {site} --spidercode业务对应关系错误")
+            logger.warning(f"[Summary]{spidercode} -> {site} --映射关系错误")
             data["warning"] = "spidercode业务对应关系错误"
 
         if only_count_list_page:
@@ -116,7 +123,7 @@ def feapder_crawl_aggregate_of_list_pages(datestr=None):
         save(results, summary_table_of_list_pages)
     finally:
         client.close()
-        logger.info("feapder数据汇总结束")
+        logger.info("[Summary]feapder数据汇总结束")
 
 
 def py_spiders_crawl_aggregate_of_list_pages(datestr=None):
@@ -170,7 +177,7 @@ def py_spiders_crawl_aggregate_of_list_pages(datestr=None):
         save(results, summary_table_of_list_pages)
     finally:
         client.close()
-        logger.info("py_spiders数据汇总结束")
+        logger.info("[Summary]py_spiders数据汇总结束")
 
 
 if __name__ == '__main__':
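
summary_data.py only warns when it notices a spidercode reporting more than one site. The same conflicts can be listed up front with an aggregation over the heartbeat data; a minimal sketch, assuming the spider_heartbeat collection synced by send_data.py carries spidercode and site fields:

    from pymongo import MongoClient

    heartbeat = MongoClient("172.17.4.87", 27080)["py_spider"]["spider_heartbeat"]

    # Group heartbeats by spidercode and keep only codes that report more than one site.
    pipeline = [
        {"$group": {"_id": "$spidercode", "sites": {"$addToSet": "$site"}}},
        {"$match": {"$expr": {"$gt": [{"$size": "$sites"}, 1]}}},
    ]
    for doc in heartbeat.aggregate(pipeline):
        print(doc["_id"], "->", doc["sites"])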