liumiaomiao 1 gadu atpakaļ
vecāks
revīzija
6b289b49f3
8 mainītis faili ar 441 papildinājumiem un 40 dzēšanām
  1. 1 1
      app.py
  2. 5 6
      client.py
  3. 39 0
      es.py
  4. 2 2
      result_export.py
  5. 0 31
      test.py
  6. 85 0
      util/es_tools.py
  7. 255 0
      util/mongo_tools.py
  8. 54 0
      write_rule.py

+ 1 - 1
app.py

@@ -197,7 +197,7 @@ if __name__ == '__main__':
     "title" : "二连浩特",
     "dataging" : int(0),
     # "bidopentime":int(1798739414),
-    # "publishtime" : int(1698739410),
+    "publishtime" : int(1698739410),
     # "subtype" : "招标",
     "purchasinglist" : [
         {

+ 5 - 6
client.py

@@ -12,7 +12,7 @@ ReluClient = MongoDBInterface(ReluMongodb)
 a2s_ip = "192.168.3.240:9090"
 topic = "quality_bid"
 #本地测试用的主题
-# topic_test = "test_quality_bid"
+# topic = "test_quality_bid"
 timeout = 120
 
 # 开始评估
@@ -91,16 +91,16 @@ def batch_load_data():
     rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
     print(rules_id)
 
-    # max_id = ObjectId("0" * 24)
-    max_id = ObjectId("655ec5609aed6eb2ffa654ca")
+    max_id = ObjectId("0" * 24)
+    # max_id = ObjectId("655ec5609aed6eb2ffa654ca")
     while True:
         db = MongoClient('192.168.3.206', 27080, unicode_decode_error_handler="ignore").data_quality
-        coll_user = db["bidding_20231122"]
+        coll_user = db["bidding_20231221"]
         # db = MongoClient('192.168.3.166', 27082, unicode_decode_error_handler="ignore").zhengluming_27082
         # coll_user = db["update_data"]
         try:
             for item in coll_user.find({"_id":{"$gte":max_id}}).sort("_id",1):
-            # for item in coll_user.find({"_id":ObjectId("65485a930023f3bdb1621a1d")}):
+            # for item in coll_user.find({"_id":ObjectId("655ec5579aed6eb2ffa63d6d")}):
             # for item in coll_user.find().sort("_id",1):
                 max_id = item["_id"]
                 print(max_id)
@@ -157,7 +157,6 @@ def batch_load_data_debug():
         result = start_quality(item, rules_id, a2s_ip, topic, timeout)
         print(result)
 
-
 if __name__ == '__main__':
     batch_load_data()
     # batch_load_data_test()

+ 39 - 0
es.py

@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : liumiaomiao
+#从es库中导出数据到测试环境mongo库
+from util.es_tools import esutil
+from pymongo import MongoClient
+def ES_bidding(es_query):
+    """
+    操作样例:直接拉取数据
+    """
+    db_config = {
+        # es
+        'es_host': '127.0.0.1',
+        'es_port': 9800,
+        'es_http_auth': ('test_dataQuality','t9s3Ed0gBBowf5'),  # 重新申请
+        'timeout': 10000,
+        # 'index': "projectset",
+        'index': "bidding",
+        'size': 1000,
+        # mongo存的数据库表
+        'mg_host': '192.168.3.206',
+        'mg_port': 27080,
+        'database': 'data_quality',
+        'collection': 'bidding_20231221'
+    }
+    query = es_query
+    # 传入查询语句query 以及配置信息
+    esutil.es_query_save(query, **db_config)
+
+def run():
+    # 根据ES语句查找bidding
+    es_query = {"track_total_hits": True,
+                "query": {"bool": {"must": [{"range": {"publishtime": {"from": "1703001600", "to": "1703048399"}}}]}}}
+    # es_query = {"track_total_hits": True,
+    #             "query": {"bool": {"must": [{"range": {"publishtime": {"from": "1691337600", "to": "1691424000"}}},
+    #                                         {"terms": {"subtype": ["中标", "合同","成交"]}}]}}}
+    ES_bidding(es_query)
+
+run()

+ 2 - 2
result_export.py

@@ -6,7 +6,7 @@ from openpyxl import load_workbook
 host = '192.168.3.206'  # MongoDB主机地址
 port = 27080  # MongoDB端口
 dbname = 'data_quality'  # 数据库名称
-collection_name = 'bidding_20231122'  # 集合名称
+collection_name = 'bidding_20231221'  # 集合名称
 
 # 创建MongoDB连接
 client = MongoClient(host, port)
@@ -14,7 +14,7 @@ db = client[dbname]
 collection = db[collection_name]
 
 # 从MongoDB读取数据
-data = pd.DataFrame(list(collection.find().limit(1000)))
+data = pd.DataFrame(list(collection.find()))
 
 # 定义字段中英文映射
 column_name_mapping = {

+ 0 - 31
test.py

@@ -1,31 +0,0 @@
-from docs.config import ReluMongodb
-from util.mogodb_helper import MongoDBInterface
-
-ReluClient = MongoDBInterface(ReluMongodb)
-
-data = {
-    "rules_id": 1,
-    "company_name":"拓普",
-    "version":"v1.0",
-    "rules": {
-        "budget": {"0101": {
-                "name": "互相校验(预算和中标金额的比例)",
-                "parent_name": "金额错误",
-                "parent_code": "01",
-            }, "0102":{
-                "name": "过大过小[100,10亿]",
-                "parent_name": "金额错误",
-                "parent_code": "01",
-            }},
-        "area": {"0101": {
-            "name": "全国类数据",
-            "parent_name": "全国类型",
-            "parent_code": "01",
-        }, },
-        "multipackage": {"1000": "haha",
-                         "0203": "yaya"},
-    }
-}
-
-ReluClient.db[ReluMongodb["col"]].insert_one(data)
-print()

+ 85 - 0
util/es_tools.py

@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : zhaolongyue
+#date : 2023-07-03
+from pymongo import MongoClient
+from util import mongo_tools
+from elasticsearch import Elasticsearch
+class EsUtil:
+    @staticmethod
+    def es_query_save(query, **kwargs):
+        """
+        es直接到mongo
+        :param query:
+        :param kwargs:
+        :return:
+        """
+        coon = mongo_tools.CoonUtil.get_coon(**kwargs)
+        es = EsUtil.get_es(**kwargs)
+        result_all = EsUtil.get_es_result(es, query, **kwargs)
+        # print(result_all)
+        total = result_all['hits']['total']["value"]
+        results = result_all['hits']['hits']
+        scroll_id = result_all['_scroll_id']
+        print("数据总量:", total)
+        count = 0
+        for i in range(0, int(total / kwargs['size']) + 1):
+            # scroll参数必须指定否则会报错
+            query_scroll = EsUtil.get_es_scroll(es, scroll_id, **kwargs)
+            results += query_scroll
+            save_bulk = []
+            for res in results:
+                count += 1
+                es_result = res["_source"]
+                save_bulk.append(es_result)
+            results = []
+            mongo_tools.MongoSentence.insert_many(coon, save_bulk)
+            print(count, "数据已保存")
+    def get_es(**kwargs):
+        """
+		获取es连接
+		:param kwargs:
+		:return:
+		"""
+        es = Elasticsearch(
+            [{"host": kwargs["es_host"], "http_auth": kwargs["es_http_auth"], "port": kwargs["es_port"]}])
+        # print(es)
+        return es
+    @staticmethod
+    def get_es_count(es, query, **kwargs):
+        """
+               直接查询
+               :param es:
+               :param query:
+               :param kwargs:
+               :return:
+               """
+        result_all_count = es.count(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"])
+        return result_all_count
+    @staticmethod
+    def get_es_result(es, query, **kwargs):
+        """
+        直接查询
+        :param es:
+        :param query:
+        :param kwargs:
+        :return:
+        """
+        result_all = es.search(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"],
+                               scroll='2m', size=kwargs["size"])
+
+        return result_all
+
+    @staticmethod
+    def get_es_scroll(es, scroll_id,  **kwargs):
+        """
+        游标scroll_id
+        :param es:
+        :param scroll_id:
+        :param kwargs:
+        :return:
+        """
+        query_scroll = es.scroll(scroll_id=scroll_id, scroll='2m',
+                                 request_timeout=kwargs["timeout"])['hits']['hits']
+        return query_scroll
+esutil=EsUtil()

+ 255 - 0
util/mongo_tools.py

@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : zhaolongyue
+#date : 2023-07-03
+from pymongo import MongoClient
+from bson import ObjectId
+
+class MongoUtil:
+    @staticmethod
+    def get_coon(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None, authpass=None):
+        """
+        获取mongo数据库连接
+        :param host:
+        :param port:
+        :param database:
+        :param collection:
+        :param authdb:
+        :param authuser:
+        :param authpass:
+        :return:
+        """
+        if database is None:
+            raise RuntimeError('database is None')
+        if collection is None:
+            raise RuntimeError('collection is None')
+        conn = MongoClient(host, port, unicode_decode_error_handler="ignore")
+        print(conn)
+        if authdb is not None:
+            db_auth = conn[authdb]
+            db_auth.authenticate(authuser, authpass)
+        db = conn[database]
+        collection = db[collection]
+        return collection
+
+
+class CoonUtil:
+    @staticmethod
+    def get_coon(**kwargs):
+        """
+        获取mongo连接
+        :param kwargs:
+        :return:
+        """
+        coon = MongoUtil.get_coon(host=kwargs["mg_host"], port=kwargs["mg_port"],
+                                              database=kwargs["database"], collection=kwargs["collection"])
+        return coon
+class MongoSentence:
+    @staticmethod
+    def count(coon, nosql=None):
+        """
+        count数据量
+        :param coon:
+        :param nosql:
+        :return:
+        """
+        if nosql is None:
+            return coon.find({}).count()
+        else:
+            return coon.count(nosql)
+
+    @staticmethod
+    def find_all(coon, columns=None):
+        """
+        无查询条件返回指定字段的全量数据
+        :param coon:
+        :param columns:
+        :return:
+        """
+        # data = DataFrame(list(self.collection.find({})))
+        # data.drop(["_id"],axis=customer_program,inplace=True)
+        # return data
+        vlist = []
+        if columns is None:
+            vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"})
+        else:
+            cols = {}
+            for c in columns:
+                cols[c] = 1
+            vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"}, cols).batch_size(1000)
+        return vlist
+
+    @staticmethod
+    def find_by_Nosql(coon, nosql={}, columns=None):
+        vlist = []
+        # print(nosql)
+        if columns is None:
+            vlist = coon.find(nosql)
+        else:
+            cols = {}
+            for c in columns:
+                cols[c] = 1
+            vlist = coon.collection.find(nosql, cols)
+        return vlist
+
+    @staticmethod
+    def update_ir_ent_name_by_id(coon, oid, obj):
+        coon.update_one({"_id": ObjectId(oid)}, {"$set": {"IR_ENTNAME": obj}})
+
+    @staticmethod
+    def update_by_id(coon, oid, obj):
+        coon.update_one({"_id": ObjectId(oid)}, {"$set": obj})
+
+
+    @staticmethod
+    def find_one_by_company_name(coon, company):
+        return coon.find({"company_name": company}).count()
+
+    @staticmethod
+    def save(coon, obj):
+        """
+        插入数据
+        :param coon:
+        :param obj:
+        :return:
+        """
+        coon.save(obj)
+
+
+    @staticmethod
+    def insert_many(coon, bulk):
+        """
+        批量插入
+        :param coon:
+        :param bulk:
+        :return:
+        """
+        coon.insert_many(bulk)
+
+    @staticmethod
+    def delcol_by_id(coon, id, column):
+        """
+        删除数据
+        :param coon:
+        :param id:
+        :param column:
+        :return:
+        """
+        coon.collection.update_one({"_id": id}, {"$unset": {column: ""}})
+
+    @staticmethod
+    def find_one_by_id(coon, nosql, column):
+        return coon.collection.find_one(nosql, column)
+
+# 这个是删表操作
+    @staticmethod
+    def clear(coon):
+        """
+        删除表
+        :param coon:
+        :return:
+        """
+        coon.collection.drop()
+
+
+
+class Data_get():
+    @staticmethod
+    #连接数据库数据表
+    def get_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
+                     authpass=None):
+
+        """
+        获取mongo数据库连接
+        :param host:
+        :param port:
+        :param database:
+        :param collection:
+        :param authdb:
+        :param authuser:
+        :param authpass:
+        :return:
+        """
+        if database is None:
+            raise RuntimeError('database is None')
+        if collection is None:
+            raise RuntimeError('collection is None')
+        con = MongoClient(host, port, unicode_decode_error_handler="ignore")
+        # print(con)
+        if authdb is not None:
+            db_auth = con[authdb]
+            db_auth.authenticate(authuser, authpass)
+        db = con[database]
+        collection = db[collection]
+        return collection
+
+    @staticmethod
+    #随机获取id
+    def get_id_sample(con):
+        id_list = []
+        for item in con.aggregate([{"$match":{"repeat":0}},{'$sample': {'size': 100}}, {"$project": {"_id": 1}}]):
+            id_list.append(str(item["_id"]))
+        return id_list
+
+    @staticmethod
+    #获取id
+    def get_id_mongo(con):
+        id_list = []
+
+    @staticmethod
+    #根据ids,从数据库获取数据
+    def data_ids_mongo(ids,con,save_con):
+        id_list = ids
+        for id in id_list:
+            query = {"_id": ObjectId(id)}
+            list_item = list(con.find(query))
+            if list_item:
+                Data_save.save(save_con,list_item[0])
+
+class Data_save():
+    @staticmethod
+    def save_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
+                     authpass=None):
+        """
+        获取mongo数据库连接
+        :param host:
+        :param port:
+        :param database:
+        :param collection:
+        :param authdb:
+        :param authuser:
+        :param authpass:
+        :return:
+        """
+        if database is None:
+            raise RuntimeError('database is None')
+        if collection is None:
+            raise RuntimeError('collection is None')
+        con = MongoClient(host, port, unicode_decode_error_handler="ignore")
+        # print(con)
+        if authdb is not None:
+            db_auth = con[authdb]
+            db_auth.authenticate(authuser, authpass)
+        db = con[database]
+        collection = db[collection]
+        return collection
+
+    @staticmethod
+    def save(con, obj):
+        """
+        插入数据
+        :param coon:
+        :param obj:
+        :return:
+        """
+        con.save(obj)
+
+    @staticmethod
+    def insert_many(con, bulk):
+        """
+        批量插入
+        :param con:
+        :param bulk:
+        :return:
+        """
+        con.insert_many(bulk)

+ 54 - 0
write_rule.py

@@ -0,0 +1,54 @@
+from docs.config import ReluMongodb
+from util.mogodb_helper import MongoDBInterface
+
+ReluClient = MongoDBInterface(ReluMongodb)
+
+data = {
+    "rules_id": 4,
+    "company_name":"北京剑鱼信息技术有限公司",
+    "version":"v1.4",
+    "rules": {
+        "area" : {
+            "0101" : {
+                "name" : "全国类数据",
+                "parent_name" : "全国类型",
+                "parent_code" : "01"
+            },
+            "0301" : {
+                "name" : "省份不在[2,3]个字之间",
+                "parent_name" : "长度异常类型",
+                "parent_code" : "03"
+            },
+            "0302" : {
+                "name" : "城市不在[3,11]个字之间",
+                "parent_name" : "长度异常类型",
+                "parent_code" : "03"
+            },
+            "0303" : {
+                "name" : "区县不在[2,15]个字之间",
+                "parent_name" : "长度异常类型",
+                "parent_code" : "03"
+            }
+        },
+        "bidamount" : {
+            "0101" : {
+                "name" : "互相校验(预算和中标金额的比例)",
+                "parent_name" : "金额错误",
+                "parent_code" : "01"
+            },
+            "0102" : {
+                "name" : "过大过小[100,10亿]",
+                "parent_name" : "金额错误",
+                "parent_code" : "01"
+            },
+            "0104" : {
+                "name" : "中标金额存在费率,折扣率",
+                "parent_name" : "金额错误",
+                "parent_code" : "01"
+            }
+        }
+    }
+}
+
+ReluClient.db[ReluMongodb["col"]].insert_one(data)
+print()