利用MySqlBulkLoader实现批量插入数据的示例详解

作者:姚小丹 时间:2024-01-24 08:46:00 

介绍

最近在项目中遇到插入数据瓶颈,几万、几十万、几百万的数据保存到MYSQL数据库,使用EF插入数据速度非常慢,数据量非常大时EF插入需要几十分钟,甚至几个小时,这样子的速度肯定不是我们所期望的。

后面经过了解与研究发现MySqlBulkLoader,可以批量将数据插入到数据库并且速度上面远远优于EF。

MySqlBulkLoader主要的实现方式:将需要插入的数据转成DataTable,DataTable转成一个CSV文件,将CSV文件使用批量导入的形式导入到数据库里面去。

注意:

1).数据库连接地址需要添加配置AllowLoadLocalInfile=true,允许本地文件导入;

Data Source = 数据库地址; Port = 端口; Initial Catalog = 数据库名; User Id = 用户名; Password = 密码;AllowLoadLocalInfile=true;

2).插入的时候会返回插入行数,但是检查所有的数据都正确,也没有报异常,却返回了插入数量为0,可以检查表是否有唯一索引,插入的数据是否违反了唯一索引

(以下分块展示了代码,如果需要看完整的代码直接看 5.完整的代码) 

1.将List转化为DataTable 

/// <summary>
       /// 将List转化为DataTable
       /// </summary>
       /// <returns></returns>
       public DataTable ListToDataTable<T>(List<T> data)
       {
           #region 创建一个DataTable,以实体名称作为DataTable名称

var tableName = typeof(T).Name;
           tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
           DataTable dt = new DataTable
           {
               TableName = tableName
           };

#endregion

#region 拿取列名,以实体的属性名作为列名      

var properties = typeof(T).GetProperties();
           foreach (var item in properties)
           {
               var curFileName = item.Name;
               curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
               dt.Columns.Add(curFileName);
           }

#endregion

#region 列赋值
           foreach (var item in data)
           {
               DataRow dr = dt.NewRow();
               var columns = dt.Columns;

var curPropertyList = item.GetType().GetProperties();
               foreach (var p in curPropertyList)
               {
                   var name = p.Name;
                   name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
                   var curValue = p.GetValue(item);

int i = columns.IndexOf(name);
                   dr[i] = curValue;
               }

dt.Rows.Add(dr);
           }

#endregion  

return dt;
       }

2.将DataTable转换为标准的CSV文件 

/// <summary>
   /// csv扩展
   /// </summary>
   public static class CSVEx
   {
       /// <summary>
       ///将DataTable转换为标准的CSV文件
       /// </summary>
       /// <param name="table">数据表</param>
       /// <param name="tmpPath">文件地址</param>
       /// <returns>返回标准的CSV</returns>
       public static void ToCsv(this DataTable table, string tmpPath)
       {
           //以半角逗号(即,)作分隔符,列为空也要表达其存在。
           //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。
           //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。
           StringBuilder sb = new StringBuilder();
           DataColumn colum;
           foreach (DataRow row in table.Rows)
           {
               for (int i = 0; i < table.Columns.Count; i++)
               {
                   Type _datatype = typeof(DateTime);
                   colum = table.Columns[i];
                   if (i != 0) sb.Append("\t");
                   //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
                   //{
                   //    sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
                   //}
                   if (colum.DataType == _datatype)
                   {
                       sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));
                   }
                   else sb.Append(row[colum].ToString());
               }
               sb.Append("\r\n");
           }
           StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8);
           sw.Write(sb.ToString());
           sw.Close();
       }

}

3.CSV文件导入数据到数据库

