123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- # coding:utf-8
- import time
- from a2s.tools import json_serialize, json_deserialize
- from a2s.a2s_client import a2s_execute
- from docs.config import ReluMongodb
- from util.mogodb_helper import MongoDBInterface
- from pymongo import MongoClient
- from bson import ObjectId
# ---------------------------------------------------------------------------
# Evaluation service configuration
# ---------------------------------------------------------------------------
# Address of the a2s evaluation service. An alternative endpoint that was
# kept around in this file is "172.17.0.11:9090".
a2s_ip = "192.168.3.240:9090"

# Topic the evaluation requests are sent to. For local testing the topic
# "test_quality_bid" was used instead.
topic = "quality_bid"

# Timeout value handed to a2s_execute for every request.
timeout = 180

# Shared MongoDB helper built from the project's ReluMongodb settings; used
# for rule lookups.
ReluClient = MongoDBInterface(ReluMongodb)
- # 开始评估
def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    """Submit one record to the evaluation service and return the decoded reply.

    The record and rule id are wrapped as ``{"data": data, "rules_id": rules_id}``,
    serialized with ``json_serialize`` and sent through ``a2s_execute``.

    Args:
        data: The record to evaluate.
        rules_id: Identifier of the rule set to evaluate against.
        a2s_ip: Address of the a2s service.
        topic: Topic the request is sent to.
        timeout: Timeout value forwarded to ``a2s_execute``.
        retry: Maximum number of attempts.

    Returns:
        The value produced by ``json_deserialize`` for the first non-``None``
        response, or an empty dict when every attempt either returned
        ``None`` or raised.
    """
    # SSL is not used for this call, so the channel is not secure.
    payload = json_serialize({"data": data, "rules_id": rules_id})
    for _ in range(retry):
        try:
            raw_reply = a2s_execute(a2s_ip, topic, timeout, payload)
            if raw_reply is not None:
                return json_deserialize(raw_reply)
        except Exception as err:
            # Best effort: report the failure and move on to the next attempt.
            print(err)
    return {}
- # 获取规则ID
def get_rule(company, version):
    """Look up the rule id for a company name and rule version.

    Delegates to ``ReluClient.find_rule_by_company`` with the collection
    named by ``ReluMongodb["col"]`` and returns its result unchanged.
    """
    return ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
def batch_load_purchasinglist():
    """Run the batch quality check on records that carry purchasing items.

    Looks up the rule id for rule version "v1.1", then walks the
    ``bidding_20241205_ai`` collection of the ``data_quality`` database in
    ascending ``_id`` order. Records without a truthy ``purchasinglist`` or
    ``purchasingsource`` are skipped. Every other record is sent to
    ``start_quality``:

    * reply code 200: the fields of the reply's ``data`` are ``$set`` on the
      record (nothing is written when ``data`` is empty);
    * any other code: the record is marked with ``flag: 1``.

    If anything raises during the scan, the error is printed and, after a
    10 second pause, the scan resumes from the last ``_id`` seen (inclusive).
    The function returns once the cursor has been exhausted.
    """
    # Rule lookup by the required keys: company name (user id) and version.
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.1")
    print(rules_id)
    # All-zero ObjectId so the first pass starts at the beginning.
    max_id = ObjectId("0" * 24)
    while True:
        # One client per pass; the ``with`` block closes it so that repeated
        # retries do not accumulate open connections.
        with MongoClient('192.168.3.149', 27180, unicode_decode_error_handler="ignore") as client:
            coll_user = client.data_quality["bidding_20241205_ai"]
            try:
                for item in coll_user.find({"_id": {"$gte": max_id}}).sort("_id", 1):
                    # Remember the position so a retry can resume from here.
                    max_id = item["_id"]
                    print(max_id)
                    # No purchasing items on this record: nothing to check.
                    if not (item.get("purchasinglist") and item.get("purchasingsource")):
                        continue
                    # The id is turned into a string before the record is
                    # handed to start_quality; max_id keeps the ObjectId.
                    item["_id"] = str(item["_id"])
                    result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                    print(result)
                    # Example result:
                    # {'code': 200, 'msg': '成功', 'data': {'title_qa': {'0201': '非汉字占比>55%'}}}
                    if result.get("code") != 200:
                        # Evaluation failed for this record: flag it.
                        coll_user.update_one({"_id": max_id}, {"$set": {"flag": 1}})
                        continue
                    data = result.get("data")
                    # An empty "$set" is rejected by older MongoDB servers;
                    # that error would make the retry loop reprocess this
                    # same record forever, so skip the write instead.
                    if data:
                        coll_user.update_one({"_id": max_id}, {"$set": data})
                # Cursor exhausted without error: the batch is complete.
                break
            except Exception as e:
                print(e)
        # Back off before resuming from max_id with a fresh client.
        time.sleep(10)
