liumiaomiao 9 months ago
commit
b4f3c4c790

+ 85 - 0
lib/es_tools.py

@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : zhaolongyue
+#date : 2023-07-03
+from pymongo import MongoClient
+from lib import mongo_tools
+from elasticsearch import Elasticsearch
+class EsUtil:
+    @staticmethod
+    def es_query_save(query, **kwargs):
+        """
+        es直接到mongo
+        :param query:
+        :param kwargs:
+        :return:
+        """
+        coon = mongo_tools.CoonUtil.get_coon(**kwargs)
+        es = EsUtil.get_es(**kwargs)
+        result_all = EsUtil.get_es_result(es, query, **kwargs)
+        # print(result_all)
+        total = result_all['hits']['total']["value"]
+        results = result_all['hits']['hits']
+        scroll_id = result_all['_scroll_id']
+        print("数据总量:", total)
+        count = 0
+        for i in range(0, int(total / kwargs['size']) + 1):
+            # scroll参数必须指定否则会报错
+            query_scroll = EsUtil.get_es_scroll(es, scroll_id, **kwargs)
+            results += query_scroll
+            save_bulk = []
+            for res in results:
+                count += 1
+                es_result = res["_source"]
+                save_bulk.append(es_result)
+            results = []
+            mongo_tools.MongoSentence.insert_many(coon, save_bulk)
+            print(count, "数据已保存")
+    def get_es(**kwargs):
+        """
+		获取es连接
+		:param kwargs:
+		:return:
+		"""
+        es = Elasticsearch(
+            [{"host": kwargs["es_host"], "http_auth": kwargs["es_http_auth"], "port": kwargs["es_port"]}])
+        # print(es)
+        return es
+    @staticmethod
+    def get_es_count(es, query, **kwargs):
+        """
+               直接查询
+               :param es:
+               :param query:
+               :param kwargs:
+               :return:
+               """
+        result_all_count = es.count(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"])
+        return result_all_count
+    @staticmethod
+    def get_es_result(es, query, **kwargs):
+        """
+        直接查询
+        :param es:
+        :param query:
+        :param kwargs:
+        :return:
+        """
+        result_all = es.search(index=kwargs["index"], body=query, request_timeout=kwargs["timeout"],
+                               scroll='2m', size=kwargs["size"])
+
+        return result_all
+
+    @staticmethod
+    def get_es_scroll(es, scroll_id,  **kwargs):
+        """
+        游标scroll_id
+        :param es:
+        :param scroll_id:
+        :param kwargs:
+        :return:
+        """
+        query_scroll = es.scroll(scroll_id=scroll_id, scroll='2m',
+                                 request_timeout=kwargs["timeout"])['hits']['hits']
+        return query_scroll
+esutil=EsUtil()

+ 255 - 0
lib/mongo_tools.py

@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : zhaolongyue
+#date : 2023-07-03
+from pymongo import MongoClient
+from bson import ObjectId
+
+class MongoUtil:
+    @staticmethod
+    def get_coon(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None, authpass=None):
+        """
+        获取mongo数据库连接
+        :param host:
+        :param port:
+        :param database:
+        :param collection:
+        :param authdb:
+        :param authuser:
+        :param authpass:
+        :return:
+        """
+        if database is None:
+            raise RuntimeError('database is None')
+        if collection is None:
+            raise RuntimeError('collection is None')
+        conn = MongoClient(host, port, unicode_decode_error_handler="ignore")
+        print(conn)
+        if authdb is not None:
+            db_auth = conn[authdb]
+            db_auth.authenticate(authuser, authpass)
+        db = conn[database]
+        collection = db[collection]
+        return collection
+
+
+class CoonUtil:
+    @staticmethod
+    def get_coon(**kwargs):
+        """
+        获取mongo连接
+        :param kwargs:
+        :return:
+        """
+        coon = MongoUtil.get_coon(host=kwargs["mg_host"], port=kwargs["mg_port"],
+                                              database=kwargs["database"], collection=kwargs["collection"])
+        return coon
+class MongoSentence:
+    @staticmethod
+    def count(coon, nosql=None):
+        """
+        count数据量
+        :param coon:
+        :param nosql:
+        :return:
+        """
+        if nosql is None:
+            return coon.find({}).count()
+        else:
+            return coon.count(nosql)
+
+    @staticmethod
+    def find_all(coon, columns=None):
+        """
+        无查询条件返回指定字段的全量数据
+        :param coon:
+        :param columns:
+        :return:
+        """
+        # data = DataFrame(list(self.collection.find({})))
+        # data.drop(["_id"],axis=customer_program,inplace=True)
+        # return data
+        vlist = []
+        if columns is None:
+            vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"})
+        else:
+            cols = {}
+            for c in columns:
+                cols[c] = 1
+            vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"}, cols).batch_size(1000)
+        return vlist
+
+    @staticmethod
+    def find_by_Nosql(coon, nosql={}, columns=None):
+        vlist = []
+        # print(nosql)
+        if columns is None:
+            vlist = coon.find(nosql)
+        else:
+            cols = {}
+            for c in columns:
+                cols[c] = 1
+            vlist = coon.collection.find(nosql, cols)
+        return vlist
+
+    @staticmethod
+    def update_ir_ent_name_by_id(coon, oid, obj):
+        coon.update_one({"_id": ObjectId(oid)}, {"$set": {"IR_ENTNAME": obj}})
+
+    @staticmethod
+    def update_by_id(coon, oid, obj):
+        coon.update_one({"_id": ObjectId(oid)}, {"$set": obj})
+
+
+    @staticmethod
+    def find_one_by_company_name(coon, company):
+        return coon.find({"company_name": company}).count()
+
+    @staticmethod
+    def save(coon, obj):
+        """
+        插入数据
+        :param coon:
+        :param obj:
+        :return:
+        """
+        coon.save(obj)
+
+
+    @staticmethod
+    def insert_many(coon, bulk):
+        """
+        批量插入
+        :param coon:
+        :param bulk:
+        :return:
+        """
+        coon.insert_many(bulk)
+
+    @staticmethod
+    def delcol_by_id(coon, id, column):
+        """
+        删除数据
+        :param coon:
+        :param id:
+        :param column:
+        :return:
+        """
+        coon.collection.update_one({"_id": id}, {"$unset": {column: ""}})
+
+    @staticmethod
+    def find_one_by_id(coon, nosql, column):
+        return coon.collection.find_one(nosql, column)
+
+# 这个是删表操作
+    @staticmethod
+    def clear(coon):
+        """
+        删除表
+        :param coon:
+        :return:
+        """
+        coon.collection.drop()
+
+
+
+class Data_get():
+    @staticmethod
+    #连接数据库数据表
+    def get_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
+                     authpass=None):
+
+        """
+        获取mongo数据库连接
+        :param host:
+        :param port:
+        :param database:
+        :param collection:
+        :param authdb:
+        :param authuser:
+        :param authpass:
+        :return:
+        """
+        if database is None:
+            raise RuntimeError('database is None')
+        if collection is None:
+            raise RuntimeError('collection is None')
+        con = MongoClient(host, port, unicode_decode_error_handler="ignore")
+        # print(con)
+        if authdb is not None:
+            db_auth = con[authdb]
+            db_auth.authenticate(authuser, authpass)
+        db = con[database]
+        collection = db[collection]
+        return collection
+
+    @staticmethod
+    #随机获取id
+    def get_id_sample(con):
+        id_list = []
+        for item in con.aggregate([{"$match":{"repeat":0}},{'$sample': {'size': 100}}, {"$project": {"_id": 1}}]):
+            id_list.append(str(item["_id"]))
+        return id_list
+
+    @staticmethod
+    #获取id
+    def get_id_mongo(con):
+        id_list = []
+
+    @staticmethod
+    #根据ids,从数据库获取数据
+    def data_ids_mongo(ids,con,save_con):
+        id_list = ids
+        for id in id_list:
+            query = {"_id": ObjectId(id)}
+            list_item = list(con.find(query))
+            if list_item:
+                Data_save.save(save_con,list_item[0])
+
+class Data_save():
+    @staticmethod
+    def save_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
+                     authpass=None):
+        """
+        获取mongo数据库连接
+        :param host:
+        :param port:
+        :param database:
+        :param collection:
+        :param authdb:
+        :param authuser:
+        :param authpass:
+        :return:
+        """
+        if database is None:
+            raise RuntimeError('database is None')
+        if collection is None:
+            raise RuntimeError('collection is None')
+        con = MongoClient(host, port, unicode_decode_error_handler="ignore")
+        # print(con)
+        if authdb is not None:
+            db_auth = con[authdb]
+            db_auth.authenticate(authuser, authpass)
+        db = con[database]
+        collection = db[collection]
+        return collection
+
+    @staticmethod
+    def save(con, obj):
+        """
+        插入数据
+        :param coon:
+        :param obj:
+        :return:
+        """
+        con.save(obj)
+
+    @staticmethod
+    def insert_many(con, bulk):
+        """
+        批量插入
+        :param con:
+        :param bulk:
+        :return:
+        """
+        con.insert_many(bulk)

