用户登录
用户注册

分享至

c#实现几种数据库的大数据批量插入

  • 作者: 俄每天都开心
  • 来源: 51数据库
  • 2022-04-28

在之前只知道sqlserver支持数据批量插入,殊不知道oracle、sqlite和mysql也是支持的,不过oracle需要使用orace.dataaccess驱动,今天就贴出几种数据库的批量插入解决方法。

首先说一下,iprovider里有一个用于实现批量插入的插件服务接口ibatcherprovider,此接口在前一篇文章中已经提到过了。

/// <summary> 
  /// 提供数据批量处理的方法。 
  /// </summary> 
  public interface ibatcherprovider : iproviderservice 
  { 
    /// <summary> 
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> 
    /// <param name="batchsize">每批次写入的数据量。</param> 
    void insert(datatable datatable, int batchsize = 10000); 
  } 

一、sqlserver数据批量插入

sqlserver的批量插入很简单,使用sqlbulkcopy就可以,以下是该类的实现:

/// <summary> 
  /// 为 system.data.sqlclient 提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class mssqlbatcher : ibatcherprovider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public servicecontext servicecontext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> 
    /// <param name="batchsize">每批次写入的数据量。</param> 
    public void insert(datatable datatable, int batchsize = 10000) 
    { 
      checker.argumentnull(datatable, "datatable"); 
      if (datatable.rows.count == 0) 
      { 
        return; 
      } 
      using (var connection = (sqlconnection)servicecontext.database.createconnection()) 
      { 
        try 
        { 
          connection.tryopen(); 
          //给表名加上前后导符 
          var tablename = dbutility.formatbyquote(servicecontext.database.provider.getservice<isyntaxprovider>(), datatable.tablename); 
          using (var bulk = new sqlbulkcopy(connection, sqlbulkcopyoptions.keepidentity, null) 
            { 
              destinationtablename = tablename,  
              batchsize = batchsize 
            }) 
          { 
            //循环所有列,为bulk添加映射 
            datatable.eachcolumn(c => bulk.columnmappings.add(c.columnname, c.columnname), c => !c.autoincrement); 
            bulk.writetoserver(datatable); 
            bulk.close(); 
          } 
        } 
        catch (exception exp) 
        { 
          throw new batcherexception(exp); 
        } 
        finally 
        { 
          connection.tryclose(); 
        } 
      } 
    } 
  } 

以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置sqlbulkcopyoptions.useinternaltransaction。

二、oracle数据批量插入

system.data.oracleclient不支持批量插入,因此只能使用oracle.dataaccess组件来作为提供者。

/// <summary> 
  /// oracle.data.access 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class oracleaccessbatcher : ibatcherprovider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public servicecontext servicecontext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> 
    /// <param name="batchsize">每批次写入的数据量。</param> 
    public void insert(datatable datatable, int batchsize = 10000) 
    { 
      checker.argumentnull(datatable, "datatable"); 
      if (datatable.rows.count == 0) 
      { 
        return; 
      } 
      using (var connection = servicecontext.database.createconnection()) 
      { 
        try 
        { 
          connection.tryopen(); 
          using (var command = servicecontext.database.provider.dbproviderfactory.createcommand()) 
          { 
            if (command == null) 
            { 
              throw new batcherexception(new argumentexception("command")); 
            } 
            command.connection = connection; 
            command.commandtext = generateinsersql(servicecontext.database, command, datatable); 
            command.executenonquery(); 
          } 
        } 
        catch (exception exp) 
        { 
          throw new batcherexception(exp); 
        } 
        finally 
        { 
          connection.tryclose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string generateinsersql(idatabase database, dbcommand command, datatable table) 
    { 
      var names = new stringbuilder(); 
      var values = new stringbuilder(); 
      //将一个datatable的数据转换为数组的数组 
      var data = table.toarray(); 
 
      //设置arraybindcount属性 
      command.gettype().getproperty("arraybindcount").setvalue(command, table.rows.count, null); 
 
      var syntax = database.provider.getservice<isyntaxprovider>(); 
      for (var i = 0; i < table.columns.count; i++) 
      { 
        var column = table.columns[i]; 
 
        var parameter = database.provider.dbproviderfactory.createparameter(); 
        if (parameter == null) 
        { 
          continue; 
        } 
        parameter.parametername = column.columnname; 
        parameter.direction = parameterdirection.input; 
        parameter.dbtype = column.datatype.getdbtype(); 
        parameter.value = data[i]; 
 
        if (names.length > 0) 
        { 
          names.append(","); 
          values.append(","); 
        } 
        names.appendformat("{0}", dbutility.formatbyquote(syntax, column.columnname)); 
        values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname); 
 
        command.parameters.add(parameter); 
      } 
      return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values); 
    } 
  } 

