有一个解决方案,可以在不牺牲性能的情况下使用异步,在EF Core和MS SQL数据库中进行了测试。
首先,你需要为
DBDataReader
做一个包装器。
它的
ReadAsync
方法应该读取整个行,将每一列的值存储在一个缓冲区中。
它的
GetXyz
方法应该从上述的缓冲区获取数值。
可以选择使用
GetBytes
+
Encoding.GetString
,而不是
GetString
。对于我的使用情况(每行16KB的文本列),这使得同步和异步的速度明显加快。
可以选择调整你的连接字符串的数据包大小。对于我的用例,32767的值使同步和异步的速度都有明显的提高。
现在你可以做一个
DbCommandInterceptor
,拦截
ReaderExecutingAsync
,创建一个有顺序访问的
DBDataReader
,由上述的包装器包装。
EF Core会尝试以非顺序的方式访问字段--这就是为什么包装器必须先读取和缓冲整个行。
下面是一个实现的例子(同时拦截异步和同步)。
public class ExampleDbCommandInterceptor : DbCommandInterceptor
public async override ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result, CancellationToken cancellationToken = default)
var behavior = CommandBehavior.SequentialAccess;
var reader = await command.ExecuteReaderAsync(behavior, cancellationToken).ConfigureAwait(false);
var wrapper = await DbDataReaderOptimizedWrapper.CreateAsync(reader, cancellationToken).ConfigureAwait(false);
return InterceptionResult<DbDataReader>.SuppressWithResult(wrapper);
public override InterceptionResult<DbDataReader> ReaderExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result)
var behavior = CommandBehavior.SequentialAccess;
var reader = command.ExecuteReader(behavior);
var wrapper = DbDataReaderOptimizedWrapper.Create(reader);
return InterceptionResult<DbDataReader>.SuppressWithResult(wrapper);
sealed class DbDataReaderOptimizedWrapper : DbDataReader
readonly DbDataReader reader;
readonly DbColumn[] schema;
readonly object[] cache;
readonly Func<object>[] materializers;
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
private T Get<T>(int ordinal)
if (cache[ordinal] != DBNull.Value) return (T)cache[ordinal];
return (T)(object)null;
private DbDataReaderOptimizedWrapper(DbDataReader reader, IEnumerable<DbColumn> schema)
this.reader = reader;
this.schema = schema.OrderBy(x => x.ColumnOrdinal).ToArray();
cache = new object[this.schema.Length];
byte[] stringGetterBuffer = null;
string stringGetter(int i)
var dbColumn = this.schema[i];
if (dbColumn.ColumnSize < int.MaxValue) return reader.GetString(i);
if (stringGetterBuffer == null) stringGetterBuffer = new byte[32 * 1024];
var totalRead = 0;
while (true)
var offset = totalRead;
totalRead += (int)reader.GetBytes(i, offset, stringGetterBuffer, offset, stringGetterBuffer.Length - offset);
if (totalRead < stringGetterBuffer.Length) break;
const int maxBufferSize = int.MaxValue / 2;
if (stringGetterBuffer.Length >= maxBufferSize)
throw new OutOfMemoryException($"{nameof(DbDataReaderOptimizedWrapper)}.{nameof(GetString)} cannot load column '{GetName(i)}' because it contains a string longer than {maxBufferSize} bytes.");
Array.Resize(ref stringGetterBuffer, 2 * stringGetterBuffer.Length);
var c = dbColumn.DataTypeName[0];
var encoding = (c is 'N' or 'n') ? Encoding.Unicode : Encoding.ASCII;
return encoding.GetString(stringGetterBuffer.AsSpan(0, totalRead));
var dict = new Dictionary<Type, Func<DbColumn, int, Func<object>>>
[typeof(bool)] = (column, index) => () => reader.GetBoolean(index),
[typeof(byte)] = (column, index) => () => reader.GetByte(index),
[typeof(char)] = (column, index) => () => reader.GetChar(index),
[typeof(short)] = (column, index) => () => reader.GetInt16(index),
[typeof(int)] = (column, index) => () => reader.GetInt32(index),
[typeof(long)] = (column, index) => () => reader.GetInt64(index),
[typeof(float)] = (column, index) => () => reader.GetFloat(index),
[typeof(double)] = (column, index) => () => reader.GetDouble(index),
[typeof(decimal)] = (column, index) => () => reader.GetDecimal(index),
[typeof(DateTime)] = (column, index) => () => reader.GetDateTime(index),
[typeof(Guid)] = (column, index) => () => reader.GetGuid(index),
[typeof(string)] = (column, index) => () => stringGetter(index),
materializers = schema.Select((column, index) => dict[column.DataType](column, index)).ToArray();
public static DbDataReaderOptimizedWrapper Create(DbDataReader reader)
=> new DbDataReaderOptimizedWrapper(reader, reader.GetColumnSchema());
public static async ValueTask<DbDataReaderOptimizedWrapper> CreateAsync(DbDataReader reader, CancellationToken cancellationToken)
=> new DbDataReaderOptimizedWrapper(reader, await reader.GetColumnSchemaAsync(cancellationToken).ConfigureAwait(false));
protected override void Dispose(bool disposing) => reader.Dispose();
public async override ValueTask DisposeAsync() => await reader.DisposeAsync().ConfigureAwait(false);
public override object this[int ordinal] => Get<object>(ordinal);
public override object this[string name] => Get<object>(GetOrdinal(name));
public override int Depth => reader.Depth;
public override int FieldCount => reader.FieldCount;
public override bool HasRows => reader.HasRows;
public override bool IsClosed => reader.IsClosed;
public override int RecordsAffected => reader.RecordsAffected;
public override int VisibleFieldCount => reader.VisibleFieldCount;
public override bool GetBoolean(int ordinal) => Get<bool>(ordinal);
public override byte GetByte(int ordinal) => Get<byte>(ordinal);
public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
public override char GetChar(int ordinal) => Get<char>(ordinal);
public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
public override string GetDataTypeName(int ordinal) => reader.GetDataTypeName(ordinal);
public override DateTime GetDateTime(int ordinal) => Get<DateTime>(ordinal);
public override decimal GetDecimal(int ordinal) => Get<decimal>(ordinal);
public override double GetDouble(int ordinal) => Get<double>(ordinal);
public override IEnumerator GetEnumerator() => reader.GetEnumerator();
public override Type GetFieldType(int ordinal) => reader.GetFieldType(ordinal);
public override float GetFloat(int ordinal) => Get<float>(ordinal);
public override Guid GetGuid(int ordinal) => Get<Guid>(ordinal);
public override short GetInt16(int ordinal) => Get<short>(ordinal);
public override int GetInt32(int ordinal) => Get<int>(ordinal);
public override long GetInt64(int ordinal) => Get<long>(ordinal);
public override string GetName(int ordinal) => reader.GetName(ordinal);
public override int GetOrdinal(string name) => reader.GetOrdinal(name);
public override string GetString(int ordinal) => Get<string>(ordinal);
public override object GetValue(int ordinal) => Get<object>(ordinal);
public override int GetValues(object[] values)
var min = Math.Min(cache.Length, values.Length);
Array.Copy(cache, values, min);
return min;
public override bool IsDBNull(int ordinal) => Convert.IsDBNull(cache[ordinal]);
public override bool NextResult() => reader.NextResult();
public override bool Read()
Array.Clear(cache, 0, cache.Length);
if (reader.Read())
for (int i = 0; i < cache.Length; ++i)
if ((schema[i].AllowDBNull ?? true) && reader.IsDBNull(i))
cache[i] = DBNull.Value;
else cache[i] = materializers[i]();
return true;
return false;
public override void Close() => reader.Close();
public async override Task CloseAsync() => await reader.CloseAsync().ConfigureAwait(false);
public override DataTable GetSchemaTable() => reader.GetSchemaTable();
public async override Task<DataTable> GetSchemaTableAsync(CancellationToken cancellationToken = default) => await reader.GetSchemaTableAsync(cancellationToken).ConfigureAwait(false);
public async override Task<ReadOnlyCollection<DbColumn>> GetColumnSchemaAsync(CancellationToken cancellationToken = default) => await reader.GetColumnSchemaAsync(cancellationToken).ConfigureAwait(false);
public async override Task<bool> NextResultAsync(CancellationToken cancellationToken) => await reader.NextResultAsync(cancellationToken).ConfigureAwait(false);
public async override Task<bool> ReadAsync(CancellationToken cancellationToken)
Array.Clear(cache, 0, cache.Length);
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
for (int i = 0; i < cache.Length; ++i)
if ((schema[i].AllowDBNull ?? true) && await reader.IsDBNullAsync(i, cancellationToken).ConfigureAwait(false))
cache[i] = DBNull.Value;
else cache[i] = materializers[i]();
return true;
return false;
我现在无法提供一个基准,希望有人能在评论中这样做。