liumiaomiao 5 days ago
parent
commit
53059b1b4d
2 changed files with 69 additions and 11 deletions
  1. app.py (+10 -4)
  2. client_mongo_mysql_liantong.py (+59 -7)

+ 10 - 4
app.py

@@ -171,15 +171,19 @@ def check(obj: any, rules) -> any:
             # Execute the existence-check function
             if func(obj, catch_content):
                 qa["0000"] = f"{field}:不存在!!"
-                field_qa["%s_qa" % field] = qa
-            continue
+                # Only add to field_qa when qa is not empty
+                if qa:
+                    field_qa["%s_qa" % field] = qa
+                continue
         # Field-existence check: verify that the field value's type is correct (e.g. catch null values) and convert English parentheses to Chinese parentheses
         if field in obj and field in fieldtype_checker.errors_tables:
             func_type = fieldtype_checker.errors_tables[field]
             value = func_type(obj.get(field))
             if value is True:
                 qa["0001"] = f"{field}:类型不正确、空值"
-                field_qa["%s_qa" % field] = qa
+                # Only add to field_qa when qa is not empty
+                if qa:
+                    field_qa["%s_qa" % field] = qa
                 continue
         if field not in check_chain:
             continue
@@ -204,7 +208,9 @@ def check(obj: any, rules) -> any:
             else:
                 # The required parameter is not satisfied
                 qa[err] = f"{field}:必须参数(字段)缺失"
-        field_qa["%s_qa" % field] = qa
+        # Only add to field_qa when qa is not empty
+        if qa:
+            field_qa["%s_qa" % field] = qa
     score = bid_score(field_qa, obj)
     field_qa["score"] = score
     return field_qa
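
A quick standalone sketch of what the new `if qa:` guard changes in check(): a field whose qa dict stays empty no longer produces an empty "<field>_qa" entry, so bid_score and other consumers of field_qa only see real errors. The record() helper below is hypothetical and mirrors only the guard, not the full rule chain:

# Minimal sketch of the qa guard (assumption: simplified, hypothetical record() helper)
field_qa = {}

def record(field, qa):
    # Only add to field_qa when qa is not empty -- the new guard
    if qa:
        field_qa["%s_qa" % field] = qa

record("title", {"0001": "title:类型不正确、空值"})   # kept: qa holds an error code
record("budget", {})                                  # skipped: an empty qa is no longer stored

print(field_qa)   # {'title_qa': {'0001': 'title:类型不正确、空值'}}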

+ 59 - 7
client_mongo_mysql_liantong.py

@@ -14,10 +14,10 @@ ReluClient = MongoDBInterface(ReluMongodb)
 
 # Evaluation service configuration
 a2s_ip = "172.20.100.235:9090"
-# topic = "quality_bid"
+topic = "quality_bid"
 # Topic used for local testing
-topic = "test_quality_bid"
-timeout = 180
+# topic = "test_quality_bid"
+timeout = 300
 
 
 # Start evaluation
@@ -62,8 +62,10 @@ def insert_batch_data(conn, params):
     """
     Execute a batch insert of data
     """
-    query = """INSERT IGNORE INTO bid_analysis_liantong (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, create_time) 
-               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s)"""
+    query = """INSERT IGNORE INTO bid_analysis_liantong (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, create_time,
+                agency,agencyperson,agencytel,bidamount,bidendtime,bidopentime,bidstarttime,bidway,budget,buyer,buyerperson,buyertel,com_package,docendtime,
+                docstarttime,est_purchase_time,href,projectcode,projectname,publishtime,s_winner,title,winnerorder,winnerperson,winnertel) 
+               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
     MysqlUtil.insert_data(conn, query, params)
 
 def insert_dynamic_error_field(conn, cleaned_key, error_ids, mongoid):
@@ -93,6 +95,14 @@ def has_non_empty_qa(data):
             return True
     return False
 
+def parse_timestamp(timestamp):
+    if not timestamp:
+        return None
+    try:
+        return datetime.fromtimestamp(int(timestamp))
+    except (ValueError, TypeError):
+        return None
+
 def batch_load_data():
     """
     Batch data quality check
@@ -113,7 +123,7 @@ def batch_load_data():
     conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
 
     max_id = ObjectId("0" * 24)
-    # max_id = ObjectId("68864d00f0c6ad8b095e65f9")
+    # max_id = ObjectId("688363ebf0c6ad8b095e2245")
     # Query conditions: _id >= max_id, appid matches, and createtime within [start_date, end_date]
     query = {
         "_id": {"$gte": max_id},
@@ -124,6 +134,7 @@ def batch_load_data():
 
     while True:
         client = MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore",directConnection=True).jyqyfw  # change to your connection address
+        # client = MongoClient('mongodb://172.20.17.61:27080/', unicode_decode_error_handler="ignore",directConnection=True).jyqyfw  # production environment
         coll_user = client["usermail"]
         try:
             for item in coll_user.find(query).sort("_id", 1):
@@ -153,16 +164,57 @@ def batch_load_data():
                     area = item.get("area", "")
                     city = item.get("city", "")
                     district = item.get("district", "")
+                    # --- additional bid fields for the widened INSERT ---
+                    agency = item.get("agency", "")
+                    agencyperson = item.get("agencyperson", "")
+                    agencytel = item.get("agencytel", "")
+                    bidamount = item.get("bidamount", "")
+                    bidendtime = item.get("bidendtime", "")
+                    bidendtime = parse_timestamp(bidendtime)
+                    bidopentime = item.get("bidopentime", "")
+                    bidopentime = parse_timestamp(bidopentime)
+                    bidstarttime = item.get("bidstarttime", "")
+                    bidstarttime = parse_timestamp(bidstarttime)
+                    bidway = item.get("bidway", "")
+                    budget = item.get("budget", "")
+                    buyer = item.get("buyer", "")
+                    buyerperson = item.get("buyerperson", "")
+                    buyertel = item.get("buyertel", "")
+                    com_package = item.get("com_package", "")  # JSON string
+                    com_package = json.dumps(com_package)
+                    docendtime = item.get("docendtime", "")
+                    docendtime = parse_timestamp(docendtime)
+                    docstarttime = item.get("docstarttime", "")
+                    docstarttime = parse_timestamp(docstarttime)
+                    est_purchase_time = item.get("est_purchase_time", "")
+                    est_purchase_time = parse_timestamp(est_purchase_time)
+                    href = item.get("href", "")
+                    projectcode = item.get("projectcode", "")
+                    projectname = item.get("projectname", "")
+                    publishtime = item.get("publishtime", "")
+                    publishtime = parse_timestamp(publishtime)
+                    s_winner = item.get("s_winner", "")
+                    title = item.get("title", "")
+                    winnerorder = item.get("winnerorder", "")  # JSON string
+                    winnerorder = json.dumps(winnerorder)
+                    winnerperson = item.get("winnerperson", "")
+                    winnertel = item.get("winnertel", "")
+
+                    # --- end of additional bid fields ---
                     score = data.get("score", "")
                     error_type_data = json.dumps(data)
                     create_time = today_date
 
-                    params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,create_time)
+                    params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,create_time,
+                              agency,agencyperson,agencytel,bidamount,bidendtime,bidopentime,bidstarttime,bidway,budget,buyer,buyerperson,buyertel,com_package,docendtime,
+                              docstarttime,est_purchase_time,href,projectcode,projectname,publishtime,s_winner,title,winnerorder,winnerperson,winnertel)
                     insert_batch_data(conn, params)
             print("---- 数据处理完成 ----")
             break
         except Exception as e:
             print(f"错误: {e}")
+            import traceback
+            traceback.print_exc()  # print the full stack trace
             time.sleep(10)
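
As a reference for the new timestamp handling, here is a standalone check of how parse_timestamp behaves for the epoch-second fields (bidendtime, bidopentime, publishtime, ...) before they go into the widened INSERT; the sample values are made up for illustration:

from datetime import datetime

def parse_timestamp(timestamp):
    # Same logic as the helper added above: empty or unparsable values become None,
    # which the MySQL layer can store as NULL instead of failing the insert.
    if not timestamp:
        return None
    try:
        return datetime.fromtimestamp(int(timestamp))
    except (ValueError, TypeError):
        return None

print(parse_timestamp(1721030400))    # datetime for that epoch second (local time)
print(parse_timestamp(""))            # None -> NULL column value
print(parse_timestamp("not-a-ts"))    # None: the bad value is swallowed rather than raised

Note that the column list in insert_batch_data and the params tuple built in batch_load_data both carry 38 entries after this change, so the %s placeholder list has to stay at the same length whenever another field is added.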