以上最重要的一步,就是将datatable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环columns将后数组作为parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。

三、sqlite数据批量插入

sqlite的批量插入只需开启事务就可以了,这个具体的原理不得而知。

public sealed class sqlitebatcher : ibatcherprovider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public servicecontext servicecontext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> 
    /// <param name="batchsize">每批次写入的数据量。</param> 
    public void insert(datatable datatable, int batchsize = 10000) 
    { 
      checker.argumentnull(datatable, "datatable"); 
      if (datatable.rows.count == 0) 
      { 
        return; 
      } 
      using (var connection = servicecontext.database.createconnection()) 
      { 
        dbtransaction transcation = null; 
        try 
        { 
          connection.tryopen(); 
          transcation = connection.begintransaction(); 
          using (var command = servicecontext.database.provider.dbproviderfactory.createcommand()) 
          { 
            if (command == null) 
            { 
              throw new batcherexception(new argumentexception("command")); 
            } 
            command.connection = connection; 
 
            command.commandtext = generateinsersql(servicecontext.database, datatable); 
            if (command.commandtext == string.empty) 
            { 
              return; 
            } 
 
            var flag = new assertflag(); 
            datatable.eachrow(row => 
              { 
                var first = flag.asserttrue(); 
                processcommandparameters(datatable, command, row, first); 
                command.executenonquery(); 
              }); 
          } 
          transcation.commit(); 
        } 
        catch (exception exp) 
        { 
          if (transcation != null) 
          { 
            transcation.rollback(); 
          } 
          throw new batcherexception(exp); 
        } 
        finally 
        { 
          connection.tryclose(); 
        } 
      } 
    } 
 
    private void processcommandparameters(datatable datatable, dbcommand command, datarow row, bool first) 
    { 
      for (var c = 0; c < datatable.columns.count; c++) 
      { 
        dbparameter parameter; 
        //首次创建参数,是为了使用缓存 
        if (first) 
        { 
          parameter = servicecontext.database.provider.dbproviderfactory.createparameter(); 
          parameter.parametername = datatable.columns[c].columnname; 
          command.parameters.add(parameter); 
        } 
        else 
        { 
          parameter = command.parameters[c]; 
        } 
        parameter.value = row[c]; 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string generateinsersql(idatabase database, datatable table) 
    { 
      var syntax = database.provider.getservice<isyntaxprovider>(); 
      var names = new stringbuilder(); 
      var values = new stringbuilder(); 
      var flag = new assertflag(); 
      table.eachcolumn(column => 
        { 
          if (!flag.asserttrue()) 
          { 
            names.append(","); 
            values.append(","); 
          } 
          names.append(dbutility.formatbyquote(syntax, column.columnname)); 
          values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname); 
        }); 
      return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values); 
    } 
  } 

四、mysql数据批量插入