+ 39 - 0
tools/从es导出数据/es.py

@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : liumiaomiao
+#从es库中导出数据到测试环境mongo库
+from lib.es_tools import esutil
+from pymongo import MongoClient
+def ES_bidding(es_query):
+    """
+    操作样例:直接拉取数据
+    """
+    db_config = {
+        # es
+        'es_host': '127.0.0.1',
+        'es_port': 9800,
+        'es_http_auth': ('test_dataQuality','t9s3Ed0gBBowf5'),  # 重新申请
+        'timeout': 10000,
+        # 'index': "projectset",
+        'index': "bidding",
+        'size': 1000,
+        # mongo存的数据库表
+        'mg_host': '192.168.3.206',
+        'mg_port': 27080,
+        'database': 'data_quality',
+        'collection': 'bidding_20231221'
+    }
+    query = es_query
+    # 传入查询语句query 以及配置信息
+    esutil.es_query_save(query, **db_config)
+
+def run():
+    # 根据ES语句查找bidding
+    es_query = {"track_total_hits": True,
+                "query": {"bool": {"must": [{"range": {"publishtime": {"from": "1703001600", "to": "1703048399"}}}]}}}
+    # es_query = {"track_total_hits": True,
+    #             "query": {"bool": {"must": [{"range": {"publishtime": {"from": "1691337600", "to": "1691424000"}}},
+    #                                         {"terms": {"subtype": ["中标", "合同","成交"]}}]}}}
+    ES_bidding(es_query)
+
+run()

