Python实现将MongoDB中的数据导入到MySQL

作者:小小鸟爱吃辣条 时间:2024-01-21 04:41:01 

本文主要介绍了一个将 MongoDB 中的数据导入到 MySQL 中的 Python 工具类 MongoToMysql。该工具类实现了获取 MongoDB 数据类型、创建 MySQL 表结构以及将数据从 MongoDB 推送到 MySQL 等功能。

通过该工具类,用户可以轻松地将 MongoDB 中的数据导入到 MySQL 中,实现数据的转移和使用。

使用该工具类,用户需要传入相应的参数,包括 MongoDB 的连接信息,MySQL 的连接信息,以及表名、是否设置最大长度、批处理大小和表描述等信息。具体使用可以参考代码中的注释。

实现代码

import pymysql
from loguru import logger

class MongoToMysql:
   def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user,
                mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''):
       self.mongo_host = mongo_host
       self.mongo_port = mongo_port
       self.mongo_db = mongo_db
       self.mongo_collection = mongo_collection
       self.mysql_host = mysql_host
       self.mysql_port = mysql_port
       self.mysql_user = mysql_user
       self.mysql_password = mysql_password
       self.mysql_db = mysql_db
       self.table_name = table_name
       self.set_max_length = set_max_length
       self.batch_size = batch_size
       self.table_description = table_description
       self.data_types = self.get_mongo_data_types()
       self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description)
       self.push_data_to_mysql(self.batch_size)

def get_mongo_data_types(self):
       logger.debug('正在获取mongo中字段的类型!')
       client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
       db = client[self.mongo_db]
       collection = db[self.mongo_collection]
       data_types = {}
       for field in collection.find_one().keys():
           data_types[field] = type(collection.find_one()[field]).__name__
       return data_types

def check_mysql_table_exists(self):
       logger.debug('检查是否存在该表,有则删之!')
       conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                              password=self.mysql_password, db=self.mysql_db)
       cursor = conn.cursor()
       sql = f"DROP TABLE IF EXISTS {self.mongo_collection}"
       cursor.execute(sql)
       conn.commit()
       conn.close()

def get_max_length(self, field):
       logger.debug(f'正在获取字段 {field} 最大长度......')
       client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
       db = client[self.mongo_db]
       collection = db[self.mongo_collection]
       max_length = 0
       for item in collection.find({},{field:1,'_id':0}):
           value = item.get(field)
           if isinstance(value, str) and len(value) > max_length:
               max_length = len(value)
       return max_length

def create_mysql_table(self, data_types,set_max_length,table_description):
       logger.debug('正在mysql中创建表结构!')
       self.check_mysql_table_exists()
       conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                              password=self.mysql_password, db=self.mysql_db)
       cursor = conn.cursor()
       if self.table_name:
           table_name = self.table_name
       else:
           table_name = self.mongo_collection
       fields = []
       for field, data_type in data_types.items():
           if data_type == 'int':
               fields.append(f"{field} INT")
           elif data_type == 'float':
               fields.append(f"{field} FLOAT")
           elif data_type == 'bool':
               fields.append(f"{field} BOOLEAN")
           else:
               if set_max_length:
                   fields.append(f"{field} TEXT)")
               else:
                   max_length = self.get_max_length(field)
                   fields.append(f"{field} VARCHAR({max_length + 200})")
       fields_str = ','.join(fields)
       sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'"
       cursor.execute(sql)
       conn.commit()
       conn.close()

def push_data_to_mysql(self, batch_size=10000):
       logger.debug('--- 正在准备从mongo中每次推送10000条数据到mysql ----')
       client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
       db = client[self.mongo_db]
       collection = db[self.mongo_collection]
       conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                              password=self.mysql_password, db=self.mysql_db)
       cursor = conn.cursor()
       if self.table_name:
           table_name = self.table_name
       else:
           table_name = self.mongo_collection
       # table_name = self.mongo_collection
       data = []
       count = 0
       for item in collection.find():
           count += 1
           row = []
           for field, data_type in self.data_types.items():
               value = item.get(field)
               if value is None:
                   row.append(None)
               elif data_type == 'int':
                   row.append(str(item.get(field, 0)))
               elif data_type == 'float':
                   row.append(str(item.get(field, 0.0)))
               elif data_type == 'bool':
                   row.append(str(item.get(field, False)))
               else:
                   row.append(str(item.get(field, '')))
           data.append(row)
           if count % batch_size == 0:
               placeholders = ','.join(['%s'] * len(data[0]))
               sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
               cursor.executemany(sql, data)
               conn.commit()
               data = []
               logger.debug(f'--- 已完成推送:{count} 条数据! ----')
       if data:
           placeholders = ','.join(['%s'] * len(data[0]))
           sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
           cursor.executemany(sql, data)
           conn.commit()
           logger.debug(f'--- 已完成推送:{count} 条数据! ----')
       conn.close()