/// <summary> 
  /// 为 mysql.data 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class mysqlbatcher : ibatcherprovider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public servicecontext servicecontext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> 
    /// <param name="batchsize">每批次写入的数据量。</param> 
    public void insert(datatable datatable, int batchsize = 10000) 
    { 
      checker.argumentnull(datatable, "datatable"); 
      if (datatable.rows.count == 0) 
      { 
        return; 
      } 
      using (var connection = servicecontext.database.createconnection()) 
      { 
        try 
        { 
          connection.tryopen(); 
          using (var command = servicecontext.database.provider.dbproviderfactory.createcommand()) 
          { 
            if (command == null) 
            { 
              throw new batcherexception(new argumentexception("command")); 
            } 
            command.connection = connection; 
 
            command.commandtext = generateinsersql(servicecontext.database, command, datatable); 
            if (command.commandtext == string.empty) 
            { 
              return; 
            } 
            command.executenonquery(); 
          } 
        } 
        catch (exception exp) 
        { 
          throw new batcherexception(exp); 
        } 
        finally 
        { 
          connection.tryclose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string generateinsersql(idatabase database, dbcommand command, datatable table) 
    { 
      var names = new stringbuilder(); 
      var values = new stringbuilder(); 
      var types = new list<dbtype>(); 
      var count = table.columns.count; 
      var syntax = database.provider.getservice<isyntaxprovider>(); 
      table.eachcolumn(c => 
        { 
          if (names.length > 0) 
          { 
            names.append(","); 
          } 
          names.appendformat("{0}", dbutility.formatbyquote(syntax, c.columnname)); 
          types.add(c.datatype.getdbtype()); 
        }); 
 
      var i = 0; 
      foreach (datarow row in table.rows) 
      { 
        if (i > 0) 
        { 
          values.append(","); 
        } 
        values.append("("); 
        for (var j = 0; j < count; j++) 
        { 
          if (j > 0) 
          { 
            values.append(", "); 
          } 
          var isstrtype = isstringtype(types[j]); 
          var parameter = createparameter(database.provider, isstrtype, types[j], row[j], syntax.parameterprefix, i, j); 
          if (parameter != null) 
          { 
            values.append(parameter.parametername); 
            command.parameters.add(parameter); 
          } 
          else if (isstrtype) 
          { 
            values.appendformat("'{0}'", row[j]); 
          } 
          else 
          { 
            values.append(row[j]); 
          } 
        } 
        values.append(")"); 
        i++; 
      } 
      return string.format("insert into {0}({1}) values {2}", dbutility.formatbyquote(syntax, table.tablename), names, values); 
    } 
 
    /// <summary> 
    /// 判断是否为字符串类别。 
    /// </summary> 
    /// <param name="dbtype"></param> 
    /// <returns></returns> 
    private bool isstringtype(dbtype dbtype) 
    { 
      return dbtype == dbtype.ansistring || dbtype == dbtype.ansistringfixedlength || dbtype == dbtype.string || dbtype == dbtype.stringfixedlength; 
    } 
 
    /// <summary> 
    /// 创建参数。 
    /// </summary> 
    /// <param name="provider"></param> 
    /// <param name="isstrtype"></param> 
    /// <param name="dbtype"></param> 
    /// <param name="value"></param> 
    /// <param name="parprefix"></param> 
    /// <param name="row"></param> 
    /// <param name="col"></param> 
    /// <returns></returns> 
    private dbparameter createparameter(iprovider provider, bool isstrtype, dbtype dbtype, object value, char parprefix, int row, int col) 
    { 
      //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数 
      if ((isstrtype && value.tostring().indexof('\'') != -1) || dbtype == dbtype.datetime) 
      { 
        var name = string.format("{0}p_{1}_{2}", parprefix, row, col); 
        var parameter = provider.dbproviderfactory.createparameter(); 
        parameter.parametername = name; 
        parameter.direction = parameterdirection.input; 
        parameter.dbtype = dbtype; 
        parameter.value = value; 
        return parameter; 
      } 
      return null; 
    } 
  } 

mysql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。

五、测试

接下来写一个测试用例来看一下使用批量插入的效果。

    public void testbatchinsert() 
    { 
      console.writeline(timewatcher.watch(() => 
        invoketest(database => 
          { 
            var table = new datatable("batcher"); 
            table.columns.add("id", typeof(int)); 
            table.columns.add("name1", typeof(string)); 
            table.columns.add("name2", typeof(string)); 
            table.columns.add("name3", typeof(string)); 
            table.columns.add("name4", typeof(string)); 
 
            //构造100000条数据 
            for (var i = 0; i < 100000; i++) 
            { 
              table.rows.add(i, i.tostring(), i.tostring(), i.tostring(), i.tostring()); 
            } 
 
            //获取 ibatcherprovider 
            var batcher = database.provider.getservice<ibatcherprovider>(); 
            if (batcher == null) 
            { 
              console.writeline("不支持批量插入。"); 
            } 
            else 
            { 
              batcher.insert(table); 
            } 
 
            //输出batcher表的数据量 
            var sql = new sqlcommand("select count(1) from batcher"); 
            console.writeline("当前共有 {0} 条数据", database.executescalar(sql)); 
 
          }))); 
    } 

以下表中列出了四种数据库生成10万条数据各耗用的时间

数据库

耗用时间

mssql 00:00:02.9376300
oracle 00:00:01.5155959
sqlite 00:00:01.6275634
mysql 00:00:05.4166891

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

软件
前端设计
程序设计
Java相关