liumiaomiao 6 月之前
父節點
當前提交
a5a1b44932
共有 2 個文件被更改,包括 336 次插入0 次删除
  1. 266 0
      client_spider_mongo.py
  2. 70 0
      util/spider_statistics.py

+ 266 - 0
client_spider_mongo.py

@@ -0,0 +1,266 @@
+# coding:utf-8
+import time
+from a2s.tools import json_serialize, json_deserialize
+from a2s.a2s_client import a2s_execute
+from docs.config import ReluMongodb
+from util.mogodb_helper import MongoDBInterface
+from pymongo import MongoClient
+from util.mysql_tool import MysqlUtil
+import json
+from datetime import datetime, timedelta
+from elasticsearch import Elasticsearch
+
+
+ReluClient = MongoDBInterface(ReluMongodb)
+
+# 评估服务配置
+a2s_ip = "192.168.3.240:9090"
+# a2s_ip = "172.17.0.11:9090"
+topic = "quality_bid"
+#本地测试用的主题
+# topic = "test_quality_bid"
+timeout = 180
+
+# 获取当前时间
+now = datetime.now()
+current_datetime = now.strftime("%Y-%m-%d %H:%M:%S")
+# 获取今天的日期
+today = datetime.today()
+# 获取昨天的日期
+yesterday = today - timedelta(days=1)
+# 获取昨天0点的时间
+yesterday_midnight = datetime(yesterday.year, yesterday.month, yesterday.day)
+# 获取今天0点的时间
+today_midnight = datetime(today.year, today.month, today.day)
+# 转换为Unix时间戳
+start_date = int(yesterday_midnight.timestamp())
+end_date = int(today_midnight.timestamp())
+
+# ES 连接配置
+es_host = "http://127.0.0.1:19800"
+es_username = "jianyuGr"
+es_password = "we3g8glKfe#"
+
+# 初始化 Elasticsearch 客户端
+es_client = Elasticsearch(es_host,http_auth=(es_username, es_password),retry_on_timeout=True) # 使用基本认证
+
+# 开始评估
+def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
+    # 本次不使用SSL,所以channel是不安全的
+    row = {"data": data, "rules_id": rules_id}
+    bytes_data = json_serialize(row)
+    for t in range(retry):
+        print("topic",topic)
+        try:
+            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
+            if resp_data is None:
+                continue
+            result = json_deserialize(resp_data)
+            return result
+        except Exception as e:
+            print(e)
+    return {}
+
+# 获取规则ID
+def get_rule(company, version):
+    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
+    return rule_id
+
+def find_error_id(conn, cleaned_key, sub_value):
+    """
+    查找 error_dict 中的 id
+    """
+    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
+    params = (cleaned_key, sub_value)
+    result = MysqlUtil.query_data(conn, query, params)
+    #[(10,)]
+    # 检查查询结果是否为空
+    if not result:
+        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
+        return None  # 或者返回一个默认值,根据需求而定
+
+    record = result[0][0]
+    return record
+
+def insert_batch_data(conn, params):
+    """
+    执行批量插入数据
+    """
+    query = """INSERT IGNORE INTO bid_analysis (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, spider_modified_time) 
+               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s)"""
+    MysqlUtil.insert_data(conn, query, params)
+
+def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
+    """
+    动态插入 error_ids 到相应的 cleaned_key_error 字段
+    """
+    # 构造动态插入 SQL 语句,更新指定的 cleaned_key_error 字段
+    query = f"""
+        UPDATE bid_analysis 
+        SET {cleaned_key}_error = %s 
+        WHERE mongoid = %s
+    """
+    # 拼接多个 error_id,用分隔符分开
+    error_ids_str = ','.join(map(str, error_ids))
+    params = (error_ids_str, mongoid )
+
+    MysqlUtil.update_data(conn, query, params)
+
+def get_last_processed_id():
+    """
+    获取上次处理的最大 ID (例如从数据库或文件中读取)
+    """
+    # 这里假设从文件读取中断 ID,你也可以从数据库或 Redis 等存储获取
+    try:
+        with open('docs/last_processed_id.txt', 'r') as f:
+            last_id = f.read().strip()
+            if last_id:
+                return last_id
+            else:
+                return None
+    except FileNotFoundError:
+        return None
+
+def save_last_processed_id(last_id):
+    """
+    保存当前处理的最大 ID,用于恢复
+    """
+    with open('docs/last_processed_id.txt', 'w') as f:
+        f.write(str(last_id))
+
+def batch_load_data():
+    """
+    批量数据质量检查
+    """
+    # 规则查询,根据必要条件 公司名称(用户ID)、版本号
+    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
+    print(rules_id)
+
+    # 初始化mysql
+    # conn = MysqlUtil.connect_to_mysql(host='192.168.3.14', port='4000', user='DataScBi', password='DT#Sc20221123Ht',database='quality')
+    conn = MysqlUtil.connect_to_mysql(host='192.168.3.217', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
+    # 初始化爬虫代码库
+    collection = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
+
+    # 获取上次处理的 ID,如果没有,则从头开始
+    last_processed_id = get_last_processed_id()
+    print(f"上次处理的 ID: {last_processed_id}")
+
+    # 获取ES数据
+    es_query = {
+        "query": {
+            "bool": {
+                "filter": [
+                    {
+                        "range": {
+                            "comeintime": {
+                                "gte": start_date,
+                                "lt": end_date
+                            }
+                        }
+                    }
+                ]
+            }
+        },
+        "sort": [
+            {"_id": {"order": "asc"}}  # 如果 comeintime 相同,再按 _id 排序
+        ],
+        "size": 100  # 每次返回的数据量
+    }
+
+    # 如果有上次处理的 ID,使用 `search_after` 进行分页
+    if last_processed_id:
+        es_query["search_after"] = [last_processed_id]  # 确保传入的是字符串类型的 _id
+
+    try:
+        # 使用 scroll API 来分批获取数据
+        response = es_client.search(index="bidding", body=es_query, size=100)
+        hits = response['hits']['hits']
+
+        while hits:
+            print(f"---- 批次开始 ----")
+            max_id = None
+            for hit in hits:
+                item = hit["_source"]
+                print("------一条数据开始--------")
+                max_id = hit["_id"]
+                print(f"正在处理数据: {max_id}")
+                item["_id"] = str(hit["_id"])
+
+                # 质量检查逻辑
+                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
+                print(result)
+
+                code = result.get("code")
+                if code != 200:
+                    # 数据出错,跳过
+                    continue
+
+                data = result.get("data", {})
+
+                # 数据插入到 MySQL
+                toptype = item.get("toptype", "")
+                subtype = item.get("subtype", "")
+                site = item.get("site", "")
+                spidercode = item.get("spidercode", "")
+                channel = item.get("channel", "")
+                comeintime = item.get("comeintime", "")
+                comeintime = datetime.fromtimestamp(comeintime)
+                area = item.get("area", "")
+                city = item.get("city", "")
+                district = item.get("district", "")
+                score = item.get("score", "")
+                error_type_data = json.dumps(data)
+                spider_modified_time = current_datetime
+
+                info = collection.find_one({"code": spidercode})
+                if info:
+                    spider_modified_time = info.get("modifytime", "")
+                    spider_modified_time = datetime.fromtimestamp(spider_modified_time)
+
+                params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,spider_modified_time)
+                insert_batch_data(conn, params)
+
+                # 遍历错误原因字典并提取非空字典中的值
+                for key, value in data.items():
+                    error_ids = []
+                    if isinstance(value, dict) and value:
+                        cleaned_key = key[:-3] if key.endswith('_qa') else key  # 去除 '_qa' 后缀
+                        for sub_key, sub_value in value.items():
+                            error_id = find_error_id(conn, cleaned_key, sub_value)
+                            if error_id:
+                                error_ids.append(error_id)
+                            print(f"  {sub_key}: {sub_value}")
+
+                        # 插入错误ID到cleaned_key_error字段
+                        insert_dynamic_error_field(conn, cleaned_key, error_ids, item["_id"])
+                print("------一条数据结束------")
+            # 保存当前批次处理的最大 ID
+            if max_id:
+                save_last_processed_id(max_id)
+                print(f"保存当前处理的最大 ID: {max_id}")
+            # 批次结束的打印信息
+            print("---- 当前批次数据处理完成 ----")
+
+            # 获取下一批数据
+            search_after = hits[-1]["_id"]  # 获取当前批次最后一条数据的 _id 作为下一批的起始点
+            es_query["search_after"] = [search_after]  # 保持 _id 类型一致
+            response = es_client.search(index="bidding", body=es_query, size="100")
+            hits = response['hits']['hits']
+
+            # 如果没有更多数据,跳出循环
+            if not hits:
+                print("没有更多数据,结束批次处理")
+                break
+        print("数据处理完成")
+    except Exception as e:
+        print(f"错误: {e}")
+        time.sleep(10)
+    finally:
+        if conn.is_connected():
+            conn.close()  # 确保连接关闭
+            print("MySQL 连接已关闭")
+
+if __name__ == '__main__':
+    batch_load_data()
+