+ 57 - 0
tools/从mongo导出数据/mongo.py

@@ -0,0 +1,57 @@
+from pymongo import MongoClient
+from bson import ObjectId
+from util.mogodb_helper import MongoDBInterface
+
+#复制mongo数据源 从一个源到另一个源
+
+#mongo库 源1
+MongodbConfig = {
+    "ip_port": "127.0.0.1:27092",
+    "user": "viewdata",
+    "password": "viewdata",
+    "db": "qfw",
+}
+mdb = MongoDBInterface(MongodbConfig)
+#mongo库 源2
+MongodbConfigInsert = {
+    "ip_port": "192.168.3.166:27082",
+    "db": "zhengluming_27082",
+}
+insertdb = MongoDBInterface(MongodbConfigInsert)
+
+# 连接MongoDB数据库
+with MongoClient('192.168.3.166', 27082) as client:
+    db = client.zhengluming
+    coll = db.f_sourceinfo_HP_channel_succes_data
+    for row in coll.find({"historyId": "655707de7f39550da1f59d3d"}, {"id": 1}).sort("_id", 1):
+        _id = row.get("id", "")
+        print(row["_id"])
+        if _id:
+            m_id = ObjectId(_id)
+            result=mdb.find_by_id("bidding",m_id)
+            result["id"]=str(row["_id"])
+            if result:
+                insertdb.insert2db("update_data",result)
+
+# 关闭数据库连接
+client.close()
+
+
+"""
+
+http://172.17.162.35:8880/search
+
+# 深圳分公司安检信息系统维护服务 0.0 南方航空物流股份有限公司 广东 深圳市 ['安检']  []
+# 运输事业发展中心信息系统运维-交通委北区机房、视频会议及运输中心终端运维管理技术服务项目 825250.0 北京市运输事业发展中心 北京 北京市 ['运输事业发展中心信息系统运维 交通委北>区机房 视频会议及运输中心终端运维管理技术服务项目', '运输事业发展中心信息系统运维 交通委北区机房 视频会议及运输中心终端运维管理技术服务项目'] 北京航天星桥科技股份有限公司 []
+# 软件开发项目(软件开发-拓信智防) 0.0 指定集成公司与河北拓信智防电子科技有限公司 河北 保定市 ['车辆管理平台']  []
+# 新乡高新技术产业开发区社会治理委员会“雪亮工程”维保项目 3410000.0 新乡高新技术产业开发区社会治理委员会 河南 新乡市 ['维保', '雪亮工程']  []
+# 山西省大同市第三人民医院新建医技、急诊楼配套项目(暂估价)医用箱式中型物流传输系统采购项目 20028500.0 山西省大同市第三人民医院 山西 大同市 ['箱式中型物流']  []
+# 山西省大同市第三人民医院新建医技、急诊楼配套项目(暂估价)医用箱式中型物流传输系统采购项目 20028500.0 山西省大同市第三人民医院 山西 大同市 ['物流系统']  []
+
+
+广州市白云区三元里群英大街13号加装电梯工程 0.0 广州市白云区三元里群英大街13号加装电梯工程 广东 广州市 ['电梯']  []
+软件开发服务(二次)软件开发服务 6500000.0 呼伦贝尔市住房和城乡建设局 内蒙古 呼伦贝尔市 ['软件', '软件开发服务 二次 软件开发服务']  []
+富顺县救灾物资储备库建设工程项目 2346407.0 富顺县应急管理指挥中心 四川 自贡市 ['备库建设', '救灾物资']  []
+中共成都市委党校明志楼多媒体教室LED屏采购项目 650000.0 中共成都市委党校 四川 成都市 ['LED一体机', '触控一体机'] 成都香草山信息技术有限责任公司 []
+吉木萨尔县大有镇村庄规划(2021-2035年)编制项目 150000.0 吉木萨尔县大有镇人民政府 新疆 昌吉回族自治州 ['建材']  []
+"""

