liumiaomiao 1 周之前
父节点
当前提交
3f61c0bfb2
共有 4 个文件被更改,包括 219 次插入10 次删除
  1. 11 7
      app.py
  2. 170 0
      client_mongo_mysql_liantong.py
  3. 30 2
      tables/fields/NoField.py
  4. 8 1
      tables/fields/budget.py

+ 11 - 7
app.py

@@ -557,15 +557,19 @@ if __name__ == '__main__':
     # #     }
     # })
     result = check(row, rules={
-        "detail": {
+        "entname": {
             },
-        "city": {
-            "0302": {
-                "name": "城市不在[3,11]个字之间",
-                "parent_name": "长度异常类型",
-                "parent_code": "03",
-            }
+        "price": {
         },
+        "sort": {
+        },
+        # "city": {
+        #     "0302": {
+        #         "name": "城市不在[3,11]个字之间",
+        #         "parent_name": "长度异常类型",
+        #         "parent_code": "03",
+        #     }
+        # },
         # "publishtime": {
         #             "0201": {
         #                 "name": "发布时间 > 开标时间  ",

+ 170 - 0
client_mongo_mysql_liantong.py

@@ -0,0 +1,170 @@
+# coding:utf-8
+import time
+from a2s.tools import json_serialize, json_deserialize
+from a2s.a2s_client import a2s_execute
+from docs.config import ReluMongodb
+from util.mogodb_helper import MongoDBInterface
+from pymongo import MongoClient
+from util.mysql_tool import MysqlUtil
+import json
+from datetime import datetime, timedelta
+from bson import ObjectId
+
+ReluClient = MongoDBInterface(ReluMongodb)
+
+# 评估服务配置
+a2s_ip = "172.20.100.235:9090"
+# topic = "quality_bid"
+#本地测试用的主题
+topic = "test_quality_bid"
+timeout = 180
+
+
+# 开始评估
+def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
+    # 本次不使用SSL,所以channel是不安全的
+    row = {"data": data, "rules_id": rules_id}
+    bytes_data = json_serialize(row)
+    for t in range(retry):
+        print("topic",topic)
+        try:
+            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
+            if resp_data is None:
+                continue
+            result = json_deserialize(resp_data)
+            return result
+        except Exception as e:
+            print(e)
+    return {}
+
+# 获取规则ID
+def get_rule(company, version):
+    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
+    return rule_id
+
+def find_error_id(conn, cleaned_key, sub_value):
+    """
+    查找 error_dict 中的 id
+    """
+    query = """SELECT id FROM error_dict WHERE fields = %s AND error = %s"""
+    params = (cleaned_key, sub_value)
+    result = MysqlUtil.query_data(conn, query, params)
+    #[(10,)]
+    # 检查查询结果是否为空
+    if not result:
+        print(f"Error: No matching record found for fields={cleaned_key}, error={sub_value}")
+        return None  # 或者返回一个默认值,根据需求而定
+
+    record = result[0][0]
+    return record
+
+def insert_batch_data(conn, params):
+    """
+    执行批量插入数据
+    """
+    query = """INSERT IGNORE INTO bid_analysis_liantong (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, create_time) 
+               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s)"""
+    MysqlUtil.insert_data(conn, query, params)
+
+def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
+    """
+    动态插入 error_ids 到相应的 cleaned_key_error 字段
+    """
+    # 构造动态插入 SQL 语句,更新指定的 cleaned_key_error 字段
+    query = f"""
+        UPDATE bid_analysis_liantong 
+        SET {cleaned_key}_error = %s 
+        WHERE mongoid = %s
+    """
+    # 拼接多个 error_id,用分隔符分开
+    error_ids_str = ','.join(map(str, error_ids))
+    params = (error_ids_str, mongoid )
+
+    MysqlUtil.update_data(conn, query, params)
+
+def has_non_empty_qa(data):
+    # 获取data字典
+    data_dict = data.get('data', {})
+
+    # 遍历所有键值对
+    for key, value in data_dict.items():
+        # 检查键以'_qa'结尾且值不为空
+        if key.endswith('_qa') and value:  # value不为None、空字典、空列表等
+            return True
+    return False
+
+def batch_load_data():
+    """
+    批量数据质量检查
+    """
+    # 获取今天的日期(字符串格式)
+    today_date = datetime.now().strftime("%Y-%m-%d")
+    yesterday_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
+    # 获取昨天 00:00:00 的时间戳
+    start_date = int(datetime.strptime(f"{yesterday_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
+    # print("start_date",start_date)
+    # 获取今天 00:00:00 的时间戳
+    end_date = int(datetime.strptime(f"{today_date} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
+    # print("end_date", end_date)
+    # 规则查询,根据必要条件 公司名称(用户ID)、版本号
+    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
+    print(rules_id)
+    # 初始化mysql
+    conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
+
+    max_id = ObjectId("0" * 24)
+    # max_id = ObjectId("655ec5609aed6eb2ffa654ca")
+    # 查询条件:_id >= max_id, appid匹配,且 createtime 在 [start_date, end_date] 之间
+    query = {
+        "_id": {"$gte": max_id},
+        "appid": "jyGQ1XQQsEAwNeSENOFR9D",
+        "createtime": {"$gte": start_date, "$lte": end_date}
+    }
+
+    while True:
+        client = MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore",directConnection=True).jyqyfw  # 修改为你的连接地址
+        coll_user = client["usermail"]
+        try:
+            for item in coll_user.find(query).sort("_id", 1):
+                print("------数据处理开始--------")
+                max_id = item["_id"]
+                item["_id"] = str(item["_id"])
+                print(f"正在处理数据: {max_id}")
+                # 质量检查逻辑
+                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
+                print(result)
+                code = result.get("code")
+                if code != 200:
+                    # 数据出错,跳过
+                    continue
+                #只将有错误的数据存库
+                if has_non_empty_qa(result):
+                    data = result.get("data", {})
+
+                    # 数据插入到 MySQL
+                    toptype = item.get("toptype", "")
+                    subtype = item.get("subtype", "")
+                    site = item.get("site", "")
+                    spidercode = item.get("spidercode", "")
+                    channel = item.get("channel", "")
+                    comeintime = item.get("comeintime", "")
+                    comeintime = datetime.fromtimestamp(comeintime)
+                    area = item.get("area", "")
+                    city = item.get("city", "")
+                    district = item.get("district", "")
+                    score = item.get("score", "")
+                    error_type_data = json.dumps(data)
+                    create_time = today_date
+
+                    params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,create_time)
+                    insert_batch_data(conn, params)
+            print("---- 数据处理完成 ----")
+            break
+        except Exception as e:
+            print(f"错误: {e}")
+            time.sleep(10)
+
+
+if __name__ == '__main__':
+    batch_load_data()
+

+ 30 - 2
tables/fields/NoField.py

@@ -32,7 +32,24 @@ class NoFieldChecker(object):
             "purchasinglist":self.check_purchasinglist,
             "detail":self.check_detail,
             "href":self.check_href,
-            "est_purchase_time":self.check_est_purchase_time
+            "est_purchase_time":self.check_est_purchase_time,
+            "docstarttime":self.check_docstarttime,
+            "docendtime":self.check_docendtime,
+            "bidstarttime":self.check_bidstarttime,
+            "bidendtime":self.check_bidendtime,
+            "bidopentime":self.check_bidopentime,
+            "bidway":self.check_bidway,
+            "buyerperson":self.check_buyerperson,
+            "buyertel":self.check_buyertel,
+            "agency":self.check_agency,
+            "agencyperson":self.check_agencyperson,
+            "agencytel":self.check_agencytel,
+            "winnerperson":self.check_winnerperson,
+            "winnertel":self.check_winnertel,
+            "entname":self.check_entname,
+            "sortstr":self.check_sort,
+            "price":self.check_price,
+            "winnerorder":self.check_winnerorder
         }
 
     def check_bidamount(self,obj,catch_content: CatchContentObject) -> bool:
@@ -279,7 +296,7 @@ class NoFieldChecker(object):
         :param obj:代表一个item
         :return:返回true 代表异常
         """
-        if obj.get("toptype") != "结果" and obj.get("toptype") != "预告":
+        if obj.get("toptype") == "招标":
             if not obj.get("bidopentime"):
                 return True
             return False
@@ -436,4 +453,15 @@ class NoFieldChecker(object):
                     return False  # 只要有一个entname存在,就返回False
 
             return True  # 所有条目都没有entname,返回True
+        return False
+    def check_winnerorder(self, obj, catch_content: CatchContentObject) -> bool:
+        """
+        候选人信息为空检测
+        :param obj:代表一个item
+        :return:返回true 代表异常
+        """
+        subtype = obj.get("subtype", "")
+        if subtype in ["中标", "成交"]:
+            if "winnerorder" not in obj:
+                return True  # 如果没有winnerorder字段,视为无
         return False

+ 8 - 1
tables/fields/budget.py

@@ -92,4 +92,11 @@ class BudgetChecker(object):
         """
         if  budget < 0:
             return True
-        return False
+        return False
+
+    @staticmethod
+    def check0401(budget: float, bidamount: float) -> bool:
+        """
+        预算金额 < 中标金额,视为异常
+        :return: 返回true 代表异常
+        """