Browse Source

文件导入mysql,mongo同步mysql

lizhikun 6 months ago
parent
commit
80b2788f2a

+ 85 - 0
tools/mongo同步至mysql/Data_synchronization.py

@@ -0,0 +1,85 @@
+import os
+
+import pymysql
+from bson import ObjectId  # 导入 ObjectId
+from pymongo import MongoClient
+
+from lib.mogodb_helper import MongoDBInterface
+
+# MongoDB connection settings.
+# Every value can be overridden through an environment variable so that
+# endpoints and credentials do not have to be edited in source. The literals
+# are the original values, kept as defaults for backward compatibility.
+MongodbConfigLocal = {
+    "ip_port": os.environ.get("MONGO_IP_PORT", "127.0.0.1:27088"),
+    "user": os.environ.get("MONGO_USER", "viewdata"),
+    "password": os.environ.get("MONGO_PASSWORD", "viewdata"),
+    "db": os.environ.get("MONGO_DB", "qfw"),
+    # Replace the default with the actual collection name.
+    "col": os.environ.get("MONGO_COLLECTION", "bidding"),
+}
+
+# MySQL connection settings.
+# Every value can be overridden through an environment variable. The literals
+# are the original values, kept as defaults for backward compatibility.
+mysql_config = {
+    "host": os.environ.get("MYSQL_HOST", "192.168.3.217"),
+    "user": os.environ.get("MYSQL_USER", "root"),
+    # NOTE(review): a root password is committed here as the fallback value.
+    # Supply MYSQL_PASSWORD from the environment, then rotate the password
+    # and drop this default.
+    "password": os.environ.get("MYSQL_PASSWORD", "=PDT49#80Z!RVv52_z"),
+    "database": os.environ.get("MYSQL_DATABASE", "quality"),
+    # pymysql expects an integer port, so the environment string is converted.
+    "port": int(os.environ.get("MYSQL_PORT", "4000")),
+}
+
+# Field mapping: each MongoDB field name maps to the MySQL column with the
+# same name plus an "_ai" suffix.
+_SYNCED_FIELDS = (
+    "toptype",
+    "subtype",
+    "area",
+    "city",
+    "buyer",
+    "projectname",
+    "projectcode",
+    "budget",
+    "s_winner",
+    "bidamount",
+    "multipackage",
+)
+field_mapping = {name: f"{name}_ai" for name in _SYNCED_FIELDS}
+
+def main():
+    """Copy mapped MongoDB fields into the matching MySQL rows.
+
+    Reads every ``_id`` from the MySQL table ``bid_llizhikun``, converts it to
+    an ``ObjectId`` and looks it up with ``MongoDBInterface.find_by_id`` in the
+    collection named by ``MongodbConfigLocal["col"]``. For each document found,
+    the fields listed in ``field_mapping`` are written to their mapped columns
+    of the same MySQL row; a field missing from the document is written as
+    ``None``. Rows whose ``_id`` cannot be converted, or whose lookup returns a
+    falsy result, are skipped. Every update is committed on its own.
+    """
+    mongo_db_interface = MongoDBInterface(MongodbConfigLocal)
+
+    # The column list is the same for every row, so the statement and the
+    # field order are built once instead of once per row.
+    source_fields = list(field_mapping)
+    set_clause = ", ".join(f"{field_mapping[key]} = %s" for key in source_fields)
+    update_sql = f"UPDATE bid_llizhikun SET {set_clause} WHERE _id = %s"
+
+    # NOTE(review): this relies on the connection context manager yielding the
+    # connection itself (PyMySQL >= 1.0 behaviour) — confirm the installed
+    # version.
+    with pymysql.connect(
+            host=mysql_config["host"],
+            port=mysql_config["port"],
+            user=mysql_config["user"],
+            password=mysql_config["password"],
+            database=mysql_config["database"]
+    ) as mysql_conn:
+        with mysql_conn.cursor() as mysql_cursor:
+            # Load the list of _id values from MySQL.
+            mysql_cursor.execute("SELECT _id FROM bid_llizhikun")
+            ids = mysql_cursor.fetchall()
+
+            for (_id,) in ids:
+                # Convert the stored _id to an ObjectId; skip values that
+                # cannot be converted.
+                try:
+                    object_id = ObjectId(_id)
+                except Exception as e:
+                    print(f"Invalid ObjectId: {_id}, skipping. Error: {e}")
+                    continue
+
+                # Fetch the source document from MongoDB.
+                mongo_data = mongo_db_interface.find_by_id(MongodbConfigLocal["col"], object_id)
+                if not mongo_data:
+                    continue
+
+                # Values follow the column order of set_clause, with the
+                # row key last for the WHERE clause.
+                update_values = [mongo_data.get(key, None) for key in source_fields]
+                update_values.append(_id)
+
+                # Apply and commit the update for this row.
+                mysql_cursor.execute(update_sql, update_values)
+                mysql_conn.commit()
+
+# Run the synchronization only when this file is executed as a script.
+if __name__ == "__main__":
+    main()

