"""Copy selected bidding fields from MongoDB into the ``*_ai`` columns of a MySQL table.

For every ``_id`` stored in the MySQL table, the matching MongoDB document is
looked up and the fields listed in ``field_mapping`` are written back to the
same MySQL row.
"""

from pymongo import MongoClient
from bson import ObjectId  # Turns the string ids stored in MySQL into MongoDB ids
from bson.errors import InvalidId
import pymysql

from lib.mogodb_helper import MongoDBInterface

# MongoDB connection settings.
MongodbConfigLocal = {
    "ip_port": "127.0.0.1:27088",
    "user": "viewdata",
    "password": "viewdata",
    "db": "qfw",
    "col": "bidding",  # Replace with the actual collection name
}

# MySQL connection settings.
# NOTE(review): credentials are committed in plain text here; consider loading
# them from the environment or a secrets store instead.
mysql_config = {
    "host": "192.168.3.217",
    "user": "root",
    "password": "=PDT49#80Z!RVv52_z",
    "database": "quality",
    "port": 4000,
}

# MySQL table that supplies the ``_id`` list and receives the updates.
MYSQL_TABLE = "bid_llizhikun"

# Field mapping: MongoDB document key -> MySQL column name.
field_mapping = {
    "toptype": "toptype_ai",
    "subtype": "subtype_ai",
    "area": "area_ai",
    "city": "city_ai",
    "buyer": "buyer_ai",
    "projectname": "projectname_ai",
    "projectcode": "projectcode_ai",
    "budget": "budget_ai",
    "s_winner": "s_winner_ai",
    "bidamount": "bidamount_ai",
    "multipackage": "multipackage_ai",
}


def _build_update_sql() -> str:
    """Return the parameterized UPDATE statement covering every mapped column.

    Placeholders appear in ``field_mapping`` order, followed by one final
    placeholder for the ``_id`` in the WHERE clause.
    """
    assignments = ", ".join(f"{column} = %s" for column in field_mapping.values())
    return f"UPDATE {MYSQL_TABLE} SET {assignments} WHERE _id = %s"


def _extract_update_values(mongo_data) -> list:
    """Return the mapped field values of *mongo_data* in ``field_mapping`` order.

    Keys that are absent from the document yield ``None``.
    """
    return [mongo_data.get(key) for key in field_mapping]


def main() -> None:
    """Sync the mapped MongoDB fields into MySQL, one row at a time.

    Rows whose ``_id`` is NULL or not a valid ObjectId are reported and
    skipped; rows without a matching MongoDB document are skipped silently.
    Each successful update is committed immediately, so an interrupted run
    keeps the rows it has already processed.
    """
    mongo_db_interface = MongoDBInterface(MongodbConfigLocal)

    # The statement depends only on field_mapping, so build it once up front
    # instead of once per row.
    update_sql = _build_update_sql()

    # The with-statements manage the MySQL connection and cursor lifetimes.
    with pymysql.connect(
        host=mysql_config["host"],
        port=mysql_config["port"],
        user=mysql_config["user"],
        password=mysql_config["password"],
        database=mysql_config["database"],
    ) as mysql_conn:
        with mysql_conn.cursor() as mysql_cursor:
            # Read the full list of ids to process from MySQL.
            mysql_cursor.execute(f"SELECT _id FROM {MYSQL_TABLE}")
            ids = mysql_cursor.fetchall()

            for (_id,) in ids:
                # ObjectId(None) would silently generate a brand-new id rather
                # than fail, so NULL ids have to be rejected explicitly.
                if _id is None:
                    print(f"Invalid ObjectId: {_id}, skipping. Error: _id is NULL")
                    continue

                # Convert the stored id to an ObjectId.
                try:
                    object_id = ObjectId(_id)
                except (InvalidId, TypeError) as e:
                    print(f"Invalid ObjectId: {_id}, skipping. Error: {e}")
                    continue

                # Fetch the source document through the MongoDBInterface helper.
                mongo_data = mongo_db_interface.find_by_id(
                    MongodbConfigLocal["col"], object_id
                )
                if not mongo_data:
                    continue

                # Mapped values first, then the id for the WHERE clause.
                update_values = _extract_update_values(mongo_data) + [_id]

                mysql_cursor.execute(update_sql, update_values)
                mysql_conn.commit()


if __name__ == "__main__":
    main()