# Data_synchronization.py (2.6 KB)
import pymysql
from bson import ObjectId  # import ObjectId
from bson.errors import InvalidId
from pymongo import MongoClient

from lib.mogodb_helper import MongoDBInterface
  5. # MongoDB 配置信息
  6. MongodbConfigLocal = {
  7. "ip_port": "127.0.0.1:27088",
  8. "user": "viewdata",
  9. "password": "viewdata",
  10. "db": "qfw",
  11. "col": "bidding" # 替换为实际集合名称
  12. }
  13. # MySQL 配置信息
  14. mysql_config = {
  15. "host": "192.168.3.217",
  16. "user": "root",
  17. "password": "=PDT49#80Z!RVv52_z",
  18. "database": "quality",
  19. "port": 4000
  20. }
  21. # 字段映射
  22. field_mapping = {
  23. "toptype": "toptype_ai",
  24. "subtype": "subtype_ai",
  25. "area": "area_ai",
  26. "city": "city_ai",
  27. "buyer": "buyer_ai",
  28. "projectname": "projectname_ai",
  29. "projectcode": "projectcode_ai",
  30. "budget": "budget_ai",
  31. "s_winner": "s_winner_ai",
  32. "bidamount": "bidamount_ai",
  33. "multipackage": "multipackage_ai"
  34. }
  35. def main():
  36. # 实例化 MongoDBInterface
  37. mongo_db_interface = MongoDBInterface(MongodbConfigLocal)
  38. # 使用 MySQL 的 with 语句管理连接
  39. with pymysql.connect(
  40. host=mysql_config["host"],
  41. port=mysql_config["port"],
  42. user=mysql_config["user"],
  43. password=mysql_config["password"],
  44. database=mysql_config["database"]
  45. ) as mysql_conn:
  46. with mysql_conn.cursor() as mysql_cursor:
  47. # 从 MySQL 中读取 _id 列表
  48. mysql_cursor.execute("SELECT _id FROM bid_llizhikun")
  49. ids = mysql_cursor.fetchall()
  50. for (_id,) in ids:
  51. # 将 _id 转换为 ObjectId 类型
  52. try:
  53. object_id = ObjectId(_id)
  54. except Exception as e:
  55. print(f"Invalid ObjectId: {_id}, skipping. Error: {e}")
  56. continue
  57. # 使用 MongoDBInterface 的 find_by_id 方法从 MongoDB 查询数据
  58. mongo_data = mongo_db_interface.find_by_id(MongodbConfigLocal["col"], object_id)
  59. if not mongo_data:
  60. continue
  61. # 构造更新数据
  62. update_fields = {field_mapping[key]: mongo_data.get(key, None) for key in field_mapping}
  63. # 构造更新 SQL
  64. update_sql = f"""
  65. UPDATE bid_llizhikun
  66. SET {", ".join([f"{field} = %s" for field in update_fields.keys()])}
  67. WHERE _id = %s
  68. """
  69. update_values = list(update_fields.values()) + [_id]
  70. # 执行更新操作
  71. mysql_cursor.execute(update_sql, update_values)
  72. mysql_conn.commit()
  73. if __name__ == "__main__":
  74. main()