# coding:utf-8
import time

from a2s.tools import json_serialize, json_deserialize
from a2s.a2s_client import a2s_execute
from docs.config import ReluMongodb
from util.mogodb_helper import MongoDBInterface
from pymongo import MongoClient
from bson import ObjectId

ReluClient = MongoDBInterface(ReluMongodb)

# Evaluation service configuration.
a2s_ip = "172.20.100.235:9090"
# a2s_ip = "47.94.18.75:9090"
topic = "quality_bid"
# Topic used for local testing:
# topic = "test_quality_bid"
timeout = 180

# Company name (user ID) used for every rule lookup in this script.
_COMPANY_NAME = "北京剑鱼信息技术有限公司"

# Seconds to wait before re-querying after a failure inside the batch loop.
_RETRY_SLEEP_SECONDS = 10


def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    """Send one document to the evaluation service and return its reply.

    The payload ``{"data": data, "rules_id": rules_id}`` is serialized once
    and submitted through ``a2s_execute`` up to ``retry`` times.

    Args:
        data: The document to evaluate.
        rules_id: Identifier of the rule set to apply.
        a2s_ip: ``host:port`` of the a2s service.
        topic: a2s topic to publish to.
        timeout: Timeout passed through to ``a2s_execute``.
        retry: Maximum number of attempts.

    Returns:
        The deserialized response of the first attempt that yields a
        non-``None`` reply, or ``{}`` when every attempt returned ``None``
        or raised.
    """
    # SSL is not used here, so the channel is insecure.
    row = {"data": data, "rules_id": rules_id}
    bytes_data = json_serialize(row)
    for _ in range(retry):
        try:
            resp_data = a2s_execute(a2s_ip, topic, timeout, bytes_data)
            if resp_data is None:
                continue
            return json_deserialize(resp_data)
        except Exception as e:
            # Best effort: report the failure and move on to the next attempt.
            print(e)
    return {}


def get_rule(company, version):
    """Look up the rule ID for a company (user ID) and rule version.

    Delegates to ``ReluClient.find_rule_by_company`` using the collection
    named by ``ReluMongodb["col"]``.
    """
    rule_id = ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
    return rule_id


def _store_result(coll, doc_id, result):
    """Write the outcome of one evaluation back onto its source document.

    A reply whose ``code`` is not 200 marks the document with ``flag: 1``.
    Otherwise every key of the reply's ``data`` mapping is ``$set`` as a
    top-level field of the document.

    Example reply shape:
        {'code': 200, 'msg': <message>, 'data': {'title_qa': {'0201': <reason>}}}
    """
    code = result.get("code") if isinstance(result, dict) else None
    if code != 200:
        # The evaluation failed for this document.
        coll.update_one({"_id": doc_id}, {"$set": {"flag": 1}})
        return
    data = result.get("data") or {}
    # An empty ``$set`` is rejected by MongoDB servers older than 5.0; the
    # resulting exception would make the batch loop retry this same document
    # forever, so skip the write when there is nothing to store.
    if data:
        coll.update_one({"_id": doc_id}, {"$set": data})


def _check_collection(host, port, collection_name, rules_id, require_purchasing=False):
    """Evaluate every document of ``data_quality.<collection_name>`` in ``_id`` order.

    Documents are read in ascending ``_id`` order starting from the all-zero
    ObjectId. When anything inside the loop raises, the error is printed, the
    function sleeps and then re-queries from the last ``_id`` it saw
    (inclusive), so the document being handled at the time of the failure is
    attempted again.

    Args:
        host: MongoDB host.
        port: MongoDB port.
        collection_name: Collection inside the ``data_quality`` database.
        rules_id: Rule set identifier forwarded to ``start_quality``.
        require_purchasing: When true, skip documents that lack a truthy
            ``purchasinglist`` or ``purchasingsource`` field.
    """
    # One client for the whole run; it was previously re-created on every
    # retry and never closed.
    client = MongoClient(host, port, unicode_decode_error_handler="ignore")
    try:
        coll_user = client.data_quality[collection_name]
        max_id = ObjectId("0" * 24)
        while True:
            try:
                cursor = coll_user.find({"_id": {"$gte": max_id}}).sort("_id", 1)
                for item in cursor:
                    max_id = item["_id"]
                    print(max_id)
                    if require_purchasing and not (
                        item.get("purchasinglist") and item.get("purchasingsource")
                    ):
                        # No purchasing items on this document.
                        continue
                    # The ObjectId is converted to a string before serialization.
                    item["_id"] = str(item["_id"])
                    result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                    print(result)
                    _store_result(coll_user, max_id, result)
                # The cursor was exhausted without an error: the run is complete.
                break
            except Exception as e:
                print(e)
                time.sleep(_RETRY_SLEEP_SECONDS)
    finally:
        client.close()