+ 70 - 0
util/spider_statistics.py

@@ -0,0 +1,70 @@
+from util.mysql_tool import MysqlUtil
+import pandas as pd
+from datetime import datetime,timedelta
+
+# 连接到MySQL数据库
+conn = MysqlUtil.connect_to_mysql(host='192.168.3.217', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
+cursor = conn.cursor()
+
+# 获取当天的日期,作为batch字段
+today_date = datetime.now().strftime('%Y-%m-%d')
+
+# 获取当前时间
+now = datetime.now()
+current_datetime = now.strftime("%Y-%m-%d %H:%M:%S")
+# 获取今天的日期
+today = datetime.today()
+# 获取昨天的日期
+yesterday = today - timedelta(days=1)
+# 获取昨天0点的时间
+yesterday_midnight = datetime(yesterday.year, yesterday.month, yesterday.day)
+# 获取今天0点的时间
+today_midnight = datetime(today.year, today.month, today.day)
+# 转换为Unix时间戳
+start_date = int(yesterday_midnight.timestamp())
+end_date = int(today_midnight.timestamp())
+
+# 查询去重后的site, channel和spidercode
+query = """
+SELECT DISTINCT site, channel, spidercode
+FROM bid_analysis where comeintime< end_date ;
+"""
+
+cursor.execute(query)
+
+# 获取查询结果
+data = cursor.fetchall()
+
+# 构建DataFrame,用于处理和插入
+statistics_data = []
+for row in data:
+    site, channel, spidercode = row
+    statistics_data.append({
+        'batch': today_date,
+        'site': site,
+        'channel': channel,
+        'spidercode': spidercode,
+        'created_at': today_date  # 使用当天日期作为创建时间
+    })
+
+# 将数据转换为pandas DataFrame
+df = pd.DataFrame(statistics_data)
+
+# 准备插入statistics表的SQL语句
+insert_query = """
+INSERT INTO statistics (batch, site, channel, spidercode, created_at)
+VALUES (%s, %s, %s, %s, %s);
+"""
+
+# 将DataFrame的数据插入到数据库
+for index, row in df.iterrows():
+    cursor.execute(insert_query, (row['batch'], row['site'], row['channel'], row['spidercode'], row['created_at']))
+
+# 提交事务
+conn.commit()
+
+# 关闭数据库连接
+cursor.close()
+conn.close()
+
+print(f"成功将 {len(df)} 条记录插入到statistics表中.")