/// <summary>
   /// 批量导入mysql帮助类
   /// </summary>
   public static class MySqlHelper
   {
       /// <summary>
       /// MySqlBulkLoader批量导入
       /// </summary>
       /// <param name="_mySqlConnection">数据库连接地址</param>
       /// <param name="table"></param>
       /// <param name="csvName"></param>
       /// <returns></returns>
       public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)
       {
           var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();
           MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)
           {
               FieldTerminator = "\t",
               FieldQuotationCharacter = '"',
               EscapeCharacter = '"',
               LineTerminator = "\r\n",
               FileName = csvName,
               NumberOfLinesToSkip = 0,
               TableName = table.TableName,

};

bulk.Columns.AddRange(columns);
           return bulk.Load();
       }
   }

4.使用MySqlBulkLoader批量插入数据

/// <summary>
       /// 使用MySqlBulkLoader批量插入数据
       /// </summary>
       /// <typeparam name="T"></typeparam>
       /// <param name="data"></param>
       /// <returns></returns>
       /// <exception cref="Exception"></exception>
       public int BulkLoaderData<T>(List<T> data)
       {
           if (data.Count <= 0) return 0;

var connectString = "数据库连接地址";
           using (MySqlConnection connection = new MySqlConnection(connectString))
           {
               MySqlTransaction sqlTransaction = null;
               try
               {
                   if (connection.State == ConnectionState.Closed)
                   {
                       connection.Open();
                   }
                   sqlTransaction = connection.BeginTransaction();

var dt = ListToDataTable<T>(data); //将List转成dataTable
                   string tmpPath = Path.GetTempFileName();
                   dt.ToCsv(tmpPath); //将DataTable转成CSV文件
                   var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据
                   sqlTransaction.Commit();

try
                   {
                       if (File.Exists(tmpPath)) File.Delete(tmpPath);
                   }
                   catch (Exception)
                   {
                       //删除文件失败

}
                   return insertCount; //返回执行成功的条数
               }
               catch (Exception e)
               {
                   if (sqlTransaction != null)
                   {
                       sqlTransaction.Rollback();
                   }
                   //执行异常
                   throw e;
               }
           }

}

5.完整的代码

namespace WebApplication1.BrantchInsert
{

/// <summary>
   /// 批量插入
   /// </summary>
   public class BulkLoader
   {

/// <summary>
       /// 测试批量插入入口
       /// </summary>
       /// <returns></returns>
       public int BrantchDataTest()
       {

#region 模拟数据
           var data = new List<CrmCouponTestDto>() {
                new CrmCouponTestDto {
                    Id=1,
                    CouponCode="test001",
                    CouponId = 1,
                    MemberId=100,
                    IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                    UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),
                    UsageShopId=0,
                    UsageBillNo="",
                    EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                    EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                    Status=0
                },
                new CrmCouponTestDto {
                    Id=2,
                    CouponCode="test002",
                    CouponId = 1,
                      MemberId=101,
                    IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                    UsageTime=Convert.ToDateTime("2022-06-27 14:30:00"),
                    UsageShopId=2,
                    UsageBillNo="CS202206271430001",
                    EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                    EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                    Status=1
                },
                 new CrmCouponTestDto {
                    Id=3,
                    CouponCode="test003",
                    CouponId = 1,
                    MemberId=102,
                    IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                    UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),
                    UsageShopId=0,
                    UsageBillNo="",
                    EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                    EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                    Status=0
                },
                   new CrmCouponTestDto {
                    Id=4,
                    CouponCode="test004",
                    CouponId = 1,
                    MemberId=103,
                    IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                    UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),
                    UsageShopId=0,
                    UsageBillNo="",
                    EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                    EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                    Status=0
                }
            };
           #endregion
           var result = BulkLoaderData<CrmCouponTestDto>(data);
           return result;

}