def batch_load_purchasinglist():
    """Batch data-quality check restricted to documents with purchasing items.

    Uses rule version ``v1.1`` and the ``bidding_20241205_ai`` collection.
    """
    # Rule lookup by the required keys: company name (user ID) and version.
    rules_id = get_rule(_COMPANY_NAME, "v1.1")
    print(rules_id)
    _check_collection(
        '192.168.3.149',
        27180,
        "bidding_20241205_ai",
        rules_id,
        require_purchasing=True,
    )


def batch_load_data():
    """Batch data-quality check over every document of ``bidding_20241219``.

    Uses rule version ``v1.2``.
    """
    # Rule lookup by the required keys: company name (user ID) and version.
    rules_id = get_rule(_COMPANY_NAME, "v1.2")
    print(rules_id)
    _check_collection('172.20.45.129', 27002, "bidding_20241219", rules_id)


def batch_load_data_test():
    """Run the quality check on one hard-coded sample record (local test).

    :return: None; the rule ID and the evaluation result are printed.
    """
    row_data = {
        "title": "你好周杰伦成交通知书...采购计划任务",
        "winner": "第一峡江县文化广电新闻出版旅游局",
        "detail": (
            "一、采购人名称: 峡江县文化广电新闻出版旅游局\n"
            "二、供应商名称: 中国广电江西网络有限公司峡江县分公司\n"
            "四、中标金额: 140万\n"
            "三、采购项目名称: 峡江县文化广电新闻出版旅游局服务工程项目\n"
            "四、采购项目编号: 2881401000001829192\n"
            "五、合同编号: 2023M0816360823000201\n"
            "六、合同内容:\n"
            "序号 标项名称 规格型号 单位 数量 单价(元) 总123价(元)\n"
            "1 有线电视 宽带网络安装 通信系统工程 1.00 30416 30416\n"
            "\n"
            "\n"
            "服务要求或标的基本概况:\n"
            "七、其它事项:\n"
            "/\n"
            "八、联系方式\n"
            "1、 采购人名称: 峡江县文化广电新闻出版旅游局\n"
            "联系人: 傅昕尧\n"
            "联系电话: 1569022****\n"
            "传真:\n"
            "地址: 玉华路71号\n"
            "2、供应商名称: 中国广电江西网络有限公司峡江县分公司\n"
            "地址: 江西省吉安市峡江县江西省吉安市峡江县文化广播电视局\n"
            "附件信息:\n"
            "关于通信系统工程的服务工程合同(2023M0816360823000201).pdf"
        ),
        "buyer": "文化广电新闻出版旅游局",
        "budget": 3100,
        "subtype": "中标",
        "area": "河南",
        "projectcode": "3333",
        "buyerclass": "学校",
    }
    # Rule lookup by the required keys: company name (user ID) and version.
    rules_id = get_rule(_COMPANY_NAME, "v1.4")
    print(rules_id)
    # Call the evaluation service.
    result = start_quality(row_data, rules_id, a2s_ip, topic, timeout)
    print(result)


def batch_load_data_debug():
    """Evaluate a single, hard-coded document and print the result.

    Reads the document with a fixed ``_id`` from ``bidding_20231122``;
    nothing is written back to the collection.
    """
    # Rule lookup by the required keys: company name (user ID) and version.
    rules_id = get_rule(_COMPANY_NAME, "v1.4")
    print(rules_id)
    client = MongoClient('192.168.3.206', 27080, unicode_decode_error_handler="ignore")
    try:
        coll_user = client.data_quality["bidding_20231122"]
        for item in coll_user.find({"_id": ObjectId("655ec5319aed6eb2ffa5d7ce")}):
            item["_id"] = str(item["_id"])
            result = start_quality(item, rules_id, a2s_ip, topic, timeout)
            print(result)
    finally:
        client.close()


if __name__ == '__main__':
    batch_load_data()
    # batch_load_data_test()
    # batch_load_data_debug()
    # batch_load_purchasinglist()  # purchasing-item check