方案 | 數(shù)量 | 時間 |
---|---|---|
Insert | 1千條 | 145.4351ms |
BatchInsert | 1千條 | 103.9061ms |
SqlBulkCopy | 1千條 | 7.021ms |
Insert | 1萬條 | 1501.326ms |
BatchInsert | 1萬條 | 850.6274ms |
SqlBulkCopy | 1萬條 | 30.5129ms |
Insert | 10萬條 | 13875.4934ms |
BatchInsert | 10萬條 | 8278.9056ms |
SqlBulkCopy | 10萬條 | 314.8402ms |
兩者插入效率對比,Insert
明顯比SqlBulkCopy
要慢太多,大概20~40倍性能差距,下面我們將SqlBulkCopy
封裝一下,讓批量插入更加方便
批量插入擴展方法簽名
方法 | 方法參數(shù) | 介紹 |
---|---|---|
BulkCopy | 同步的批量插入方法 | |
SqlConnection connection | sql server 連接對象 | |
IEnumerableT> source | 需要批量插入的數(shù)據(jù)源 | |
string tableName = null | 插入表名稱【為NULL默認(rèn)為實體名稱】 | |
int bulkCopyTimeout = 30 | 批量插入超時時間 | |
int batchSize = 0 | 寫入數(shù)據(jù)庫一批數(shù)量【如果為0代表全部一次性插入】最合適數(shù)量【這取決于您的環(huán)境,尤其是行數(shù)和網(wǎng)絡(luò)延遲。就個人而言,我將從BatchSize屬性設(shè)置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數(shù)加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發(fā)生在1000,那么我將行數(shù)減少一半(例如500),直到它起作用為止。】 | |
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量復(fù)制參數(shù) | |
SqlTransaction externalTransaction = null | 執(zhí)行的事務(wù)對象 | |
BulkCopyAsync | 異步的批量插入方法 | |
SqlConnection connection | sql server 連接對象 | |
IEnumerableT> source | 需要批量插入的數(shù)據(jù)源 | |
string tableName = null | 插入表名稱【為NULL默認(rèn)為實體名稱】 | |
int bulkCopyTimeout = 30 | 批量插入超時時間 | |
int batchSize = 0 | 寫入數(shù)據(jù)庫一批數(shù)量【如果為0代表全部一次性插入】最合適數(shù)量【這取決于您的環(huán)境,尤其是行數(shù)和網(wǎng)絡(luò)延遲。就個人而言,我將從BatchSize屬性設(shè)置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數(shù)加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發(fā)生在1000,那么我將行數(shù)減少一半(例如500),直到它起作用為止?!?/td> | |
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量復(fù)制參數(shù) | |
SqlTransaction externalTransaction = null | 執(zhí)行的事務(wù)對象 |
這個方法主要解決了兩個問題:
DataTable
或者IDataReader
接口實現(xiàn)類,手動構(gòu)建的轉(zhuǎn)換比較難以維護,如果修改字段就得把這些地方都進行修改,特別是還需要將枚舉類型特殊處理,轉(zhuǎn)換成他的基礎(chǔ)類型(默認(rèn)int
)SqlBulkCopy
對象,和配置數(shù)據(jù)庫列的映射,和一些屬性的配置此方案也是在我公司中使用,以滿足公司的批量插入數(shù)據(jù)的需求,例如第三方的對賬數(shù)據(jù)此方法使用的是Expression
動態(tài)生成數(shù)據(jù)轉(zhuǎn)換函數(shù),其效率和手寫的原生代碼差不多,和原生手寫代碼相比,多余的轉(zhuǎn)換損失很小【最大的性能損失都是在值類型
拆裝箱上】
此方案和其他網(wǎng)上的方案有些不同的是:不是將List
先轉(zhuǎn)換成DataTable
,然后寫入SqlBulkCopy
的,而是使用一個實現(xiàn)IDataReader
的讀取器包裝List
,每往SqlBulkCopy
插入一行數(shù)據(jù)才會轉(zhuǎn)換一行數(shù)據(jù)
IDataReader
方案和DataTable
方案相比優(yōu)點
效率高:DataTable
方案需要先完全轉(zhuǎn)換后,才能交由SqlBulkCopy
寫入數(shù)據(jù)庫,而IDataReader
方案可以邊轉(zhuǎn)換邊交給SqlBulkCopy
寫入數(shù)據(jù)庫(例如:10萬數(shù)據(jù)插入速度可提升30%)
占用內(nèi)存少:DataTable
方案需要先完全轉(zhuǎn)換后,才能交由SqlBulkCopy
寫入數(shù)據(jù)庫,需要占用大量內(nèi)存,而IDataReader
方案可以邊轉(zhuǎn)換邊交給SqlBulkCopy
寫入數(shù)據(jù)庫,無須占用過多內(nèi)存
強大:因為是邊寫入邊轉(zhuǎn)換,而且EnumerableReader
傳入的是一個迭代器,可以實現(xiàn)持續(xù)插入數(shù)據(jù)的效果
① 實體Model與表映射
數(shù)據(jù)庫表代碼
CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]
實體類代碼
public class Person { public long Id { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime? CreateTime { get; set; } public Gender Sex { get; set; } } public enum Gender { Man = 0, Woman = 1 }
SqlBulkCopy
類型的ColumnMappings
屬性來完成,數(shù)據(jù)列與數(shù)據(jù)庫中列的映射//創(chuàng)建批量插入對象 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { foreach (var column in ModelToDataTableTModel>.Columns) { //創(chuàng)建字段映射 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } }
② 實體轉(zhuǎn)換成數(shù)據(jù)行
將數(shù)據(jù)轉(zhuǎn)換成數(shù)據(jù)行采用的是:反射
+Expression
來完成
其中反射
是用于獲取編寫Expression
所需程序類,屬性等信息
其中Expression
是用于生成高效轉(zhuǎn)換函數(shù)其中ModelToDataTableTModel>
類型利用了靜態(tài)泛型類特性,實現(xiàn)泛型參數(shù)的緩存效果
在ModelToDataTableTModel>
的靜態(tài)構(gòu)造函數(shù)中,生成轉(zhuǎn)換函數(shù),獲取需要轉(zhuǎn)換的屬性信息,并存入靜態(tài)只讀字段中,完成緩存
③ 使用IDataReader插入數(shù)據(jù)的重載
EnumerableReader
是實現(xiàn)了IDataReader
接口的讀取類,用于將模型對象,在迭代器中讀取出來,并轉(zhuǎn)換成數(shù)據(jù)行,可供SqlBulkCopy
讀取
SqlBulkCopy
只會調(diào)用三個方法:GetOrdinal
、Read
、GetValue
GetOrdinal
只會在首行讀取每個列所代表序號【需要填寫:SqlBulkCopy
類型的ColumnMappings
屬性】Read
方法是迭代到下一行,并調(diào)用ModelToDataTableTModel>.ToRowData.Invoke()
來將模型對象轉(zhuǎn)換成數(shù)據(jù)行object[]
GetValue
方法是獲取當(dāng)前行指定下標(biāo)位置的值擴展方法類
public static class SqlConnectionExtension { /// summary> /// 批量復(fù)制 /// /summary> /// typeparam name="TModel">插入的模型對象/typeparam> /// param name="source">需要批量插入的數(shù)據(jù)源/param> /// param name="connection">數(shù)據(jù)庫連接對象/param> /// param name="tableName">插入表名稱【為NULL默認(rèn)為實體名稱】/param> /// param name="bulkCopyTimeout">插入超時時間/param> /// param name="batchSize">寫入數(shù)據(jù)庫一批數(shù)量【如果為0代表全部一次性插入】最合適數(shù)量【這取決于您的環(huán)境,尤其是行數(shù)和網(wǎng)絡(luò)延遲。就個人而言,我將從BatchSize屬性設(shè)置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數(shù)加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發(fā)生在1000,那么我將行數(shù)減少一半(例如500),直到它起作用為止?!?param> /// param name="options">批量復(fù)制參數(shù)/param> /// param name="externalTransaction">執(zhí)行的事務(wù)對象/param> /// returns>插入數(shù)量/returns> public static int BulkCopyTModel>(this SqlConnection connection, IEnumerableTModel> source, string tableName = null, int bulkCopyTimeout = 30, int batchSize = 0, SqlBulkCopyOptions options = SqlBulkCopyOptions.Default, SqlTransaction externalTransaction = null) { //創(chuàng)建讀取器 using (var reader = new EnumerableReaderTModel>(source)) { //創(chuàng)建批量插入對象 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { //插入的表 copy.DestinationTableName = tableName ?? typeof(TModel).Name; //寫入數(shù)據(jù)庫一批數(shù)量 copy.BatchSize = batchSize; //超時時間 copy.BulkCopyTimeout = bulkCopyTimeout; //創(chuàng)建字段映射【如果沒有此字段映射會導(dǎo)致數(shù)據(jù)填錯位置,如果類型不對還會導(dǎo)致報錯】【因為:沒有此字段映射默認(rèn)是按照列序號對應(yīng)插入的】 foreach (var column in ModelToDataTableTModel>.Columns) { //創(chuàng)建字段映射 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } //將數(shù)據(jù)批量寫入數(shù)據(jù)庫 copy.WriteToServer(reader); //返回插入數(shù)據(jù)數(shù)量 return reader.Depth; } } } /// summary> /// 批量復(fù)制-異步 /// /summary> /// typeparam name="TModel">插入的模型對象/typeparam> /// param name="source">需要批量插入的數(shù)據(jù)源/param> /// param name="connection">數(shù)據(jù)庫連接對象/param> /// param name="tableName">插入表名稱【為NULL默認(rèn)為實體名稱】/param> /// param name="bulkCopyTimeout">插入超時時間/param> /// param name="batchSize">寫入數(shù)據(jù)庫一批數(shù)量【如果為0代表全部一次性插入】最合適數(shù)量【這取決于您的環(huán)境,尤其是行數(shù)和網(wǎng)絡(luò)延遲。就個人而言,我將從BatchSize屬性設(shè)置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數(shù)加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發(fā)生在1000,那么我將行數(shù)減少一半(例如500),直到它起作用為止?!?param> /// param name="options">批量復(fù)制參數(shù)/param> /// param name="externalTransaction">執(zhí)行的事務(wù)對象/param> /// returns>插入數(shù)量/returns> public static async Taskint> BulkCopyAsyncTModel>(this SqlConnection connection, IEnumerableTModel> source, string tableName = null, int bulkCopyTimeout = 30, int batchSize = 0, SqlBulkCopyOptions options = SqlBulkCopyOptions.Default, SqlTransaction externalTransaction = null) { //創(chuàng)建讀取器 using (var reader = new EnumerableReaderTModel>(source)) { //創(chuàng)建批量插入對象 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { //插入的表 copy.DestinationTableName = tableName ?? typeof(TModel).Name; //寫入數(shù)據(jù)庫一批數(shù)量 copy.BatchSize = batchSize; //超時時間 copy.BulkCopyTimeout = bulkCopyTimeout; //創(chuàng)建字段映射【如果沒有此字段映射會導(dǎo)致數(shù)據(jù)填錯位置,如果類型不對還會導(dǎo)致報錯】【因為:沒有此字段映射默認(rèn)是按照列序號對應(yīng)插入的】 foreach (var column in ModelToDataTableTModel>.Columns) { //創(chuàng)建字段映射 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } //將數(shù)據(jù)批量寫入數(shù)據(jù)庫 await copy.WriteToServerAsync(reader); //返回插入數(shù)據(jù)數(shù)量 return reader.Depth; } } } }
封裝的迭代器數(shù)據(jù)讀取器
/// summary> /// 迭代器數(shù)據(jù)讀取器 /// /summary> /// typeparam name="TModel">模型類型/typeparam> public class EnumerableReaderTModel> : IDataReader { /// summary> /// 實例化迭代器讀取對象 /// /summary> /// param name="source">模型源/param> public EnumerableReader(IEnumerableTModel> source) { _source = source ?? throw new ArgumentNullException(nameof(source)); _enumerable = source.GetEnumerator(); } private readonly IEnumerableTModel> _source; private readonly IEnumeratorTModel> _enumerable; private object[] _currentDataRow = Array.Emptyobject>(); private int _depth; private bool _release; public void Dispose() { _release = true; _enumerable.Dispose(); } public int GetValues(object[] values) { if (values == null) throw new ArgumentNullException(nameof(values)); var length = Math.Min(_currentDataRow.Length, values.Length); Array.Copy(_currentDataRow, values, length); return length; } public int GetOrdinal(string name) { for (int i = 0; i ModelToDataTableTModel>.Columns.Count; i++) { if (ModelToDataTableTModel>.Columns[i].ColumnName == name) return i; } return -1; } public long GetBytes(int ordinal, long dataIndex, byte[] buffer, int bufferIndex, int length) { if (dataIndex 0) throw new Exception($"起始下標(biāo)不能小于0!"); if (bufferIndex 0) throw new Exception("目標(biāo)緩沖區(qū)起始下標(biāo)不能小于0!"); if (length 0) throw new Exception("讀取長度不能小于0!"); var numArray = (byte[])GetValue(ordinal); if (buffer == null) return numArray.Length; if (buffer.Length = bufferIndex) throw new Exception("目標(biāo)緩沖區(qū)起始下標(biāo)不能大于目標(biāo)緩沖區(qū)范圍!"); var freeLength = Math.Min(numArray.Length - bufferIndex, length); if (freeLength = 0) return 0; Array.Copy(numArray, dataIndex, buffer, bufferIndex, length); return freeLength; } public long GetChars(int ordinal, long dataIndex, char[] buffer, int bufferIndex, int length) { if (dataIndex 0) throw new Exception($"起始下標(biāo)不能小于0!"); if (bufferIndex 0) throw new Exception("目標(biāo)緩沖區(qū)起始下標(biāo)不能小于0!"); if (length 0) throw new Exception("讀取長度不能小于0!"); var numArray = (char[])GetValue(ordinal); if (buffer == null) return numArray.Length; if (buffer.Length = bufferIndex) throw new Exception("目標(biāo)緩沖區(qū)起始下標(biāo)不能大于目標(biāo)緩沖區(qū)范圍!"); var freeLength = Math.Min(numArray.Length - bufferIndex, length); if (freeLength = 0) return 0; Array.Copy(numArray, dataIndex, buffer, bufferIndex, length); return freeLength; } public bool IsDBNull(int i) { var value = GetValue(i); return value == null || value is DBNull; } public bool NextResult() { //移動到下一個元素 if (!_enumerable.MoveNext()) return false; //行層+1 Interlocked.Increment(ref _depth); //得到數(shù)據(jù)行 _currentDataRow = ModelToDataTableTModel>.ToRowData.Invoke(_enumerable.Current); return true; } public byte GetByte(int i) => (byte)GetValue(i); public string GetName(int i) => ModelToDataTableTModel>.Columns[i].ColumnName; public string GetDataTypeName(int i) => ModelToDataTableTModel>.Columns[i].DataType.Name; public Type GetFieldType(int i) => ModelToDataTableTModel>.Columns[i].DataType; public object GetValue(int i) => _currentDataRow[i]; public bool GetBoolean(int i) => (bool)GetValue(i); public char GetChar(int i) => (char)GetValue(i); public Guid GetGuid(int i) => (Guid)GetValue(i); public short GetInt16(int i) => (short)GetValue(i); public int GetInt32(int i) => (int)GetValue(i); public long GetInt64(int i) => (long)GetValue(i); public float GetFloat(int i) => (float)GetValue(i); public double GetDouble(int i) => (double)GetValue(i); public string GetString(int i) => (string)GetValue(i); public decimal GetDecimal(int i) => (decimal)GetValue(i); public DateTime GetDateTime(int i) => (DateTime)GetValue(i); public IDataReader GetData(int i) => throw new NotSupportedException(); public int FieldCount => ModelToDataTableTModel>.Columns.Count; public object this[int i] => GetValue(i); public object this[string name] => GetValue(GetOrdinal(name)); public void Close() => Dispose(); public DataTable GetSchemaTable() => ModelToDataTableTModel>.ToDataTable(_source); public bool Read() => NextResult(); public int Depth => _depth; public bool IsClosed => _release; public int RecordsAffected => 0; }
模型對象轉(zhuǎn)數(shù)據(jù)行工具類
/// summary> /// 對象轉(zhuǎn)換成DataTable轉(zhuǎn)換類 /// /summary> /// typeparam name="TModel">泛型類型/typeparam> public static class ModelToDataTableTModel> { static ModelToDataTable() { //如果需要剔除某些列可以修改這段代碼 var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray(); Columns = new ReadOnlyCollectionDataColumn>(propertyList .Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray()); //生成對象轉(zhuǎn)數(shù)據(jù)行委托 ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList); } /// summary> /// 構(gòu)建轉(zhuǎn)換成數(shù)據(jù)行委托 /// /summary> /// param name="type">傳入類型/param> /// param name="propertyList">轉(zhuǎn)換的屬性/param> /// returns>轉(zhuǎn)換數(shù)據(jù)行委托/returns> private static FuncTModel, object[]> BuildToRowDataDelegation(Type type, PropertyInfo[] propertyList) { var source = Expression.Parameter(type); var items = propertyList.Select(property => ConvertBindPropertyToData(source, property)); var array = Expression.NewArrayInit(typeof(object), items); var lambda = Expression.LambdaFuncTModel, object[]>>(array, source); return lambda.Compile(); } /// summary> /// 將屬性轉(zhuǎn)換成數(shù)據(jù) /// /summary> /// param name="source">源變量/param> /// param name="property">屬性信息/param> /// returns>獲取屬性數(shù)據(jù)表達(dá)式/returns> private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property) { var propertyType = property.PropertyType; var expression = (Expression)Expression.Property(source, property); if (propertyType.IsEnum) expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType()); return Expression.Convert(expression, typeof(object)); } /// summary> /// 獲取數(shù)據(jù)類型 /// /summary> /// param name="type">屬性類型/param> /// returns>數(shù)據(jù)類型/returns> private static Type GetDataType(Type type) { //枚舉默認(rèn)轉(zhuǎn)換成對應(yīng)的值類型 if (type.IsEnum) return type.GetEnumUnderlyingType(); //可空類型 if (type.IsGenericType type.GetGenericTypeDefinition() == typeof(Nullable>)) return GetDataType(type.GetGenericArguments().First()); return type; } /// summary> /// 列集合 /// /summary> public static IReadOnlyListDataColumn> Columns { get; } /// summary> /// 對象轉(zhuǎn)數(shù)據(jù)行委托 /// /summary> public static FuncTModel, object[]> ToRowData { get; } /// summary> /// 集合轉(zhuǎn)換成DataTable /// /summary> /// param name="source">集合/param> /// param name="tableName">表名稱/param> /// returns>轉(zhuǎn)換完成的DataTable/returns> public static DataTable ToDataTable(IEnumerableTModel> source, string tableName = "TempTable") { //創(chuàng)建表對象 var table = new DataTable(tableName); //設(shè)置列 foreach (var dataColumn in Columns) { table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType)); } //循環(huán)轉(zhuǎn)換每一行數(shù)據(jù) foreach (var item in source) { table.Rows.Add(ToRowData.Invoke(item)); } //返回表對象 return table; } }
創(chuàng)表代碼
CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]
實體類代碼
定義的實體的屬性名稱需要和SqlServer
列名稱類型對應(yīng)
public class Person { public long Id { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime? CreateTime { get; set; } public Gender Sex { get; set; } } public enum Gender { Man = 0, Woman = 1 }
測試方法
//生成10萬條數(shù)據(jù) var persons = new Person[100000]; var random = new Random(); for (int i = 0; i persons.Length; i++) { persons[i] = new Person { Id = i + 1, Name = "張三" + i, Age = random.Next(1, 128), Sex = (Gender)random.Next(2), CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i) }; } //創(chuàng)建數(shù)據(jù)庫連接 using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { conn.Open(); var sw = Stopwatch.StartNew(); //批量插入數(shù)據(jù) var qty = conn.BulkCopy(persons); sw.Stop(); Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms"); }
執(zhí)行批量插入結(jié)果
226.4767ms
請按任意鍵繼續(xù). . .
GitHub代碼地址:https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents
標(biāo)簽:汕頭 濟源 武威 廣東 安徽 臺州 濟寧 泰安
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《SQL Server 批量插入數(shù)據(jù)的完美解決方案》,本文關(guān)鍵詞 SQL,Server,批量,插入,數(shù)據(jù),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。