liumiaomiao 9 months ago
parent
commit
d5dedb15a2

+ 23 - 0
lib/clickhouse_tools.py

@@ -0,0 +1,23 @@
+from clickhouse_driver import Client
+import logging
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+class ClickhouseUtil:
+    @staticmethod
+    def connect_to_clickhouse(host, port, user, password, database):
+        try:
+            client = Client(host=host, port=port, user=user, password=password, database=database)
+            return client
+        except Exception as e:
+            logger.error("Failed to connect to ClickHouse: %s", e)
+            raise
+    @staticmethod
+    def execute_sql(client, query):
+        try:
+            result = client.execute(query)
+            return result
+        except Exception as e:
+            logger.error("Failed to execute SQL query: %s", e)
+            raise
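
A minimal usage sketch for the new ClickhouseUtil helper; the server address, credentials, and query below are placeholders, not values taken from this commit:

from lib.clickhouse_tools import ClickhouseUtil

# Placeholder connection details -- substitute a real ClickHouse server.
client = ClickhouseUtil.connect_to_clickhouse(
    host='127.0.0.1', port=9000, user='default', password='', database='default')
# clickhouse_driver's execute() returns a list of tuples, so a COUNT(*) is rows[0][0].
rows = ClickhouseUtil.execute_sql(client, 'SELECT COUNT(*) FROM system.tables')
print(rows[0][0])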

+ 11 - 10
lib/mongo_tools.py

@@ -2,12 +2,14 @@
 # -*- coding:utf-8 -*-
 # author : zhaolongyue
 #date : 2023-07-03
+from urllib.parse import quote_plus
+
 from pymongo import MongoClient
 from bson import ObjectId
 
 class MongoUtil:
     @staticmethod
-    def get_coon(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None, authpass=None):
+    def get_coon(host="192.168.3.167:27088", database=None, collection=None, authdb=None, authuser=None, authpass=None):
         """
         Get a MongoDB database connection
         :param host:
@@ -23,14 +25,13 @@ class MongoUtil:
             raise RuntimeError('database is None')
         if collection is None:
             raise RuntimeError('collection is None')
-        conn = MongoClient(host, port, unicode_decode_error_handler="ignore")
-        print(conn)
-        if authdb is not None:
-            db_auth = conn[authdb]
-            db_auth.authenticate(authuser, authpass)
-        db = conn[database]
-        collection = db[collection]
-        return collection
+        username = quote_plus(authuser)
+        password = quote_plus(authpass)
+        # conn = MongoClient(host, port, database,collection,username,password,unicode_decode_error_handler="ignore",directConnection=True)
+        conn = MongoClient(f'mongodb://{username}:{password}@{host}/',
+                         unicode_decode_error_handler="ignore", directConnection=True)[database][collection]
+
+        return conn
 
 class CoonUtil:
     @staticmethod
@@ -55,7 +56,7 @@ class MongoSentence:
         if nosql is None:
             return coon.find({}).count()
         else:
-            return coon.count(nosql)
+            return coon.count_documents(nosql)
 
     @staticmethod
     def find_all(coon, columns=None):
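
A minimal usage sketch for the updated MongoUtil/MongoSentence helpers; the host, credentials, and database/collection names below are placeholders (get_coon now returns the collection object directly, and count() delegates to count_documents() when a filter is supplied):

from lib.mongo_tools import MongoUtil, MongoSentence

# Placeholder host and credentials -- substitute a real MongoDB instance.
coll = MongoUtil.get_coon(host='127.0.0.1:27017', database='mydb',
                          collection='mycoll', authuser='user', authpass='pass')
# Filtered counts go through count_documents(); comeintime holds Unix timestamps.
print(MongoSentence.count(coll, {"comeintime": {"$gte": 0}}))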

+ 78 - 3
lib/monitor_tools.py

@@ -4,13 +4,15 @@
 # Export data from the ES store to the test-environment MongoDB
 from lib.es_tools import esutil
 from datetime import datetime, timedelta
-from lib.mongo_tools import Data_save
+from lib.mongo_tools import MongoUtil, Data_save, MongoSentence
+from lib.mysql_tools import MysqlUtil
+from lib.clickhouse_tools import ClickhouseUtil, logger
 
 # Define a one-week time window and convert it to Unix timestamps
 end_date = int(datetime.now().timestamp())
 start_date = int((datetime.now() - timedelta(days=7)).timestamp())
 
-class EScount:
+class monitorTools:
     # Standard library bidding (ES): weekly count of newly stored records
     def es_bidding(self):
         """
@@ -122,6 +124,79 @@ class EScount:
         print("拟在建es入库数据总量:", count)
         return count
 
+    # Standard library bidding (MongoDB): weekly count of newly stored records
+    def bidding(self):
+        collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw', collection='bidding', authuser='viewdata', authpass='viewdata')
+        query = {"comeintime": {"$gte": start_date, "$lt": end_date}}
+        count = MongoSentence.count(collection, query)
+        print("Standard library bidding (MongoDB) weekly count:", count)
+        return count
+
+    # High-quality library bidding_ai (MongoDB): weekly count of newly stored records
+    def bidding_ai(self):
+        collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw', collection='bidding_ai', authuser='viewdata', authpass='viewdata')
+        query = {"comeintime": {"$gte": start_date, "$lt": end_date}}
+        count = MongoSentence.count(collection, query)
+        print("High-quality library bidding_ai (MongoDB) weekly count:", count)
+        return count
+
+    # Standard library bidding (MongoDB), fragmented data: weekly count per tag
+    def bidding_fragment(self):
+        collection = MongoUtil.get_coon(host='127.0.0.1:27088', database='qfw', collection='bidding', authuser='viewdata', authpass='viewdata')
+        # Tag values to monitor
+        tags = [
+            "情报_法务",
+            "情报_财务审计",
+            "情报_招标代理",
+            "情报_管理咨询",
+            "情报_保险",
+            "情报_工程设计咨询",
+            "情报_安防",
+            "情报_印务商机",
+            "情报_环境采购",
+            "情报_家具招投标"
+        ]
+        data = {}
+        for tag in tags:
+            query = {"comeintime": {"$gte": start_date, "$lt": end_date}, "tag_topinformation": tag}
+            count = MongoSentence.count(collection, query)
+            print(f"Standard library bidding (MongoDB) weekly count for {tag}:", count)
+            data[tag] = count
+        return data
+
+    # nzj (proposed-construction) baseinfo (MySQL): weekly count of newly stored records
+    def nzj(self):
+        # MySQL database connection settings
+        # mysql_db_config = {
+        #     'host': '192.168.3.149',
+        #     'port': 4000,
+        #     'user': 'datagroup',
+        #     'password': 'Dgrpdb#2024@36',
+        #     'database': 'jianyu_subjectdb',
+        #     'charset': 'utf8mb4'
+        # }
+        # SQL query
+        mysql_query = f"SELECT COUNT(*) FROM dwd_f_nzj_baseinfo WHERE create_time >= {start_date} AND create_time <= {end_date}"
+        conn = MysqlUtil.connect_to_mysql(host='192.168.3.149', port='4000', user='datagroup', password='Dgrpdb#2024@36', database='jianyu_subjectdb')
+        # execute_sql returns the fetched rows, so pull the scalar COUNT(*) out of the first row
+        rows = MysqlUtil.execute_sql(conn, mysql_query)
+        count = rows[0][0]
+        print("nzj baseinfo (MySQL) weekly count:", count)
+        return count
+
+    # Connections (renmai) data (ClickHouse): weekly count of newly stored records
+    def connections(self):
+        try:
+            query = f"SELECT COUNT(*) FROM information.transaction_info_all WHERE create_time >= {start_date} AND create_time <= {end_date}"
+            # conn = ClickhouseUtil.connect_to_clickhouse(host='192.168.3.207', port='19000', user='jytop', password='pwdTopJy123', database='information')
+            conn = ClickhouseUtil.connect_to_clickhouse(host='127.0.0.1', port='9000', user='biservice', password='Bi_top95215#', database='information')
+            count = ClickhouseUtil.execute_sql(conn, query)
+            # execute() returns a list of tuples; the count is the first column of the first row
+            result = count[0][0]
+            print("Connections data weekly count:", result)
+            return result
+        except Exception as e:
+            logger.error("An error occurred: %s", e)
+            raise
+
+    # Persist the weekly statistics to MongoDB
     def save_to_mongo(self,title,count):
         collection=Data_save.save_con(host='192.168.3.149',port=27180,database='data_quality',collection='statistics')
         now = datetime.now()
@@ -135,6 +210,6 @@ class EScount:
         Data_save.insert_one(collection,document)
 
 
-escount=EScount()
+monitor=monitorTools()
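
For reference, a self-contained sketch of the one-week window shared by these counters; the two timestamp lines mirror the top of lib/monitor_tools.py, and the field and table names are the ones used in the diff above:

from datetime import datetime, timedelta

# Unix-timestamp window covering the last 7 days, as in lib/monitor_tools.py.
end_date = int(datetime.now().timestamp())
start_date = int((datetime.now() - timedelta(days=7)).timestamp())

# The Mongo counters filter on comeintime with this range ...
mongo_query = {"comeintime": {"$gte": start_date, "$lt": end_date}}
# ... while the MySQL and ClickHouse counters interpolate the same bounds into SQL.
sql = f"SELECT COUNT(*) FROM dwd_f_nzj_baseinfo WHERE create_time >= {start_date} AND create_time <= {end_date}"
print(mongo_query)
print(sql)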
 
 

+ 29 - 0
lib/mysql_tools.py

@@ -0,0 +1,29 @@
+import mysql.connector
+class MysqlUtil:
+    @staticmethod
+    def connect_to_mysql(host, port, user, password, database):
+        # Create the database connection
+        connection = mysql.connector.connect(
+            host=host,  # database host address
+            user=user,  # database user name
+            port=port,
+            password=password,  # database password
+            database=database  # database name
+        )
+        return connection
+
+    @staticmethod
+    def execute_sql(connection, query):
+        if connection.is_connected():
+            print('Connected to MySQL database')
+            # Create a cursor object for running SQL statements
+            cursor = connection.cursor()
+            # Run the query and fetch the rows (cursor.execute() itself returns None)
+            cursor.execute(query)
+            result = cursor.fetchall()
+            cursor.close()
+            connection.close()
+            print('MySQL connection is closed')
+            return result
+
+
+
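
A minimal usage sketch for MysqlUtil with the fetch-based execute_sql shown above; the connection details and query are placeholders, not values from this commit:

from lib.mysql_tools import MysqlUtil

# Placeholder connection details -- substitute a real MySQL/TiDB instance.
conn = MysqlUtil.connect_to_mysql(host='127.0.0.1', port=3306, user='root',
                                  password='secret', database='test')
# execute_sql() returns the fetched rows; a COUNT(*) result is rows[0][0].
rows = MysqlUtil.execute_sql(conn, 'SELECT COUNT(*) FROM information_schema.tables')
print(rows[0][0])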

+ 9 - 9
tools/mongo断流监控/es_monitor.py

@@ -2,20 +2,20 @@
 # -*- coding:utf-8 -*-
 # author : liumiaomiao
 
-from lib.monitor_tools import escount
+from lib.monitor_tools import monitor
 
 #es-bidding
-es_bidding_count=escount.es_bidding()
+es_bidding_count=monitor.es_bidding()
 #es-bidding_ai
-es_bidding_ai_count=escount.es_bidding_ai()
+es_bidding_ai_count=monitor.es_bidding_ai()
 #es-bidding fragmented data
-es_bidding_fragment_count=escount.es_bidding_fragment()
+es_bidding_fragment_count=monitor.es_bidding_fragment()
 #es-nzj (proposed construction)
-es_nzj_count=escount.es_nzj()
+es_nzj_count=monitor.es_nzj()
 #save results to MongoDB
-escount.save_to_mongo("es_bidding",es_bidding_count)
-escount.save_to_mongo("es_bidding_ai",es_bidding_ai_count)
-escount.save_to_mongo("es_bidding_fragment",es_bidding_fragment_count)
-escount.save_to_mongo("es_nzj",es_nzj_count)
+monitor.save_to_mongo("es_bidding",es_bidding_count)
+monitor.save_to_mongo("es_bidding_ai",es_bidding_ai_count)
+monitor.save_to_mongo("es_bidding_fragment",es_bidding_fragment_count)
+monitor.save_to_mongo("es_nzj",es_nzj_count)
 
 

+ 22 - 82
tools/mongo断流监控/mongo_monitor.py

@@ -1,82 +1,22 @@
-# -*- coding: utf-8 -*-
-
-from pymongo import MongoClient
-from urllib.parse import quote_plus
-from datetime import datetime, timedelta
-import pandas as pd
-import os
-
-# MongoDB connection settings
-username = quote_plus("viewdata")
-password = quote_plus("viewdata")
-source_client = MongoClient(
-    f'mongodb://{username}:{password}@127.0.0.1:27088/',
-    unicode_decode_error_handler="ignore",
-    directConnection=True
-)
-source_db = source_client['qfw_ai']
-source_collection = source_db['bidding']
-
-# Define a one-week time window and convert it to Unix timestamps
-end_date = int(datetime.now().timestamp())
-start_date = int((datetime.now() - timedelta(days=7)).timestamp())
-
-# Tag values to monitor
-tags = [
-    "情报_法务",
-    "情报_财务审计",
-    "情报_招标代理",
-    "情报_管理咨询",
-    "情报_保险",
-    "情报_工程设计咨询",
-    "情报_安防",
-    "情报_印务商机",
-    "情报_环境采购",
-    "情报_家具招投标"
-]
-
-# Query filter
-query = {
-    "comeintime": {"$gte": start_date, "$lt": end_date},
-    "tag_topinformation": {"$in": tags}
-}
-
-# Initialize a dict with every tag's count set to 0
-data = {tag: 0 for tag in tags}
-
-# Count the number of documents per tag
-results = source_collection.aggregate([
-    {"$match": query},
-    {"$unwind": "$tag_topinformation"},  # unwind the array elements
-    {"$match": {"tag_topinformation": {"$in": tags}}},  # re-match the unwound tag values
-    {"$group": {"_id": "$tag_topinformation", "count": {"$sum": 1}}}
-])
-
-# Update the counts for tags that returned data
-for result in results:
-    print(f"tag: {result['_id']}, count: {result['count']}")  # debug output
-    data[result["_id"]] = result["count"]
-
-# Check the data dict to make sure every tag was updated
-print("data dict contents:", data)  # print the whole dict
-
-# Build the DataFrame
-date_range = f"{datetime.fromtimestamp(start_date).strftime('%Y/%m/%d')}-{datetime.fromtimestamp(end_date).strftime('%Y/%m/%d')}"
-df = pd.DataFrame([data], index=[date_range])
-
-# Path to the Excel output file
-output_file = "weekly_data_statistics.xlsx"
-
-# Check whether the file exists; create it if not
-if not os.path.exists(output_file):
-    df.to_excel(output_file, sheet_name="Weekly Statistics", index_label="日期")
-else:
-    with pd.ExcelWriter(output_file, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
-        existing_df = pd.read_excel(output_file, sheet_name="Weekly Statistics", index_col=0)
-        if date_range in existing_df.index:
-            existing_df.update(df)  # update the existing row
-        else:
-            existing_df = pd.concat([existing_df, df])  # append a new row
-        existing_df.to_excel(writer, sheet_name="Weekly Statistics", index_label="日期")
-
-print(f"统计结果已保存到 {output_file}")
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# author : liumiaomiao
+
+from lib.monitor_tools import monitor
+
+# standard library - bidding
+bidding_count=monitor.bidding()
+# high-quality library - mongo
+bidding_ai_count=monitor.bidding_ai()
+# nzj - mysql
+nzj_count=monitor.nzj()
+# bidding fragmented data
+bidding_fragment_count=monitor.bidding_fragment()
+# connections - clickhouse data
+connections_count=monitor.connections()
+# save results to MongoDB
+monitor.save_to_mongo("bidding",bidding_count)
+monitor.save_to_mongo("bidding_ai",bidding_ai_count)
+monitor.save_to_mongo("bidding_fragment",bidding_fragment_count)
+monitor.save_to_mongo("nzj",nzj_count)
+monitor.save_to_mongo('connections',connections_count)