12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- from pymongo import MongoClient
- from bson import ObjectId # 导入 ObjectId
- import pymysql
- from lib.mogodb_helper import MongoDBInterface
- # MongoDB 配置信息
- MongodbConfigLocal = {
- "ip_port": "127.0.0.1:27088",
- "user": "viewdata",
- "password": "viewdata",
- "db": "qfw",
- "col": "bidding" # 替换为实际集合名称
- }
- # MySQL 配置信息
- mysql_config = {
- "host": "192.168.3.217",
- "user": "root",
- "password": "=PDT49#80Z!RVv52_z",
- "database": "quality",
- "port": 4000
- }
- # 字段映射
- field_mapping = {
- "toptype": "toptype_ai",
- "subtype": "subtype_ai",
- "area": "area_ai",
- "city": "city_ai",
- "buyer": "buyer_ai",
- "projectname": "projectname_ai",
- "projectcode": "projectcode_ai",
- "budget": "budget_ai",
- "s_winner": "s_winner_ai",
- "bidamount": "bidamount_ai",
- "multipackage": "multipackage_ai"
- }
- def main():
- # 实例化 MongoDBInterface
- mongo_db_interface = MongoDBInterface(MongodbConfigLocal)
- # 使用 MySQL 的 with 语句管理连接
- with pymysql.connect(
- host=mysql_config["host"],
- port=mysql_config["port"],
- user=mysql_config["user"],
- password=mysql_config["password"],
- database=mysql_config["database"]
- ) as mysql_conn:
- with mysql_conn.cursor() as mysql_cursor:
- # 从 MySQL 中读取 _id 列表
- mysql_cursor.execute("SELECT _id FROM bid_llizhikun")
- ids = mysql_cursor.fetchall()
- for (_id,) in ids:
- # 将 _id 转换为 ObjectId 类型
- try:
- object_id = ObjectId(_id)
- except Exception as e:
- print(f"Invalid ObjectId: {_id}, skipping. Error: {e}")
- continue
- # 使用 MongoDBInterface 的 find_by_id 方法从 MongoDB 查询数据
- mongo_data = mongo_db_interface.find_by_id(MongodbConfigLocal["col"], object_id)
- if not mongo_data:
- continue
- # 构造更新数据
- update_fields = {field_mapping[key]: mongo_data.get(key, None) for key in field_mapping}
- # 构造更新 SQL
- update_sql = f"""
- UPDATE bid_llizhikun
- SET {", ".join([f"{field} = %s" for field in update_fields.keys()])}
- WHERE _id = %s
- """
- update_values = list(update_fields.values()) + [_id]
- # 执行更新操作
- mysql_cursor.execute(update_sql, update_values)
- mysql_conn.commit()
- if __name__ == "__main__":
- main()
|