if __name__ == '__main__':
   """MySQL"""
   mongo_host = '127.0.0.1'
   mongo_port = 27017
   mongo_db = 'db_name'
   mongo_collection = 'collection_name'

"""MongoDB"""
   mysql_host = '127.0.0.1'
   mysql_port = 3306
   mysql_user = 'root'
   mysql_password = '123456'
   mysql_db = 'mysql_db'

table_description = ''  # 表描述

mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
                                 mysql_user, mysql_password, mysql_db,table_description=table_description)

#
   # table_name = None  # 默认为None 则MySQL中的表名跟Mongo保持一致
   # set_max_length = False # 默认为False 根据mongo中字段最大长度 加200 来设置字段的VARCHART长度 , 否则定义TEXT类型
   # batch_size = 10000   # 控制每次插入数据量的大小
   # table_description = '' # 表描述
   # mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
   #                               mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)

代码使用了 PyMongo、PyMySQL 和 Loguru 等 Python 库,并封装了一个 MongoToMysql 类。在类的初始化时,会自动获取 MongoDB 中字段的类型,并根据字段类型创建 MySQL 表结构。在将数据从 MongoDB 推送到 MySQL 时,还可以控制每次插入数据量的大小,以避免一次性插入大量数据造成系统崩溃或性能下降。

需要注意的是,在创建 MySQL 表结构时,如果用户选择了设置最大长度,则会创建 TEXT 类型的字段,否则会根据 MongoDB 中字段的最大长度加上200来设置 VARCHAR 类型的字段长度。

总之,本文介绍的 MongoToMysql 工具类非常方便实用,对于需要将 MongoDB 数据迁移到 MySQL 的用户来说,是一种很好的解决方案。

来源:https://juejin.cn/post/7229152434801721402

标签:Python,MongoDB,MySQL
0
投稿

猜你喜欢

  • php完全过滤HTML,JS,CSS等标签

    2023-10-09 08:07:34
  • vue绑定class与行间样式style详解

    2024-05-08 10:12:12
  • 详解pandas.DataFrame中删除包涵特定字符串所在的行

    2023-08-23 23:37:45
  • MySQL 管理

    2024-01-13 14:42:43
  • Python OpenGL绘制一场烟花盛会

    2021-02-08 06:12:44
  • Windows 下 MySQL 8.X 的安装教程

    2024-01-14 03:05:33
  • Win8.1下安装Python3.6提示0x80240017错误的解决方法

    2021-12-24 08:36:51
  • PHP字符编码问题之GB2312 VS UTF-8解决方法

    2024-04-29 13:57:28
  • 基于javascript实现九宫格大转盘效果

    2024-04-17 10:33:13
  • python中有帮助函数吗

    2021-06-15 15:50:02
  • python每5分钟从kafka中提取数据的例子

    2022-05-15 16:35:52
  • 分享Bootstrap简单表格、表单、登录页面

    2024-04-10 13:50:42
  • 利用pyinstaller打包exe文件的基本教程

    2022-06-09 12:32:13
  • pytorch 计算ConvTranspose1d输出特征大小方式

    2021-08-14 10:16:16
  • pandas读取CSV文件时查看修改各列的数据类型格式

    2023-09-26 16:44:58
  • 使用python求解二次规划的问题

    2022-05-15 21:40:59
  • pycharm 使用心得(二)设置字体大小

    2022-02-19 15:37:04
  • 总结python 三种常见的内存泄漏场景

    2023-02-18 16:37:49
  • Linux下使用Jenkins自动化构建.NET Core应用

    2024-05-13 09:16:36
  • 数据库查询的分页优化技巧

    2009-05-17 10:31:00
  • asp之家 网络编程 m.aspxhome.com