
update: fix data-loss bug in the delayed-write flow

dongzhaorui 2 years ago
parent
commit
5d4e710d92
1 changed file with 113 additions and 48 deletions

+ 113 - 48
A数据处理/sync_data/send_data.py

@@ -26,7 +26,6 @@ from redis.exceptions import DataError
 from log import logger
 
 
-# redis
 class Encoder(RedisEncoder):
 
     def encode(self, value):
@@ -58,13 +57,12 @@ class Encoder(RedisEncoder):
         return value
 
 
+# redis
+redis.connection.Encoder = Encoder
 REDIS_HOST = "172.17.4.232"
 REDIS_PORT = 7361
 REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
 REDIS_DB = 10
-
-redis.connection.Encoder = Encoder
-
 pool = redis.ConnectionPool(
     host=REDIS_HOST,
     port=REDIS_PORT,
@@ -72,6 +70,7 @@ pool = redis.ConnectionPool(
     db=REDIS_DB
 )
 rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
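+# Every pending-write queue is a redis list keyed "<redis_prefix>:<table>"; insert_one strips the prefix to get the Mongo table name.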
+redis_prefix = "savemongo"
 
 # mongo
 MONGO_HOST = "172.17.4.87"
@@ -86,11 +85,16 @@ ES_PORT = 9800
 ES_INDEX = "biddingall"
 ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
 
-# Delay interval
-DELAY = 43200
+
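+# Done-callback for worker futures: log any exception instead of letting the pool swallow it.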
+def err_msg(worker):
+    err = worker.exception()
+    if err:
+        logger.exception("[Send]worker err: {}".format(err))
+    return worker
 
 
 def literal_eval(node_or_string):
+    """反序列化数据"""
     try:
         return ast.literal_eval(node_or_string)
     except ValueError as e:
@@ -143,10 +147,6 @@ def es_query(title, publish_time):
     return total
 
 
-def get_redis_key(table, prefix="savemongo:"):
-    return prefix + table
-
-
 def rpush(name, values, is_redis_cluster=False):
     """“将“values”推到列表“name”的尾部”"""
     if isinstance(values, list):
@@ -168,6 +168,7 @@ def handle_big_document(item):
 
 def insert_one(table, item: Dict):
     """MongoDB 单条入库"""
+    table = "".join(table.split(f"{redis_prefix}:"))
     if item is not None:
         item.pop("_id", "")
         if item.get("comeintime"):
@@ -180,14 +181,93 @@ def insert_one(table, item: Dict):
             if "BSON document too large" in ''.join(e.args):
                 handle_big_document(item)  # MongoDB caps BSON document size at 16 MB
 
-            rpush(get_redis_key(table), item)
-            logger.error("[Send]" + table + f"--push failed, reason: {''.join(e.args)}")
+            # rpush(get_redis_key(table), item)
+            rpush(table, item)
+            logger.error(f"[Send]{table}--push failed, reason: {''.join(e.args)}")
+
+
+def delay_push_to_db(table_name, data, delay_time=43200):
+    """
+    Third-party data must be delayed before being written to the crawler production DB.
 
+    @param table_name: table name
+    @param data: the record being delayed
+    @param delay_time: delay duration, in seconds
+    @return:
+    """
+    site = data.get("item").get("site")
+    title = data.get("item").get("title")
+    time_diff = int(time.time()) - data.get("comeintime")
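+    # Requeue while still inside the delay window; once it has elapsed, insert.
+    # (The previous flow only requeued and never inserted, so expired records were lost.)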
+    if time_diff <= delay_time:
+        rpush(table_name, data)
+        logger.info(f"[Send]{site}-{title}-等待{time_diff}秒--延时推送")
+    else:
+        logger.info(f"[Send]{site}-{title}-等待{time_diff}秒--延时入库")
+        insert_one(table_name, data)
+    return True
+
+
+def es_retrieval_push_to_db(table_name, data):
+    """
+    Deduplicate via ES (the last ~3 months of incremental data), then push to the crawler production DB.
 
-def sync_data(table):
-    redis_key = get_redis_key(table)
+    @param table_name: table name
+    @param data: the record to check for duplicates
+    @return:
+    """
+    site = data.get("item").get("site")
+    title = data.get("item").get("title")
+    pt = data.get("item").get("publishtime")
+    if not title or not pt:  # ES retrieval needs both title and publish time; otherwise the record is discarded as junk
+        return False
+
+    count = es_query(title.strip(), pt)
+    if count == 0:
+        insert_one(table_name, data)
+    logger.info(f"[Send]{site}-{title}-检索到{count}条--ES检索")
+    return True
+
+
+def mixture_process_push_to_db(table_name, data, delay_time=43200):
+    """
+    Combined delay + ES-retrieval dedup, then push to the crawler production DB.
+
+    @param table_name: table name
+    @param data: the record to check for duplicates
+    @param delay_time: delay duration, in seconds
+    @return:
+    """
+    site = data.get("item").get("site")
+    title = data.get("item").get("title")
+    pt = data.get("item").get("publishtime")
+    if not title or not pt:  # ES retrieval needs both title and publish time; otherwise the record is discarded as junk
+        return False
+
+    is_continue = False
+    time_diff = int(time.time()) - data.get("comeintime")
+    count = es_query(title.strip(), pt)
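+    # No ES hit: treat as new data; keep delaying it, or insert once the delay window has passed.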
+    if count == 0:
+        if time_diff <= delay_time:
+            rpush(table_name, data)
+        else:
+            insert_one(table_name, data)
+        is_continue = True
+
+    msg = "保持轮询检索" if is_continue else "删除重复数据"
+    logger.info(f"[Send]{site}-{title}-{msg}--混合检索")
+    return True
+
+
+def sync_data(table: str):
+    """
+    Drain one redis queue and persist its records.
+
+    @param table: redis list key (doubles as the target Mongo table name)
+    @return:
+    """
+    redis_key = table
     total = rcli.llen(redis_key)
-    logger.info(f"[Send]同步表名:{table},推送总数:{total}")
+    logger.info(f"[Send]同步数据表名:{table},推送总数:{total}")
     for _ in range(total):
         obj = rcli.lpop(redis_key)
         if obj is None:
@@ -196,55 +276,40 @@ def sync_data(table):
 
         try:
             item = literal_eval(obj)
-            if table != "mgp_list":
+            if not table.endswith(("mgp_list", "bidding")):
                 insert_one(table, item)
             else:
-                title = item.get("item").get("title")
-                # delayed-push flow
-                if item.get("is_delay"):
-                    site = item.get("item").get("site")
-                    t_diff = int(time.time()) - item.get("comeintime")
-                    if t_diff <= DELAY:
-                        rpush(redis_key, item)
-                        logger.info(f"[Send]{site}-{title}-waited {t_diff}s--delayed insert")
-                # ES-retrieval flow
-                elif item.get("if_es"):
-                    pt = item.get("item").get("publishtime")
-                    if title is not None and es_query(title.strip(), pt) == 0:
-                        insert_one(table, item)
+                is_delay = item.get("is_delay")  # delayed push
+                is_es_retrieval = item.get("if_es")  # ES retrieval
+                if is_delay and is_es_retrieval:
+                    mixture_process_push_to_db(table, item)
+                elif is_delay and not is_es_retrieval:
+                    delay_push_to_db(table, item)
+                elif not is_delay and is_es_retrieval:
+                    es_retrieval_push_to_db(table, item)
                 else:
                     insert_one(table, item)
         except Exception as e:
-            rpush(redis_key, obj)
-            logger.error("[Send]" + table + f"--推送失败,原因:{''.join(e.args)}")
-
-
-def err_msg(worker):
-    err = worker.exception()
-    if err:
-        logger.exception("[Send]worker err: {}".format(err))
-    return worker
+            rpush(table, obj)
+            logger.error(f"[Send]{table}--推送失败,原因:{''.join(e.args)}")
 
 
 @func_set_timeout(60 * 20)
 def main():
-    logger.info("[Send]数据同步开始")
-    tables = [
-        "mgp_list", "data_bak", "njpc_list", "data_njpc",
-        "listdata_err", "spider_heartbeat",
-    ]
-    with ThreadPoolExecutor() as pool:
+    logger.info("[Send]同步数据开始")
+    with ThreadPoolExecutor() as threadPool:
         futures = []
-        for table in tables:
-            f = pool.submit(sync_data, table)
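+        # Discover queues dynamically instead of hard-coding table names; keys() returns bytes here.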
+        for key in rcli.keys(f"{redis_prefix}:*"):
+            table = key.decode()
+            f = thread_pool.submit(sync_data, table)
             f.add_done_callback(err_msg)
             futures.append(f)
         wait(futures)
-    logger.info("[Send]数据同步结束")
+    logger.info("[Send]同步数据结束")
 
 
 if __name__ == '__main__':
     try:
         main()
     except FunctionTimedOut:
-        logger.warning("[Send]数据同步超时")
+        logger.warning("[Send]同步数据超时")