def batch_load_data():
    """Run the batch quality check over a whole bidding collection.

    Looks up the rule id for rule version "v1.2", then walks the
    ``bidding_20241219`` collection of the ``data_quality`` database in
    ascending ``_id`` order and sends every record to ``start_quality``:

    * reply code 200: the fields of the reply's ``data`` are ``$set`` on the
      record (nothing is written when ``data`` is empty);
    * any other code: the record is marked with ``flag: 1``.

    If anything raises during the scan, the error is printed and, after a
    10 second pause, the scan resumes from the last ``_id`` seen (inclusive).
    The function returns once the cursor has been exhausted.
    """
    # Rule lookup by the required keys: company name (user id) and version.
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
    print(rules_id)
    # All-zero ObjectId so the first pass starts at the beginning.
    max_id = ObjectId("0" * 24)
    while True:
        # One client per pass; the ``with`` block closes it so that repeated
        # retries do not accumulate open connections.
        with MongoClient('192.168.3.149', 27180, unicode_decode_error_handler="ignore") as client:
            coll_user = client.data_quality["bidding_20241219"]
            try:
                for item in coll_user.find({"_id": {"$gte": max_id}}).sort("_id", 1):
                    # Remember the position so a retry can resume from here.
                    max_id = item["_id"]
                    print(max_id)
                    # The id is turned into a string before the record is
                    # handed to start_quality; max_id keeps the ObjectId.
                    item["_id"] = str(item["_id"])
                    result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                    print(result)
                    # Example result:
                    # {'code': 200, 'msg': '成功', 'data': {'title_qa': {'0201': '非汉字占比>55%'}}}
                    if result.get("code") != 200:
                        # Evaluation failed for this record: flag it.
                        coll_user.update_one({"_id": max_id}, {"$set": {"flag": 1}})
                        continue
                    data = result.get("data")
                    # An empty "$set" is rejected by older MongoDB servers;
                    # that error would make the retry loop reprocess this
                    # same record forever, so skip the write instead.
                    if data:
                        coll_user.update_one({"_id": max_id}, {"$set": data})
                # Cursor exhausted without error: the batch is complete.
                break
            except Exception as e:
                print(e)
        # Back off before resuming from max_id with a fresh client.
        time.sleep(10)
def batch_load_data_test():
    """Local smoke test: evaluate one hard-coded sample record.

    Looks up the rule id for rule version "v1.4", sends the sample record
    below to ``start_quality`` and prints both the rule id and the reply.
    Nothing is returned.
    """
    sample_record = {
        "title": "你好周杰伦成交通知书...采购计划任务",
        "winner": "第一峡江县文化广电新闻出版旅游局",
        "detail": "一、采购人名称: 峡江县文化广电新闻出版旅游局 <br/> 二、供应商名称: 中国广电江西网络有限公司峡江县分公司 <br/> 四、中标金额: 140万 <br/> 三、采购项目名称: 峡江县文化广电新闻出版旅游局服务工程项目 <br/> 四、采购项目编号: 2881401000001829192 <br/> 五、合同编号: 2023M0816360823000201 <br/> 六、合同内容:<br/> <table> <tbody><tr> <td>序号</td> <td>标项名称</td> <td>规格型号</td> <td>单位</td> <td>数量</td> <td>单价(元)</td> <td colspan=\"1\">总123价(元)</td> </tr> <tr> <td>1</td> <td>有线电视 宽带网络安装 通信系统工程</td> <td></td> <td>项</td> <td>1.00</td> <td>30416</td> <td colspan=\"1\">30416</td> </tr> </tbody></table><br/> <br/> 服务要求或标的基本概况: <br/> 七、其它事项:<br/> / <br/> 八、联系方式<br/> 1、 采购人名称: 峡江县文化广电新闻出版旅游局 <br/> 联系人: 傅昕尧 <br/> 联系电话: 1569022**** <br/> 传真: <br/> 地址: 玉华路71号 <br/> 2、供应商名称: 中国广电江西网络有限公司峡江县分公司 <br/> 地址: 江西省吉安市峡江县江西省吉安市峡江县文化广播电视局 <br/>附件信息:<br/>关于通信系统工程的服务工程合同(2023M0816360823000201).pdf",
        "buyer": "文化广电新闻出版旅游局",
        "budget": 3100,
        "subtype": "中标",
        "area": "河南",
        "projectcode": "3333",
        "buyerclass": "学校",
    }

    # Rule lookup by the required keys: company name (user id) and version.
    rule_id = get_rule("北京剑鱼信息技术有限公司", "v1.4")
    print(rule_id)

    # Run the evaluation and show the reply.
    print(start_quality(sample_record, rule_id, a2s_ip, topic, timeout))
def batch_load_data_debug():
    """Debug helper: evaluate a single stored record selected by its ``_id``.

    Looks up the rule id for rule version "v1.4", fetches the record with
    the hard-coded ObjectId from the ``bidding_20231122`` collection of the
    ``data_quality`` database, sends it to ``start_quality`` and prints the
    reply. Nothing is written back to the database.
    """
    # Rule lookup by the required keys: company name (user id) and version.
    rule_id = get_rule("北京剑鱼信息技术有限公司", "v1.4")
    print(rule_id)

    client = MongoClient('192.168.3.206', 27080, unicode_decode_error_handler="ignore")
    collection = client.data_quality["bidding_20231122"]
    selector = {"_id": ObjectId("655ec5319aed6eb2ffa5d7ce")}

    for record in collection.find(selector):
        # The id is turned into a string before the record is evaluated.
        record["_id"] = str(record["_id"])
        print(start_quality(record, rule_id, a2s_ip, topic, timeout))
if __name__ == "__main__":
    # Default entry point: the full batch check.
    # Other runners defined in this file that can be called here instead:
    #   batch_load_data_test()        - evaluate one hard-coded sample record
    #   batch_load_data_debug()       - evaluate one stored record by _id
    #   batch_load_purchasinglist()   - purchasing-item check
    batch_load_data()
|