File diff suppressed because it is too large
+ 22 - 0
tools/压力测试/pressure.py


File diff suppressed because it is too large
+ 23 - 0
tools/并发测试/subsequent.py


+ 83 - 0
tools/数据打分脚本/score.py

@@ -0,0 +1,83 @@
+from pymongo import MongoClient
+from bson import ObjectId
+from docs.config import abnormal_config
+import csv
+def bid_score():
+    db = MongoClient('192.168.3.167', 27080, unicode_decode_error_handler="ignore").jyqyfw_historyData2024
+    coll_user = db["20240624Hbgd_yanzheng"]
+    # db = MongoClient('192.168.3.206', 27080, unicode_decode_error_handler="ignore").data_quality
+    # coll_user = db["bidding_20231122"]
+    score=100
+    max_id = ObjectId("0" * 24)
+    for item in coll_user.find({"_id":{"$gte":max_id}}).sort("_id",1).limit(200):
+    # for item in coll_user.find({"_id":ObjectId("655ec5319aed6eb2ffa5d77f")}):
+    # for item in coll_user.find().sort("_id",1):
+        title= item.get('title_qa')
+        projectname = item.get('projectname_qa')
+        area= item.get('area_qa')
+        projectcode= item.get('projectcode_qa')
+        #bidopentime= item.get('bidopentime_qa')
+        buyer= item.get('buyer_qa')
+        winner= item.get('winner_qa')
+        budget= item.get('budget_qa')
+        bidamount= item.get('bidamount_qa')
+        multipackage= item.get("multipackage_qa")
+        site=item.get("site")
+        toptype =item.get("toptype")
+        subtype = item.get("subtype")
+        flag=0
+        with open(abnormal_config["table_field_config"]["path7"], "r") as f:
+                reads = csv.reader(f)
+                for w in reads:
+                    result=w[0].split("\t")
+                    if result[0]==site and result[1]==toptype and result[2]==subtype:
+                        flag = 1
+                        if title:
+                            score -= 1
+                        if projectname:
+                            score -= 1
+                        if area:
+                            score -= 1
+                        if projectcode:
+                            score -= 1
+                        # if bidopentime:
+                        #     score-=1
+                        if buyer:
+                            score -= 1
+                        if winner:
+                            score -= 1
+                        if budget:
+                            score -= 1
+                        if bidamount:
+                            score -= 1
+                        if multipackage:
+                            score -= 1
+                        coll_user.update_one({"_id": item["_id"]}, {"$set": {"score": score}})
+                        print(score)
+                        score = 100
+        if flag == 0:
+            if title:
+                score-=10
+            if projectname:
+                score-=10
+            if area:
+                score-=10
+            if projectcode:
+                score-=10
+            # if bidopentime:
+            #     score-=10
+            if buyer:
+                score-=10
+            if winner:
+                score-=10
+            if budget:
+                score-=10
+            if bidamount:
+                score-=10
+            if multipackage:
+                score -= 10
+            coll_user.update_one({"_id": item["_id"]}, {"$set": {"score": score}})
+            print(score)
+            score = 100
+
+bid_score()