+ 58 - 0
tools/从excle导入mysql/File_import_mysql.py

@@ -0,0 +1,58 @@
+import pandas as pd
+import pymysql
+import logging
+
+# Send error records to insert_errors.log.
+logging.basicConfig(filename='insert_errors.log', level=logging.ERROR, format='%(asctime)s - %(message)s')
+
+# Read the Excel file; with these options only empty cells count as missing.
+data = pd.read_excel('标准样本数据汇总.xlsx', engine='openpyxl', keep_default_na=False, na_values=[''])
+
+# Replace missing values with None.
+data = data.replace({pd.NA: None, pd.NaT: None, float('nan'): None})
+
+# The handles start as None so the cleanup in ``finally`` can tell whether they
+# were opened, instead of probing locals() for the names.
+connection = None
+cursor = None
+
+# Connect to the database and insert the rows in batches.
+try:
+    # NOTE(review): the credentials are hard-coded; consider loading them from
+    # the environment or a config file that is not committed.
+    connection = pymysql.connect(
+        host='192.168.3.217',
+        user='root',
+        password='=PDT49#80Z!RVv52_z',
+        database='quality',
+        port=4000,
+        connect_timeout=60
+    )
+    cursor = connection.cursor()
+
+    # The 16 placeholders are filled from each row's values in column order.
+    # NOTE(review): assumes the spreadsheet has exactly these 16 columns in
+    # this order — confirm against the workbook.
+    sql = """
+    INSERT INTO bid_llizhikun 
+    (_id, site, toptype, subtype, area, city, buyer, projectname, projectcode, budget, 
+    s_winner, bidamount, multipackage, label, href, jyhref) 
+    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+    """
+
+    # Batched insert: each batch is committed on its own, so a failing batch
+    # is rolled back without affecting the batches already committed.
+    batch_size = 100
+    total_inserted = 0
+
+    for i in range(0, len(data), batch_size):
+        batch_data = data.iloc[i:i + batch_size]
+        try:
+            cursor.executemany(sql, [tuple(row) for _, row in batch_data.iterrows()])
+            connection.commit()
+            total_inserted += len(batch_data)
+            print(f"成功插入 {len(batch_data)} 行, 总计: {total_inserted}")
+        except Exception as e:
+            # Log the real end of the batch; the final batch may hold fewer
+            # than batch_size rows.
+            logging.error(f"插入错误 (批次 {i}-{i + len(batch_data)}): {e}")
+            logging.error(f"失败的数据: {batch_data}")
+            connection.rollback()
+
+    print(f"数据插入完成: 成功插入 {total_inserted} 行")
+
+except Exception as e:
+    # Reached for any error not handled per batch, not only a failed connect.
+    logging.error(f"数据库连接失败: {e}")
+
+finally:
+    if cursor is not None:
+        cursor.close()
+    if connection is not None:
+        connection.close()