/// <summary>
       /// 使用MySqlBulkLoader批量插入数据
       /// </summary>
       /// <typeparam name="T"></typeparam>
       /// <param name="data"></param>
       /// <returns></returns>
       /// <exception cref="Exception"></exception>
       public int BulkLoaderData<T>(List<T> data)
       {
           if (data.Count <= 0) return 0;

var connectString = "数据库连接地址";
           using (MySqlConnection connection = new MySqlConnection(connectString))
           {
               MySqlTransaction sqlTransaction = null;
               try
               {
                   if (connection.State == ConnectionState.Closed)
                   {
                       connection.Open();
                   }
                   sqlTransaction = connection.BeginTransaction();

var dt = ListToDataTable<T>(data); //将List转成dataTable
                   string tmpPath = Path.GetTempFileName();
                   dt.ToCsv(tmpPath); //将DataTable转成CSV文件
                   var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据
                   sqlTransaction.Commit();

try
                   {
                       if (File.Exists(tmpPath)) File.Delete(tmpPath);
                   }
                   catch (Exception)
                   {
                       //删除文件失败

}
                   return insertCount; //返回执行成功的条数
               }
               catch (Exception e)
               {
                   if (sqlTransaction != null)
                   {
                       sqlTransaction.Rollback();
                   }
                   //执行异常
                   throw e;
               }
           }

}

/// <summary>
       /// 将List转化为DataTable核心方法
       /// </summary>
       /// <returns></returns>
       public DataTable ListToDataTable<T>(List<T> data)
       {
           #region 创建一个DataTable,以实体名称作为DataTable名称

var tableName = typeof(T).Name;
           tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
           DataTable dt = new DataTable
           {
               TableName = tableName
           };

#endregion

#region 拿取列名,以实体的属性名作为列名      

var properties = typeof(T).GetProperties();
           foreach (var item in properties)
           {
               var curFileName = item.Name;
               curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
               dt.Columns.Add(curFileName);
           }

#endregion

#region 列赋值
           foreach (var item in data)
           {
               DataRow dr = dt.NewRow();
               var columns = dt.Columns;

var curPropertyList = item.GetType().GetProperties();
               foreach (var p in curPropertyList)
               {
                   var name = p.Name;
                   name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
                   var curValue = p.GetValue(item);

int i = columns.IndexOf(name);
                   dr[i] = curValue;
               }

dt.Rows.Add(dr);
           }

#endregion  

return dt;
       }

}

/// <summary>
   /// 批量导入mysql帮助类
   /// </summary>
   public static class MySqlHelper
   {
       /// <summary>
       /// MySqlBulkLoader批量导入
       /// </summary>
       /// <param name="_mySqlConnection">数据库连接地址</param>
       /// <param name="table"></param>
       /// <param name="csvName"></param>
       /// <returns></returns>
       public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)
       {
           var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();
           MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)
           {
               FieldTerminator = "\t",
               FieldQuotationCharacter = '"',
               EscapeCharacter = '"',
               LineTerminator = "\r\n",
               FileName = csvName,
               NumberOfLinesToSkip = 0,
               TableName = table.TableName,

};

bulk.Columns.AddRange(columns);
           return bulk.Load();
       }
   }

/// <summary>
   /// csv扩展
   /// </summary>
   public static class CSVEx
   {
       /// <summary>
       ///将DataTable转换为标准的CSV文件
       /// </summary>
       /// <param name="table">数据表</param>
       /// <param name="tmpPath">文件地址</param>
       /// <returns>返回标准的CSV</returns>
       public static void ToCsv(this DataTable table, string tmpPath)
       {
           //以半角逗号(即,)作分隔符,列为空也要表达其存在。
           //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。
           //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。
           StringBuilder sb = new StringBuilder();
           DataColumn colum;
           foreach (DataRow row in table.Rows)
           {
               for (int i = 0; i < table.Columns.Count; i++)
               {
                   Type _datatype = typeof(DateTime);
                   colum = table.Columns[i];
                   if (i != 0) sb.Append("\t");
                   //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
                   //{
                   //    sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
                   //}
                   if (colum.DataType == _datatype)
                   {
                       sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));
                   }
                   else sb.Append(row[colum].ToString());
               }
               sb.Append("\r\n");
           }
           StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8);
           sw.Write(sb.ToString());
           sw.Close();
       }

}

