liumiaomiao 5 months ago
parent
commit
cab8f7962b

+ 34 - 3
lib/monitor_tools.py

@@ -125,6 +125,28 @@ class monitorTools:
         print("拟在建es入库数据总量:", count)
         return count
 
+    #医械通es,每周统计入库数量
+    def medical_es(self):
+        """
+        es链接
+        """
+        db_config = {
+            # es
+            'es_host': '127.0.0.1',
+            'es_port': 19800,
+            'es_http_auth': ('jianyuGr', 'we3g8glKfe#'),  # 重新申请
+            'timeout': 10000,
+            'index': "bidding"
+        }
+        query = {"query": {"bool": {"must": [
+            {"range": {"comeintime": {"from": f"{start_date}", "to": f"{end_date}"}}},
+            {"term": {"bid_field": "0101"}}
+        ]}}}
+        # 传入查询语句query 以及配置信息
+        # es=esutil.get_es(db_config["es_host"], db_config["es_http_auth"], db_config["es_port"],db_config["index"])
+        counts = esutil.get_es_count(query, **db_config)
+        count = counts['count']
+        print("医械通es每周入库数据量:", count)
+        return count
+
     #标准库bidding-mongo 每周统计入库数量
     def bidding(self):
         collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw',collection='bidding',authuser='viewdata',authpass='viewdata')
@@ -188,12 +210,12 @@ class monitorTools:
         # SQL 查询
         mysql_query = "SELECT COUNT(*) FROM jianyu_subjectdb.dwd_f_nzj_baseinfo WHERE createtime >= %s AND createtime <= %s"
         params = (start_date, end_date)
-        conn=MysqlUtil.connect_to_mysql(host='192.168.3.149',port='4000',user='datagroup',password='Dgrpdb#2024@36',database='jianyu_subjectdb')
+        conn=MysqlUtil.connect_to_mysql(host='127.0.0.1',port='4000',user='jydev',password='JSuytest#s211',database='jianyu_subjectdb')
         count=MysqlUtil.execute_sql(conn,mysql_query,params)
         print("拟在建baseinfo-mysql每周统计入库数量", count)
         return count
 
-     #人脉数据,每周统计入库数量
+    #人脉数据,每周统计入库数量
     def connections(self):
         try:
             query = f"SELECT COUNT(*) FROM information.transaction_info_all WHERE create_time >={start_date} AND create_time <={end_date}"
@@ -207,9 +229,18 @@ class monitorTools:
             logger.error("An error occurred: %s", e)
             raise
 
+    #医械通,每周统计入库数量
+    def medical(self):
+        collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw', collection='bidding',
+                                        authuser='viewdata', authpass='viewdata')
+        query = {"comeintime": {"$gte": start_date, "$lt": end_date},"bid_field":"0101"}
+        count = MongoSentence.count(collection, query)
+        print("医械通每周统计入库数量", count)
+        return count
+
     #统计结果入库
     def save_to_mongo(self,title,count):
-        collection=Data_save.save_con(host='192.168.3.149',port=27180,database='data_quality',collection='statistics')
+        collection=Data_save.save_con(host='172.20.45.129',port=27002,database='data_quality',collection='statistics')
         now = datetime.now()
         timestamp = int(now.timestamp())
         document = {
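Note on the truncated hunk above: save_to_mongo() is cut off at "document = {". Judging from how tools/周报表格导出/DataExport_forTesting.py later filters on fields such as medical.timestamp and reads doc['medical'].get('count', 0), the record it writes presumably looks roughly like the sketch below; the exact layout and the final write call are assumptions, not something this diff shows.

    # Assumed continuation of save_to_mongo(), inferred from the export script's reads
    document = {
        title: {                     # e.g. "medical", "medical_es", "bidding", ...
            "count": count,          # weekly ingest count returned by the counter method
            "timestamp": timestamp,  # int(now.timestamp()) computed just above
        }
    }
    collection.insert_one(document)  # hypothetical call; the actual write is not shown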

+ 0 - 31
tools/mongo断流监控/mongo_monitor.py

@@ -1,31 +0,0 @@
-#!/usr/bin/env python
-# -*- coding:utf-8 -*-
-# author : liumiaomiao
-
-from lib.monitor_tools import monitor
-
-#标准库-bidding
-bidding_count=monitor.bidding()
-# #存库
-# monitor.save_to_mongo("bidding",bidding_count)
-# print("bidding入库成功")
-# #高质量库-mongo
-# bidding_ai_count=monitor.bidding_ai()
-# monitor.save_to_mongo("bidding_ai",bidding_ai_count)
-# print("bidding_ai入库成功")
-# # nzj-mysql
-# nzj_count=monitor.nzj()
-# monitor.save_to_mongo("nzj",nzj_count)
-# print("nzj入库成功")
-#bidding_碎片化
-bidding_fragment_count=monitor.bidding_fragment()
-monitor.save_to_mongo("bidding_fragment",bidding_fragment_count)
-print("bidding_fragment入库成功")
-# #人脉clickhouse数据
-# connections_count=monitor.connections()
-# monitor.save_to_mongo('connections',connections_count)
-# print("connections入库成功")
-
-
-
-

+ 11 - 8
tools/mongo断流监控/es_monitor.py → tools/mongo,es断流监控/es_monitor.py

@@ -9,16 +9,19 @@ es_bidding_count=monitor.es_bidding()
 #存库
 monitor.save_to_mongo("es_bidding",es_bidding_count)
 print("es_bidding入库成功")
-# #es-bidding_ai
-# es_bidding_ai_count=monitor.es_bidding_ai()
-# monitor.save_to_mongo("es_bidding_ai",es_bidding_ai_count)
-# print("es_bidding_ai入库成功")
+
+#es-拟在建
+es_nzj_count=monitor.es_nzj()
+monitor.save_to_mongo("es_nzj",es_nzj_count)
+print("es_nzj入库成功")
+
 #es-bidding_碎片化
 es_bidding_fragment_count=monitor.es_bidding_fragment()
 monitor.save_to_mongo("es_bidding_fragment",es_bidding_fragment_count)
 print("es_bidding_fragment入库成功")
-# #es-拟在建
-es_nzj_count=monitor.es_nzj()
-monitor.save_to_mongo("es_nzj",es_nzj_count)
-print("es_nzj入库成功")
 
+#医械通
+medical_count = monitor.medical_es()
+monitor.save_to_mongo("medical_es",medical_count)
+print(medical_count)
+print("医械通数据入库成功")

+ 37 - 0
tools/mongo,es断流监控/mongo_monitor.py

@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : liumiaomiao
+
+from lib.monitor_tools import monitor
+
+#标准库-bidding
+bidding_count=monitor.bidding()
+#存库
+monitor.save_to_mongo("bidding",bidding_count)
+print("bidding入库成功")
+
+#nzj-mysql,需要查正式环境tidb库
+nzj_count=monitor.nzj()
+print(nzj_count)
+monitor.save_to_mongo("nzj",nzj_count)
+print("nzj入库成功")
+
+#人脉clickhouse数据
+connections_count=monitor.connections()
+monitor.save_to_mongo('connections',connections_count)
+print("connections入库成功")
+
+#医械通
+medical_count=monitor.medical()
+print(medical_count)
+monitor.save_to_mongo('medical',medical_count)
+print("医械通数据入库成功")
+
+# #bidding_碎片化
+# bidding_fragment_count=monitor.bidding_fragment()
+# monitor.save_to_mongo("bidding_fragment",bidding_fragment_count)
+# print("bidding_fragment入库成功")
+
+
+
+

+ 14 - 6
tools/周报表格导出/DataExport_forTesting.py

@@ -4,8 +4,8 @@ import pandas as pd
 import openpyxl
 # 数据入库量及数据监控时效 导出execl
 # MongoDB连接配置
-host = '192.168.3.149'
-port = 27180
+host = '172.20.45.129'
+port = 27002
 dbname = 'data_quality'
 collection_name = 'statistics'
 
@@ -39,6 +39,7 @@ pipeline_mongo = [
                 {"bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                 {"connections.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                 {"nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
+                {"medical.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                 {"bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
             ]
         }
@@ -56,6 +57,7 @@ bidding_count = 0
 bidding_ai_count = 0
 connections_count = 0
 nzj_count = 0
+medical_count = 0
 bidding_fragment_data = {
     "情报_法务": 0,
     "情报_财务审计": 0,
@@ -79,6 +81,8 @@ for doc in data_mongo:
         connections_count += doc['connections'].get('count', 0)
     if 'nzj' in doc:
         nzj_count += doc['nzj'].get('count', 0)
+    if 'medical' in doc:
+        medical_count += doc['medical'].get('count', 0)
     if 'bidding_fragment' in doc:
         for key, value in doc['bidding_fragment'].get('count', {}).items():
             if key in bidding_fragment_data:
@@ -94,6 +98,7 @@ pipeline_es = [
                 {"es_bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                 {"es_bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                 {"es_nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
+                {"medical_es.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                 {"es_bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
             ]
         }
@@ -110,6 +115,7 @@ data_es = list(collection.aggregate(pipeline_es))
 es_bidding_count = 0
 es_bidding_ai_count = 0
 es_nzj_count = 0
+es_medical_count = 0
 es_bidding_fragment_data = {
     "情报_法务": 0,
     "情报_财务审计": 0,
@@ -131,6 +137,8 @@ for doc in data_es:
         es_bidding_ai_count += doc['es_bidding_ai'].get('count', 0)
     if 'es_nzj' in doc:
         es_nzj_count += doc['es_nzj'].get('count', 0)
+    if 'medical_es' in doc:
+        es_medical_count += doc['medical_es'].get('count', 0)
     if 'es_bidding_fragment' in doc:
         for key, value in doc['es_bidding_fragment'].get('count', {}).items():
             if key in es_bidding_fragment_data:
@@ -191,11 +199,11 @@ for doc in data_timeliness:
 date_range = f"{start_time.strftime('%Y/%m/%d')}-{end_time.strftime('%Y/%m/%d')}"
 
 # 构建Excel数据
-columns = ['日期', '标讯每周入库数据量', '高质量库每周入库数据量', '人脉管理数据', '拟在建数据量(全国)'] + list(bidding_fragment_data.keys())
-data_row_mongo = [date_range, bidding_count, bidding_ai_count, connections_count, nzj_count] + list(bidding_fragment_data.values())
+columns = ['日期', '标讯每周入库数据量', '高质量库每周入库数据量', '人脉管理数据', '拟在建数据量(全国)','医械通'] + list(bidding_fragment_data.keys())
+data_row_mongo = [date_range, bidding_count, bidding_ai_count, connections_count, nzj_count,medical_count] + list(bidding_fragment_data.values())
 
-columns_es = ['日期', '标讯每周入库数据量', '高质量库每周数据入库量', '拟在建数据量(全国)'] + list(es_bidding_fragment_data.keys())
-data_row_es = [date_range, es_bidding_count, es_bidding_ai_count, es_nzj_count] + list(es_bidding_fragment_data.values())
+columns_es = ['日期', '标讯每周入库数据量', '高质量库每周数据入库量', '拟在建数据量(全国)','医械通'] + list(es_bidding_fragment_data.keys())
+data_row_es = [date_range, es_bidding_count, es_bidding_ai_count, es_nzj_count,es_medical_count] + list(es_bidding_fragment_data.values())
 
 columns_timeliness = ['日期'] + list(timeliness_data.keys())
 data_row_timeliness = [date_range] + list(timeliness_data.values())

BIN
tools/周报表格导出/mongo_data_statistics_combined1.xlsx


+ 6 - 6
tools/波动率计算/一份数据基于时间段计算波动率.py

@@ -1,19 +1,19 @@
 from pymongo import MongoClient
-from datetime import datetime, timedelta
+from datetime import datetime
 
 # 连接到MongoDB
-client = MongoClient("mongodb://192.168.3.149:27180/")
+client = MongoClient("mongodb://172.20.45.129:27002/")
 db = client['data_quality']
 collection = db['bid_analysis']
 
 # 定义两个任意时间段的边界
 # 时间段1
-period1_start = int(datetime(2025, 1, 23, 0, 0, 0).timestamp())  # 2025-01-20 00:00:00
-period1_end = int(datetime(2025, 1, 23, 23, 59, 59).timestamp())  # 2025-01-20 23:59:59
+period1_start = int(datetime(2025, 2, 12, 0, 0, 0).timestamp())  # 2025-01-20 00:00:00
+period1_end = int(datetime(2025, 2, 12, 23, 59, 59).timestamp())  # 2025-01-20 23:59:59
 
 # 时间段2
-period2_start = int(datetime(2025, 1, 24, 0, 0, 0).timestamp())  # 2025-01-21 00:00:00
-period2_end = int(datetime(2025, 1, 24, 23, 59, 59).timestamp())  # 2025-01-21 23:59:59
+period2_start = int(datetime(2025, 2, 19, 0, 0, 0).timestamp())  # 2025-01-21 00:00:00
+period2_end = int(datetime(2025, 2, 19, 23, 59, 59).timestamp())  # 2025-01-21 23:59:59
 
 # 聚合查询:计算两个时间段的数量和波动率
 pipeline = [

+ 3 - 3
tools/波动率计算/两份样本数据以站点为分组波动率输出.py

@@ -1,12 +1,12 @@
 from pymongo import MongoClient
 
 # 连接到MongoDB
-client = MongoClient("mongodb://192.168.3.149:27180/")
+client = MongoClient("mongodb://172.20.45.129:27002/")
 db = client['data_quality']
 
 # 假设有两个表:table1 和 table2
-table1_collection = db["bidding_20250117"]
-table2_collection = db["bidding_20250123"]
+table1_collection = db["bidding_20250206"]
+table2_collection = db["bidding_20250220"]
 
 # 聚合查询:计算table1中每个site的数量
 pipeline_table1 = [

+ 3 - 3
tools/波动率计算/两份样本数据基于爬虫代码输出波动率.py

@@ -1,12 +1,12 @@
 from pymongo import MongoClient
 
 # 连接到MongoDB
-client = MongoClient("mongodb://192.168.3.149:27180/")
+client = MongoClient("mongodb://172.20.45.129:27002/")
 db = client['data_quality']
 
 # 假设有两个表:table1 和 table2
-table1_collection = db["bidding_20250117"]
-table2_collection = db["bidding_20250123"]
+table1_collection = db["bidding_20250206"]
+table2_collection = db["bidding_20250220"]
 
 # 聚合查询:计算table1中每个spidercode的数量
 pipeline_table1 = [

+ 71 - 0
tools/爬虫相关/1、bid_analysis表抽取数据到抽取表.py

@@ -0,0 +1,71 @@
+from pymongo import MongoClient
+from datetime import datetime, timedelta
+
+# 连接到MongoDB
+client = MongoClient('mongodb://172.20.45.129:27002/')  # 连接到MongoDB,调整为你的MongoDB连接字符串
+db = client['data_quality']  # 选择数据库
+source_collection = db['bid_analysis']  # 假设源集合名为 bid_analysis
+target_collection = db['bid_extract']  # 目标集合 bid_extract
+
+# 在每次抽取前,先清空 bid_extract 表
+target_collection.delete_many({})  # 删除所有文档
+
+# 设置时间段(例如:2025-02-01 到 2025-02-15)
+start_date = datetime(2024, 1, 1)
+end_date = datetime(2026, 1, 1)
+
+# 将时间转换为Unix时间戳(秒)
+start_timestamp = int(start_date.timestamp())
+end_timestamp = int(end_date.timestamp())
+
+# # 获取当前时间并计算近一周的时间范围
+# one_week_ago = datetime.now() - timedelta(weeks=2)
+# one_week_ago_timestamp = int(one_week_ago.timestamp())
+#
+# print(f"Start timestamp: {start_timestamp}")
+# print(f"End timestamp: {end_timestamp}")
+# print(f"One week ago timestamp: {one_week_ago_timestamp}")
+# print(f"Current time: {datetime.now()}")
+
+# 聚合操作
+pipeline = [
+    # 1. 筛选指定时间段的数据
+    {
+        "$match": {
+            "create_time": {
+                "$gte": start_timestamp,
+                "$lte": end_timestamp
+            }
+        }
+    },
+    # 2. 根据spidercode去重,保留site, channel等字段
+    {
+        "$group": {
+            "_id": "$spidercode",  # 按spidercode去重
+            "site": { "$first": "$site" },
+            "channel": { "$first": "$channel" }
+        }
+    },
+    # 4. 选择要输出的字段
+    {
+        "$project": {
+            "_id": 0,  # 移除原来的 _id
+            "spidercode": "$_id",  # 使用原来的 _id 值作为 spidercode
+            "site": 1,
+            "channel": 1
+        }
+    },
+    # 5. 将处理后的结果存入bid_extract表
+    {
+        "$out": "bid_extract"
+    }
+]
+
+# 执行聚合操作
+result = source_collection.aggregate(pipeline)
+
+# 打印出每条数据的结果以调试
+for doc in result:
+    print(doc)
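+# Note: because the pipeline ends with $out, aggregate() returns an empty cursor, so the
+# loop above prints nothing; $out also replaces bid_extract wholesale on completion,
+# which makes the delete_many({}) at the top of the script effectively redundant.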
+
+print("聚合操作已完成,数据已存入 'bid_extract' 集合。")

+ 142 - 0
tools/爬虫相关/2、bid_analysis表错误原因及数量统计输出,存入抽取表.py

@@ -0,0 +1,142 @@
+from pymongo import MongoClient
+from collections import Counter
+
+# 连接到 MongoDB
+client = MongoClient('mongodb://172.20.45.129:27002/')  # 替换为你的 MongoDB 连接字符串
+db = client['data_quality']  # 选择数据库
+collection = db['bid_analysis']  # 选择集合
+bid_extract_collection = db['bid_extract']  # 假设 bid_extract 表名为 'bid_extract'
+
+# 假设你想要过滤出 create_time 为某个特定值的数据
+# specific_create_time = 1739289600  # 替换为你需要的 create_time 值
+
+# 聚合管道:按 spidercode 和 create_time 分组并统计每个 spidercode 的 error_type 错误
+pipeline = [
+    # {
+    #     "$match": {  # 过滤条件,筛选出 create_time 为特定值的数据
+    #         "create_time": specific_create_time
+    #     }
+    # },
+    {
+        "$group": {  # 按 spidercode 和 create_time 分组
+            "_id": {"spidercode": "$spidercode", "create_time": "$create_time"},  # 按 spidercode 和 create_time 分组
+            "errors": {"$push": "$error_type"}  # 将每个文档的 error_type 字段收集到数组中
+        }
+    }
+]
+
+# 执行聚合查询
+cursor = collection.aggregate(pipeline)
+
+# 错误类型统计字典
+error_count = {
+    "budget_qa": 0,
+    "title_qa": 0,
+    "projectname_qa": 0,
+    "projectcode_qa": 0,
+    "buyer_qa": 0,
+    "area_qa": 0,
+    "com_package_qa": 0
+}
+
+# 提取错误原因
+def extract_error_reason(error_data):
+    reasons = Counter()  # 使用 Counter 来统计每个错误原因的频次
+    if isinstance(error_data, dict):
+        for key, value in error_data.items():
+            if value:
+                # reason = value.split(':')[-1]  # 提取冒号后的错误原因
+                reasons[value] += 1  # 使用 Counter 自动统计出现次数
+    return reasons
+
+
+# 遍历聚合结果并更新数据库
+for result in cursor:
+    spidercode = result['_id']['spidercode']
+    create_time = result['_id']['create_time']  # 获取每个不同的 create_time(批次)
+
+    # 检查 bid_extract 表中是否已存在该 spidercode
+    existing_document = bid_extract_collection.find_one({"spidercode": spidercode})
+
+    # 如果找不到对应的 spidercode,跳过此记录
+    if not existing_document:
+        print(f"Spidercode {spidercode} 未找到,跳过此记录。")
+        continue
+
+    errors = result['errors']
+    total_errors = 0
+    total_corrects = 0  # 假设 total_corrects 可以通过某种方式计算
+
+    total_count = collection.count_documents({
+        "spidercode": spidercode,
+        "create_time": create_time  # 增加 create_time 限制,确保统计是按批次进行的
+    })
+
+    # 初始化错误类型和错误原因字典,每个批次之前清空
+    error_reasons = {
+        "budget_qa": Counter(),
+        "title_qa": Counter(),
+        "projectname_qa": Counter(),
+        "projectcode_qa": Counter(),
+        "buyer_qa": Counter(),
+        "area_qa": Counter(),
+        "com_package_qa": Counter()
+    }
+
+    # 检查每个 spidercode 的错误类型
+    for document in errors:
+        if document:
+            has_error = False  # 标记该文档是否有错误
+            for error_field in error_count:
+                error_data = document.get(error_field, None)
+                if error_data:
+                    # 提取错误原因并记录
+                    reasons = extract_error_reason(error_data)
+                    error_count[error_field] += 1
+                    error_reasons[error_field].update(reasons)  # 使用 update 以更新频次
+                    has_error = True  # 标记为有错误
+            # 如果文档有错误,计入错误数量
+            if has_error:
+                total_errors += 1
+            else:
+                total_corrects += 1  # 如果该文档没有错误,增加正确计数
+
+    # 计算错误率和正确率
+    error_rate = (total_errors / total_count) * 100 if total_count > 0 else 0
+    correct_rate = (total_corrects / total_count) * 100 if total_count > 0 else 0
+    error_rate = f"{error_rate: .2f} %"
+    correct_rate = f"{correct_rate: .2f} %"
+
+    # 初始化 reason_error_counts 字典
+    reason_error_counts = {}
+
+    # 获取每个错误类型对应的具体错误原因及其数量
+    for error_type, counter in error_reasons.items():
+        reason_error_counts[error_type] = {
+            reason: count for reason, count in counter.items()
+        }
+
+    # 为每个批次生成一个独立的 update_info
+    batch_update_info = {
+        "batch_id": create_time,  # 使用实际的 create_time 作为 batch_id
+        "update_info": {
+            **{f"{reason} 错误数量": count for error_type, reasons in reason_error_counts.items() for reason, count in reasons.items()},
+            "总数量": total_count,
+            "正确数量": total_corrects,
+            "错误数量": total_errors,
+            "正确率": correct_rate,
+            "错误率": error_rate
+        }
+    }
+
+    # 确保不同批次的更新被追加
+    bid_extract_collection.update_one(
+        {"spidercode": spidercode},  # 查找对应的 spidercode
+        {
+            "$push": {
+                "batch_updates": batch_update_info  # 直接追加新的批次数据
+            }
+        }
+    )
+
+    print(f"已更新 spidercode: {spidercode} 的批次信息,批次 ID: {create_time}")

+ 97 - 0
tools/爬虫相关/3、抽取表完善爬虫字段.py

@@ -0,0 +1,97 @@
+from pymongo import MongoClient
+from bson import ObjectId
+from datetime import datetime, timedelta
+
+# 获取当前时间并计算近一周的时间范围
+one_week_ago = datetime.now() - timedelta(weeks=1)
+one_week_ago_timestamp = int(one_week_ago.timestamp())
+
+# 统一 MongoDB 连接
+mongo_client = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)
+db_editor = mongo_client["editor"]
+collection_spider = db_editor["lua_logs_auditor"]
+collection_config = db_editor["luaconfig"]
+collection_site = db_editor["site"]
+
+# 连接 bid_extract 数据库
+db_data_quality = MongoClient("mongodb://172.20.45.129:27002/", unicode_decode_error_handler="ignore")["data_quality"]
+coll_user = db_data_quality["bid_extract"]
+
+# 定义 `_id` 的最小值
+max_id = ObjectId("000000000000000000000000")
+
+
+def update_batch_data_mongo(collection, spidercode, params):
+    """ 以 spidercode 作为唯一标识,更新 MongoDB 数据 """
+    if not spidercode:
+        print("无效的 spidercode,跳过更新")
+        return
+
+    documents = ["spider_modified_time", "new_spider", "spider_important", "site_important"]
+    update_data = {documents[indx]: param for indx, param in enumerate(params)}
+
+    try:
+        result = collection.update_one(
+            {"spidercode": spidercode},  # 以 spidercode 作为唯一标识
+            {"$set": update_data}  # 更新数据
+        )
+        if result.matched_count > 0:
+            print(f"已更新 spidercode: {spidercode}, 更新内容: {update_data}")
+        else:
+            print(f"未找到 spidercode: {spidercode}, 跳过更新")
+    except Exception as e:
+        print("更新数据时发生错误:", e)
+
+
+def get_newest_code(spidercode):
+    """ 获取某个 spidercode 在一周内的最新 modifytime """
+    if not spidercode:
+        return 946656000  # 默认值
+
+    pipeline = [
+        {"$match": {
+            "comeintime": {"$gte": one_week_ago_timestamp},
+            "types": "审核",
+            "platform": {"$exists": False},
+            "code": spidercode
+        }},
+        {"$sort": {"comeintime": -1}},  # 按时间降序
+        {"$limit": 1},  # 只取最新的一条
+        {"$project": {"modifytime": 1, "_id": 0}}  # 只返回 `modifytime`
+    ]
+
+    result = list(collection_spider.aggregate(pipeline))
+    return result[0]["modifytime"] if result else 946656000
+
+
+# 遍历 `bid_extract` 表
+for item in coll_user.find({"_id": {"$gte": max_id}}).sort("_id", 1):
+    print("------ 处理新数据 ------")
+
+    new_spider = True  # 是否近期修改爬虫
+    spider_important = False
+    site_important = 0
+
+    spidercode = item.get("spidercode", "")
+    site = item.get("site", "")
+
+    # 获取 `spidercode` 的最新 `modifytime`
+    spider_modified_time = get_newest_code(spidercode)
+
+    if spider_modified_time == 946656000:
+        new_spider = False
+
+    # 查询 `luaconfig` 获取 `spiderimportant`
+    info_config = collection_config.find_one({"code": spidercode}, {"spiderimportant": 1, "_id": 0})
+    if info_config:
+        spider_important = info_config.get("spiderimportant", False)
+
+    # 查询 `site` 获取 `important`
+    info_site = collection_site.find_one({"site": site}, {"important": 1, "_id": 0})
+    if info_site:
+        site_important = info_site.get("important", 0)
+
+    params = (spider_modified_time, new_spider, spider_important, site_important)
+    update_batch_data_mongo(coll_user, spidercode, params)
+
+    print("------ 处理完成 ------")

+ 70 - 0
tools/爬虫相关/4、bid_extract计算爬虫增长率并发邮件提醒.py

@@ -0,0 +1,70 @@
+from pymongo import MongoClient
+import smtplib
+from email.mime.text import MIMEText
+
+
+def send_email(subject, body, to_email):
+    sender_email = "your_email@example.com"
+    sender_password = "your_password"
+    smtp_server = "smtp.example.com"
+
+    msg = MIMEText(body, "plain", "utf-8")
+    msg["Subject"] = subject
+    msg["From"] = sender_email
+    msg["To"] = to_email
+
+    try:
+        server = smtplib.SMTP(smtp_server, 587)
+        server.starttls()
+        server.login(sender_email, sender_password)
+        server.sendmail(sender_email, [to_email], msg.as_string())
+        server.quit()
+        print("邮件发送成功")
+    except Exception as e:
+        print("邮件发送失败", e)
+
+
+def calculate_growth_rate(data):
+    batch_updates = data.get("batch_updates", [])
+
+    if len(batch_updates) < 2:
+        print("不足两个批次,无法计算增长率。")
+        return data
+
+    # 按 batch_id 进行排序
+    batch_updates.sort(key=lambda x: x["batch_id"])
+
+    for i in range(1, len(batch_updates)):
+        prev_total = batch_updates[i - 1]["update_info"].get("总数量", 0)
+        latest_total = batch_updates[i]["update_info"].get("总数量", 0)
+
+        if prev_total == 0:
+            growth_rate = "N/A"
+        else:
+            growth_rate_value = ((latest_total - prev_total) / prev_total) * 100
+            growth_rate = f"{growth_rate_value:.2f} %"
+
+            # 低于 20% 发送邮件提醒
+            if growth_rate_value < 20:
+                subject = "数据增长率低于 20% 提醒"
+                body = f"批次 {batch_updates[i]['batch_id']} 的增长率仅为 {growth_rate},请关注!"
+                send_email(subject, body, "recipient@example.com")
+
+        batch_updates[i]["update_info"]["增长率"] = growth_rate
+
+    return data
+
+
+# 连接到 MongoDB
+client = MongoClient('mongodb://172.20.45.129:27002/')  # 替换为你的 MongoDB 连接字符串
+db = client['data_quality']  # 选择数据库
+col = db['bid_extract']  # 假设 bid_extract 表名为 'bid_extract'
+
+# Iterate over all bid_extract documents, recomputing per-batch growth rates and writing them back
+for document in col.find().sort("_id", 1):
+    if document:
+        updated_document = calculate_growth_rate(document)
+        col.update_one({"_id": document["_id"]}, {"$set": {"batch_updates": updated_document["batch_updates"]}})
+        print("更新成功:", updated_document["batch_updates"][-1])
+    else:
+        print("未找到匹配的文档。")

+ 95 - 0
tools/爬虫相关/5、根据抽取表生成分析表格.py

@@ -0,0 +1,95 @@
+import pandas as pd
+from pymongo import MongoClient
+from datetime import datetime, timedelta
+
+# MongoDB连接配置
+mongodb_uri = "mongodb://172.20.45.129:27002/"  # MongoDB本地连接
+client = MongoClient(mongodb_uri)
+db = client["data_quality"]  # 替换为实际数据库
+collection = db["bid_extract"]  # 替换为实际表名
+
+# 从MongoDB中读取数据并转换为列表
+documents = list(collection.find({"spider_important": True}))  # 将游标转换为列表
+
+
+# 准备Excel数据
+basic_info = []
+batch_info_dict = {}  # 用来存储每个spidercode下所有批次的错误数据
+
+# 遍历MongoDB文档
+for doc in documents:
+
+    # 填充基本信息
+    basic_info.append({
+        'spidercode': doc['spidercode'],
+        '站点': doc['site'],
+        '栏目': doc['channel'],
+        # '增长率': doc['增长率'],
+        '站点级别': '重要' if doc['site_important'] == 1 else '一般',
+        '是否重点爬虫': '是' if doc['spider_important'] else '否',
+        '是否新增/修改爬虫': '是' if doc['new_spider'] else '否',
+    })
+    # 提取batch_updates数据
+    for batch in doc['batch_updates']:
+        batch_id = batch['batch_id'] # 获取batch_id
+        batch_info = batch['update_info']
+
+        if batch_id is None:
+            continue  # 如果batch_id为None,跳过此批次
+        # 更新batch_info_dict,按spidercode整理每个spidercode下的所有批次信息
+        if doc['spidercode'] not in batch_info_dict:
+            batch_info_dict[doc['spidercode']] = {}
+
+        batch_info_dict[doc['spidercode']][batch_id] = batch_info
+
+# 创建基本信息DataFrame
+df_basic = pd.DataFrame(basic_info)
+
+# 创建Excel文件
+with pd.ExcelWriter('爬虫数据动态.xlsx', engine='openpyxl') as writer:
+    # 第一页:基本信息
+    df_basic.to_excel(writer, sheet_name='基本信息', index=False)
+
+    # 后续每个spidercode的sheet
+    for spidercode, batches in batch_info_dict.items():
+        # 构建所有批次的数据
+        all_errors = []
+
+        # 错误原因列
+        error_row = ['错误原因']
+
+        # 批次列:这里重新创建 batch_columns 确保格式正确
+        batch_columns = [f'批次 ID {batch_id}' for batch_id in sorted(batches.keys())]  # 排序批次ID
+
+        # 初始化错误数据字典
+        error_data = {col: [] for col in error_row + batch_columns}
+
+        # 只取每个batch的所有字段,避免重复
+        dynamic_error_keys = []  # 用来存储错误原因字段的顺序
+
+        for batch_id in sorted(batches.keys()):
+            batch_info = batches[batch_id]
+            for key, value in batch_info.items():
+                if key not in dynamic_error_keys:  # 如果错误原因还未添加
+                    dynamic_error_keys.append(key)  # 添加错误原因
+
+        # 填充错误数据
+        for key in dynamic_error_keys:
+            error_data['错误原因'].append(key)
+            for batch_id in sorted(batches.keys()):
+                batch_info = batches[batch_id]
+                error_data[f'批次 ID {batch_id}'].append(batch_info.get(key, '-'))
+
+        # 将数据转换为DataFrame
+        all_errors_df = pd.DataFrame(error_data)
+
+        # 仅当 DataFrame 有数据时才创建 sheet
+        if not all_errors_df.empty:
+            # 获取spidercode并确保它不超过31个字符
+            sheet_name = spidercode[:31]  # 截取前31个字符
+
+            # 将数据写入每个spidercode的sheet
+            all_errors_df.to_excel(writer, sheet_name=sheet_name, index=False)
+
+# 输出提示
+print("Excel文件已生成:爬虫数据动态.xlsx")

+ 89 - 0
tools/爬虫相关/test.py

@@ -0,0 +1,89 @@
+from pymongo import MongoClient
+from bson import ObjectId
+
+from datetime import datetime, timedelta
+
+# 获取当前时间并计算近一周的时间范围
+one_week_ago = datetime.now() - timedelta(weeks=1)
+one_week_ago_timestamp = int(one_week_ago.timestamp())
+current_datetime = int(datetime.now().timestamp())
+print(one_week_ago_timestamp)
+
+# 初始化爬虫代码库
+collection_spider = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
+# 初始化爬虫config代码库
+collection_config = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["luaconfig"]
+# 初始化site代码库
+collection_site = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["site"]
+# 初始化抽取-bid_extract表
+
+db = MongoClient('172.20.45.129', 27002, unicode_decode_error_handler="ignore").data_quality
+coll_user = db["bid_extract"]
+max_id = ObjectId("0" * 24)
+
+def insert_batch_data_mongo(collection,params):
+    """
+    执行批量插入数据到 MongoDB
+    """
+    # 将参数转换为字典列表
+    documents=["spider_modified_time","spider_modified_current","spider_important","site_important"]
+    doc={}
+    for indx,param in enumerate(params):
+        doc[documents[indx]] =param
+    print(f"{doc}数据")
+    # 插入数据
+    try:
+        collection.insert_one(doc)  # `ordered=False` 忽略重复数据错误
+        print(f"{doc}数据已成功插入到 MongoDB")
+    except Exception as e:
+        print("插入数据时发生错误:", e)
+
+def get_newest_code(spidercode):
+    pipeline = [
+        # 1. 筛选符合条件的数据
+        {"$match": {
+            "comeintime": {"$gte": one_week_ago_timestamp},
+            "types": "审核",
+            "platform": {"$exists": False},
+            "code": spidercode
+        }},
+        # 2. 按 `comeintime` 降序排序,确保最新的数据在前
+        {"$sort": {"comeintime": -1}},
+        # 3. 以 `code` 去重,每个 `code` 仅保留最新的 `comeintime` 数据
+        {"$group": {
+            "_id": "$code",  # 按 `code` 分组
+            "modifytime": {"$first": "$modifytime"}  # 只保留最新文档的 `modifytime`
+        }}
+    ]
+
+    info = list(collection_spider.aggregate(pipeline))
+    if info:
+        spider_modified_time = info[0].get("modifytime")
+    else:
+        spider_modified_time = int(946656000)
+    return spider_modified_time
+
+
+for item in coll_user.find({"_id": {"$gte": max_id}}).sort("_id", 1):
+    print("------一条数据开始------")
+    spider_modified_current = True # 是否近期修改爬虫,默认为否
+    spider_important = False
+    site_important = 0
+    spidercode = item.get("spidercode","")
+    site = item.get("site","")
+
+    spider_modified_time = get_newest_code(spidercode)
+
+    if spider_modified_time == 946656000 :
+        spider_modified_current = False
+
+    info_config = collection_config.find_one({"code": spidercode})
+    if info_config:
+        spider_important = info_config.get("spiderimportant", "")
+    info_site = collection_site.find_one({"site": site})
+    if info_site:
+        site_important = info_site.get("important", "")
+    params = (spider_modified_time,spider_modified_current, spider_important, site_important)
+    insert_batch_data_mongo(coll_user, params)
+    print("------一条数据结束------")

BIN
tools/爬虫相关/爬虫数据动态.xlsx


BIN
tools/爬虫相关/爬虫数据动态1.xlsx