+ 69 - 0
tools/样本数据导出/sample_data_export.py

@@ -0,0 +1,69 @@
+from pymongo import MongoClient
+
+def sample_data(N):
+    # 连接MongoDB数据库
+    db = MongoClient('192.168.3.149', 27180, unicode_decode_error_handler="ignore").data_quality
+    coll_user = db["bidding_919ai_norepeat"]
+
+    # 统计总的数据量
+    # count_all = coll_user.estimated_document_count()
+    count_all = coll_user.count_documents({"tag": 1})
+    print("Total Document Count:", count_all)
+
+    # 把符合条件的站点名称存起来
+    site_list = {}
+    n = 0
+    site_count = coll_user.aggregate([
+                        {"$match": {"tag": 1}},
+                         {"$group": {"_id": "$site", "count": {"$sum": 1}}},
+                         {"$sort": {"count": -1}}])
+    for item in site_count:
+        if (n / count_all) <= 0.95:
+            n += item["count"]
+            site_list[item["_id"]] = item["count"]
+
+    # 计算每个站点相对于N的目标抽取数量的总和
+    total_ratio = sum([min(site_list[key] / count_all, 1) for key in site_list])
+
+    # 初始化已标记的文档数量
+    marked_count = 0
+
+    # 选取每个站点数据量
+    for key in site_list:
+        if marked_count >= N:
+            break  # 如果已经达到或超过目标数量,停止处理
+
+        # 计算每个站点的目标比例
+        target_ratio = min(site_list[key] / count_all, 1) / total_ratio
+        # 计算每个站点应该抽取的文档数量,确保至少为1
+        num = max(int(target_ratio * N), 2)
+
+        # 如果加上这个站点的数量会超过总目标,调整数量
+        num = min(num, N - marked_count)
+
+        print(f"{key} - Count: {site_list[key]}, Num: {num}, Ratio: {target_ratio}")
+
+        # 计算每次抽取的间隔
+        jiange = int(site_list[key] / num)
+
+        # 从每个站点等间隔地取数据
+        for i in range(num):
+            if marked_count >= N:
+                break  # 再次检查是否已达到目标数量
+
+            for info in coll_user.find({"tag": 1, "site": key}).sort("title", 1).skip(i*2).limit(1):
+                print(f"Updating document with _id: {info['_id']}")
+                # 更新文档,设置标记
+                update_result = coll_user.update_one({"_id": info["_id"]}, {"$set": {"flag": 9}})
+                if update_result.modified_count == 0:
+                    print("No document updated for _id:", info["_id"])
+                else:
+                    print("Document updated successfully for _id:", info["_id"])
+                marked_count += 1
+
+            if marked_count >= N:
+                break  # 再次检查是否已达到目标数量
+
+    print(f"Total marked documents: {marked_count}")
+
+sample_data(1000)

Some files were not shown because too many files changed in this diff