/// <summary>
   /// 字符串转化
   /// </summary>
   public static class StringExtensions
   {
       /// <summary>
       /// 转换为 main_keys_id 这种形式的字符串方式
       /// </summary>
       public static string ToSnakeCase(this string input)
       {
           if (string.IsNullOrEmpty(input)) { return input; }

var startUnderscores = Regex.Match(input, @"^_+");
           return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])", "$1_$2").ToLower();
       }
   }

/// <summary>
   /// 实体
   /// </summary>
   public class CrmCouponTestDto
   {
       /// <summary>
       /// ID
       /// </summary>
       public long Id { get; set; }

/// <summary>
       /// 卡券号
       /// </summary>    
       public string CouponCode { get; set; }

/// <summary>
       /// 卡券ID
       /// </summary>
       public int CouponId { get; set; }

/// <summary>
       /// 会员ID
       /// </summary>
       public int MemberId { get; set; }

/// <summary>
       /// 发放时间
       /// </summary>  
       public DateTime IssueTime { get; set; }

/// <summary>
       /// 使用时间
       /// </summary>      
       public DateTime UsageTime { get; set; }

/// <summary>
       /// 使用店铺ID
       /// </summary>      

public int UsageShopId { get; set; }

/// <summary>
       /// 使用单号
       /// </summary>      
       public string UsageBillNo { get; set; }

/// <summary>
       /// 有效开始时间
       /// </summary>      
       public DateTime EffectiveStart { get; set; }

/// <summary>
       /// 有效结束时间
       /// </summary>      
       public DateTime EffectiveEnd { get; set; }

/// <summary>
       /// 状态
       /// CouponStatus 卡券状态:
       /// -1:未领用
       /// 0:未使用
       /// 1:已使用
       /// 2:已过期
       ///3:已作废
       ///4:转赠中
       /// </summary>

public Int16 Status { get; set; }
   }
}

来源:https://www.cnblogs.com/yaoxiaodan/p/16416567.html

标签:MySqlBulkLoader,批量,插入数据
0
投稿

猜你喜欢

  • matplotlib绘制多个子图(subplot)的方法

    2023-01-17 08:27:45
  • 一次Mysql update sql不当引起的生产故障记录

    2024-01-21 09:09:22
  • 需要掌握的八个CSS布局技巧

    2008-05-17 11:45:00
  • pytorch 彩色图像转灰度图像实例

    2023-08-02 17:28:37
  • pytorch随机采样操作SubsetRandomSampler()

    2021-05-22 18:00:46
  • Docker安装MySQL8的方法步骤

    2024-01-21 12:26:40
  • python如何实现word批量转HTML

    2023-11-04 06:28:41
  • javascript 使用sleep函数的常见方法详解

    2024-04-22 13:00:08
  • 解决安装pycharm后不能执行python脚本的问题

    2023-07-25 06:51:04
  • Python区块链创世块创建教程

    2023-10-10 06:25:52
  • python 筛选数据集中列中value长度大于20的数据集方法

    2023-05-27 03:55:02
  • Jquery 切换不同图片示例代码

    2024-04-16 09:31:33
  • 用python自动生成日历

    2022-08-28 14:28:58
  • pyinstaller参数介绍以及总结详解

    2023-12-02 11:11:59
  • 详解Python中命令行参数argparse的常用命令

    2022-06-06 15:59:30
  • nditer—numpy.ndarray 多维数组的迭代操作

    2023-10-20 20:21:10
  • Python全局锁中如何合理运用多线程(多进程)

    2022-05-27 18:58:22
  • Mysql中事务ACID的实现原理详解

    2024-01-29 01:49:41
  • Python语言的12个基础知识点小结

    2023-08-23 10:12:07
  • 如何修改vue-treeSelect的高度

    2024-05-08 09:33:55
  • asp之家 网络编程 m.aspxhome.com