liumiaomiao 6 months ago
parent
commit
cc17470933
2 files changed with 138 additions and 139 deletions
  1. +50 -76
      client_spider_mongo.py
  2. +88 -63
      util/spider_statistics.py

+ 50 - 76
client_spider_mongo.py

@@ -10,9 +10,9 @@ import json
 from datetime import datetime, timedelta
 from elasticsearch import Elasticsearch
 
+# Score the ES data directly and store the results in MongoDB; no longer pull the data down for scoring
 
 ReluClient = MongoDBInterface(ReluMongodb)
-
 # Evaluation service configuration
 a2s_ip = "192.168.3.240:9090"
 # a2s_ip = "172.17.0.11:9090"
@@ -23,18 +23,19 @@ timeout = 180
 
 # Get the current time
 now = datetime.now()
-current_datetime = now.strftime("%Y-%m-%d %H:%M:%S")
-# Get today's date
-today = datetime.today()
-# Get yesterday's date
-yesterday = today - timedelta(days=1)
-# Get midnight yesterday
-yesterday_midnight = datetime(yesterday.year, yesterday.month, yesterday.day)
-# Get midnight today
-today_midnight = datetime(today.year, today.month, today.day)
-# Convert to Unix timestamps
-start_date = int(yesterday_midnight.timestamp())
-end_date = int(today_midnight.timestamp())
+# current_datetime = int(now.timestamp())
+
+# Compute yesterday's date
+yesterday = now - timedelta(days=1)
+# Get 8:00 AM yesterday
+yesterday_8_am = datetime(yesterday.year, yesterday.month, yesterday.day, 8, 0, 0)
+# Convert to a Unix timestamp (seconds)
+current_datetime = int(yesterday_8_am.timestamp())
+
+# Time window
+start_date = int(datetime(2025, 1, 23, 8, 0, 0).timestamp())  # 2025-01-23 08:00:00
+end_date = int(datetime(2025, 1, 23, 12, 0, 0).timestamp())  # 2025-01-23 12:00:00
+
 
 # ES connection configuration
 es_host = "http://127.0.0.1:19800"
@@ -66,45 +67,22 @@ def get_rule(company, version):
     rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
     return rule_id
 
-def find_error_id(conn, cleaned_key, sub_value):
-    """
-    Look up the id in error_dict
-    """
-    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
-    params = (cleaned_key, sub_value)
-    result = MysqlUtil.query_data(conn, query, params)
-    # e.g. [(10,)]
-    # Check whether the query result is empty
-    if not result:
-        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
-        return None  # or return a default value, depending on requirements
-
-    record = result[0][0]
-    return record
-
-def insert_batch_data(conn, params):
-    """
-    Execute a batch data insert
-    """
-    query = """INSERT IGNORE INTO bid_analysis (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, spider_modified_time) 
-               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s)"""
-    MysqlUtil.insert_data(conn, query, params)
-
-def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
-    """
-    Dynamically write error_ids into the corresponding cleaned_key_error field
+def insert_batch_data_mongo(collection, params):
     """
-    # Build a dynamic SQL statement to update the given cleaned_key_error field
-    query = f"""
-        UPDATE bid_analysis 
-        SET {cleaned_key}_error = %s 
-        WHERE mongoid = %s
+    Insert one record into MongoDB
     """
-    # Join multiple error_ids with a separator
-    error_ids_str = ','.join(map(str, error_ids))
-    params = (error_ids_str, mongoid )
-
-    MysqlUtil.update_data(conn, query, params)
+    # Map the positional params onto named fields to build one document
+    field_names = ["mongoid", "toptype", "subtype", "site", "spidercode", "channel", "comeintime", "area", "city", "district", "error_type", "spider_modified_time", "spider_important", "site_important", "create_time"]
+    doc = {}
+    for idx, param in enumerate(params):
+        doc[field_names[idx]] = param
+    print(f"document: {doc}")
+    # Insert the document
+    try:
+        collection.insert_one(doc)
+        print(f"{doc} successfully inserted into MongoDB")
+    except Exception as e:
+        print("Error while inserting data:", e)
 
 def get_last_processed_id():
     """
@@ -136,11 +114,16 @@ def batch_load_data():
     rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
     print(rules_id)
 
-    # Initialize MySQL
-    # conn = MysqlUtil.connect_to_mysql(host='192.168.3.14', port='4000', user='DataScBi', password='DT#Sc20221123Ht',database='quality')
-    conn = MysqlUtil.connect_to_mysql(host='192.168.3.217', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
+
     # Initialize the spider code collection
-    collection = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
+    collection_spider = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
+    # Initialize the spider config collection
+    collection_config = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["luaconfig"]
+    # Initialize the site collection
+    collection_site = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["site"]
+    # Initialize the target Mongo database
+    conn = MongoClient('192.168.3.149', 27180, unicode_decode_error_handler="ignore").data_quality
+    coll_user = conn["bid_analysis"]
 
     # Get the last processed ID; if there is none, start from the beginning
     last_processed_id = get_last_processed_id()
@@ -198,42 +181,33 @@ def batch_load_data():
 
                 data = result.get("data", {})
 
-                # Insert the data into MySQL
+                # Insert the data into MongoDB
                 toptype = item.get("toptype", "")
                 subtype = item.get("subtype", "")
                 site = item.get("site", "")
                 spidercode = item.get("spidercode", "")
                 channel = item.get("channel", "")
                 comeintime = item.get("comeintime", "")
-                comeintime = datetime.fromtimestamp(comeintime)
                 area = item.get("area", "")
                 city = item.get("city", "")
                 district = item.get("district", "")
-                score = item.get("score", "")
-                error_type_data = json.dumps(data)
                 spider_modified_time = current_datetime
-
-                info = collection.find_one({"code": spidercode})
+                spider_important = False
+                site_important = 0
+                create_time = current_datetime
+                info = collection_spider.find_one({"code": spidercode})
                 if info:
                     spider_modified_time = info.get("modifytime", "")
-                    spider_modified_time = datetime.fromtimestamp(spider_modified_time)
-
-                params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,spider_modified_time)
-                insert_batch_data(conn, params)
+                info_config = collection_config.find_one({"code": spidercode})
+                if info_config:
+                    spider_important = info_config.get("spiderimportant", "")
+                info_site = collection_site.find_one({"site": site})
+                if info_site:
+                    site_important = info_site.get("important", "")
+                params = (item["_id"], toptype, subtype, site, spidercode, channel, comeintime, area, city, district, data, spider_modified_time, spider_important, site_important, create_time)
+                insert_batch_data_mongo(coll_user, params)
 
-                # Iterate over the error-reason dict and extract values from non-empty sub-dicts
-                for key, value in data.items():
-                    error_ids = []
-                    if isinstance(value, dict) and value:
-                        cleaned_key = key[:-3] if key.endswith('_qa') else key  # strip the '_qa' suffix
-                        for sub_key, sub_value in value.items():
-                            error_id = find_error_id(conn, cleaned_key, sub_value)
-                            if error_id:
-                                error_ids.append(error_id)
-                            print(f"  {sub_key}: {sub_value}")
 
-                        # Write the error IDs into the cleaned_key_error field
-                        insert_dynamic_error_field(conn, cleaned_key, error_ids, item["_id"])
                 print("------一条数据结束------")
             # 保存当前批次处理的最大 ID
             if max_id:
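
A side note on the new insert path: insert_batch_data_mongo builds each document by walking two parallel sequences. The same mapping can be written more defensively with dict(zip(...)). The following is a minimal sketch, not part of the commit; insert_record_mongo and FIELD_NAMES are hypothetical names, and the field order is assumed to match the params tuple built in batch_load_data:

    # Sketch only: assumes the same 15-field order as the committed params tuple.
    FIELD_NAMES = [
        "mongoid", "toptype", "subtype", "site", "spidercode", "channel",
        "comeintime", "area", "city", "district", "error_type",
        "spider_modified_time", "spider_important", "site_important", "create_time",
    ]

    def insert_record_mongo(collection, params):
        # Fail loudly on a field/value count mismatch instead of silently
        # dropping trailing values in the zip.
        if len(params) != len(FIELD_NAMES):
            raise ValueError(f"expected {len(FIELD_NAMES)} values, got {len(params)}")
        doc = dict(zip(FIELD_NAMES, params))
        try:
            collection.insert_one(doc)
        except Exception as e:
            print("Error while inserting data:", e)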

+ 88 - 63
util/spider_statistics.py

@@ -1,70 +1,95 @@
-from util.mysql_tool import MysqlUtil
-import pandas as pd
-from datetime import datetime,timedelta
+from pymongo import MongoClient
+from datetime import datetime, timedelta
 
-# Connect to the MySQL database
-conn = MysqlUtil.connect_to_mysql(host='192.168.3.217', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
-cursor = conn.cursor()
+# Connect to MongoDB
+client = MongoClient("mongodb://192.168.3.149:27180/")
+db = client['data_quality']
+collection = db['bid_analysis']
 
-# Get today's date to use as the batch field
-today_date = datetime.now().strftime('%Y-%m-%d')
+# Define the boundaries of two arbitrary time windows
+# Window 1
+period1_start = int(datetime(2025, 1, 20, 0, 0, 0).timestamp())  # 2025-01-20 00:00:00
+period1_end = int(datetime(2025, 1, 20, 23, 59, 59).timestamp())  # 2025-01-20 23:59:59
 
-# Get the current time
-now = datetime.now()
-current_datetime = now.strftime("%Y-%m-%d %H:%M:%S")
-# Get today's date
-today = datetime.today()
-# Get yesterday's date
-yesterday = today - timedelta(days=1)
-# Get midnight yesterday
-yesterday_midnight = datetime(yesterday.year, yesterday.month, yesterday.day)
-# Get midnight today
-today_midnight = datetime(today.year, today.month, today.day)
-# Convert to Unix timestamps
-start_date = int(yesterday_midnight.timestamp())
-end_date = int(today_midnight.timestamp())
+# Window 2
+period2_start = int(datetime(2025, 1, 21, 0, 0, 0).timestamp())  # 2025-01-21 00:00:00
+period2_end = int(datetime(2025, 1, 21, 23, 59, 59).timestamp())  # 2025-01-21 23:59:59
 
-# Query the distinct site, channel, and spidercode values
-query = """
-SELECT DISTINCT site, channel, spidercode
-FROM bid_analysis where comeintime< end_date ;
-"""
+# Aggregation: count documents in each window and compute the volatility between them
+pipeline = [
+    {
+        "$project": {
+            "spidercode": 1,
+            "create_time": 1,
+            "date": {"$toDate": {"$multiply": ["$create_time", 1000]}}  # 转换为毫秒级日期
+        }
+    },
+    {
+        "$group": {
+            "_id": "$spidercode",
+            "total_count": {"$sum": 1},
+            "period1_count": {
+                "$sum": {
+                    "$cond": [
+                        {"$and": [
+                            {"$gte": ["$create_time", period1_start]},
+                            {"$lte": ["$create_time", period1_end]}
+                        ]},
+                        1,
+                        0
+                    ]
+                }
+            },
+            "period2_count": {
+                "$sum": {
+                    "$cond": [
+                        {"$and": [
+                            {"$gte": ["$create_time", period2_start]},
+                            {"$lte": ["$create_time", period2_end]}
+                        ]},
+                        1,
+                        0
+                    ]
+                }
+            }
+        }
+    },
+    {
+        "$project": {
+            "spidercode": "$_id",
+            "total_count": 1,
+            "period1_count": 1,
+            "period2_count": 1,
+            "volatility": {
+                "$cond": [
+                    {"$eq": ["$period1_count", 0]},
+                    None,  # if period1_count is 0, the volatility is None
+                    {
+                        "$divide": [
+                            {"$subtract": ["$period2_count", "$period1_count"]},
+                            "$period1_count"
+                        ]
+                    }
+                ]
+            }
+        }
+    }
+]
 
-cursor.execute(query)
+# Run the aggregation
+result = collection.aggregate(pipeline)
 
-# Fetch the query results
-data = cursor.fetchall()
+# Print the results
+for doc in result:
+    print(doc)
 
-# Build a DataFrame for processing and insertion
-statistics_data = []
-for row in data:
-    site, channel, spidercode = row
-    statistics_data.append({
-        'batch': today_date,
-        'site': site,
-        'channel': channel,
-        'spidercode': spidercode,
-        'created_at': today_date  # use today's date as the creation time
-    })
-
-# Convert the data to a pandas DataFrame
-df = pd.DataFrame(statistics_data)
-
-# Prepare the SQL statement for inserting into the statistics table
-insert_query = """
-INSERT INTO statistics (batch, site, channel, spidercode, created_at)
-VALUES (%s, %s, %s, %s, %s);
-"""
-
-# Insert the DataFrame rows into the database
-for index, row in df.iterrows():
-    cursor.execute(insert_query, (row['batch'], row['site'], row['channel'], row['spidercode'], row['created_at']))
-
-# Commit the transaction
-conn.commit()
-
-# Close the database connection
-cursor.close()
-conn.close()
-
-print(f"成功将 {len(df)} 条记录插入到statistics表中.")
+'''
+Sample output:
+{
+    "spidercode": "spider1",
+    "total_count": 200,
+    "period1_count": 60,
+    "period2_count": 70,
+    "volatility": 0.1667
+}
+'''