diff --git a/src/Rowbot.ClosedXml/Rowbot.ClosedXml.csproj b/src/Rowbot.ClosedXml/Rowbot.ClosedXml.csproj
index b792280..01594a5 100644
--- a/src/Rowbot.ClosedXml/Rowbot.ClosedXml.csproj
+++ b/src/Rowbot.ClosedXml/Rowbot.ClosedXml.csproj
@@ -2,6 +2,7 @@
netstandard2.0
+ 8
diff --git a/src/Rowbot/CsvHelper/AsyncCsvHelperSource.cs b/src/Rowbot/CsvHelper/AsyncCsvHelperSource.cs
new file mode 100644
index 0000000..7e7350d
--- /dev/null
+++ b/src/Rowbot/CsvHelper/AsyncCsvHelperSource.cs
@@ -0,0 +1,87 @@
+using CsvHelper;
+using CsvHelper.Configuration;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Rowbot.CsvHelper
+{
+ public class AsyncCsvHelperSource : IAsyncRowSource
+ {
+ private readonly CsvReader _csvReader;
+ private readonly bool _readFirstLineAsHeaders = false;
+ private int _readCallCount = 0;
+
+ public AsyncCsvHelperSource(Stream stream, CsvConfiguration configuration, bool readFirstLineAsHeaders) : this(new CsvReader(new StreamReader(stream), configuration), readFirstLineAsHeaders: readFirstLineAsHeaders)
+ {
+ }
+
+ public AsyncCsvHelperSource(CsvReader csvReader, bool readFirstLineAsHeaders)
+ {
+ _csvReader = csvReader;
+ _readFirstLineAsHeaders = readFirstLineAsHeaders;
+ }
+
+ public void Dispose()
+ {
+ _csvReader.Dispose();
+ }
+
+ public Task CompleteAsync()
+ {
+ // Nothing to complete
+ return Task.CompletedTask;
+ }
+
+ public async Task InitAndGetColumnsAsync()
+ {
+ await _csvReader.ReadAsync();
+ _csvReader.ReadHeader();
+
+ if (_readFirstLineAsHeaders)
+ {
+ var columns = _csvReader.HeaderRecord.Select(header => new ColumnInfo(name: header, valueType: typeof(string))).ToArray();
+ return columns;
+ }
+ else
+ {
+ var columns = new ColumnInfo[_csvReader.HeaderRecord.Length];
+ for (var i = 0; i < _csvReader.HeaderRecord.Length; i++)
+ {
+ columns[i] = new ColumnInfo(name: $"Column{i + 1}", valueType: typeof(string));
+ }
+ return columns;
+ }
+ }
+
+ public async Task ReadRowAsync(object[] values)
+ {
+ _readCallCount++;
+ if (_readCallCount == 1 && !_readFirstLineAsHeaders)
+ {
+ // First line should not be read as headers but as data. Copy from the headers line onto the values buffer
+ for (var i = 0; i < values.Length; i++)
+ {
+ values[i] = _csvReader.HeaderRecord[i];
+ }
+ return true;
+ }
+ else
+ {
+ if (await _csvReader.ReadAsync())
+ {
+
+ for (var i = 0; i < values.Length; i++)
+ {
+ values[i] = _csvReader.GetField(i);
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+}
diff --git a/src/Rowbot/CsvHelper/AsyncCsvHelperTarget.cs b/src/Rowbot/CsvHelper/AsyncCsvHelperTarget.cs
new file mode 100644
index 0000000..4fe058d
--- /dev/null
+++ b/src/Rowbot/CsvHelper/AsyncCsvHelperTarget.cs
@@ -0,0 +1,80 @@
+using CsvHelper;
+using CsvHelper.Configuration;
+using System;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace Rowbot.CsvHelper
+{
+ public sealed class AsyncCsvHelperTarget : IAsyncRowTarget, IDisposable
+ {
+ private readonly CsvWriter _csvWriter;
+ private readonly bool _writeHeaders;
+ private int _unflushedRowCount = 0;
+ private bool _firstWrite = true;
+
+ public AsyncCsvHelperTarget(Stream stream, CsvConfiguration configuration, bool writeHeaders = true, bool leaveOpen = false) : this(new CsvWriter(new StreamWriter(stream), configuration, leaveOpen: leaveOpen))
+ {
+ _writeHeaders = writeHeaders;
+ }
+
+ public AsyncCsvHelperTarget(CsvWriter csvWriter)
+ {
+ _csvWriter = csvWriter;
+ }
+
+ public void Dispose()
+ {
+ _csvWriter?.Dispose();
+ }
+
+ public async Task CompleteAsync()
+ {
+ await FlushAsync();
+ _csvWriter?.Dispose();
+ }
+
+ public Task InitAsync(ColumnInfo[] columns)
+ {
+ if (_writeHeaders)
+ {
+ for (var i = 0; i < columns.Length; i++)
+ {
+ _csvWriter.WriteField(columns[i].Name);
+ }
+ _firstWrite = false;
+ }
+
+ return Task.CompletedTask;
+ }
+
+ public async Task WriteRowAsync(object[] values)
+ {
+ if (!_firstWrite)
+ {
+ await _csvWriter.NextRecordAsync();
+ }
+ for (var i = 0; i < values.Length; i++)
+ {
+ _csvWriter.WriteField(values[i]);
+ }
+ _unflushedRowCount++;
+ _firstWrite = false;
+ await FlushIfNeeded();
+ }
+
+ private async Task FlushIfNeeded()
+ {
+ if (_unflushedRowCount > 1000)
+ {
+ await FlushAsync();
+ }
+ }
+
+ private async Task FlushAsync()
+ {
+ await _csvWriter.FlushAsync();
+ _unflushedRowCount = 0;
+ }
+ }
+}
diff --git a/src/Rowbot/Execution/AsyncEnumerableTargetGuards.cs b/src/Rowbot/Execution/AsyncEnumerableTargetGuards.cs
new file mode 100644
index 0000000..14697ee
--- /dev/null
+++ b/src/Rowbot/Execution/AsyncEnumerableTargetGuards.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Rowbot.Execution
+{
+ public sealed class AsyncEnumerableTargetGuards : IAsyncEnumerableRowTarget, IDisposable
+ {
+ private readonly IAsyncEnumerableRowTarget _rowTarget;
+
+ private bool Completed { get; set; } = false;
+ private bool Initialized { get; set; } = false;
+
+ public AsyncEnumerableTargetGuards(IAsyncEnumerableRowTarget rowTarget)
+ {
+ _rowTarget = rowTarget ?? throw new ArgumentNullException(nameof(rowTarget));
+ }
+
+ public Task CompleteAsync()
+ {
+ if (!Initialized)
+ throw new InvalidOperationException("Init must be called before Complete()");
+ if (Completed)
+ throw new InvalidOperationException("Complete already called and can only be called once.");
+ Completed = true;
+
+ return _rowTarget.CompleteAsync();
+ }
+
+ public void Dispose()
+ {
+ (_rowTarget as IDisposable)?.Dispose();
+ }
+
+ public Task InitAsync(ColumnInfo[] columns)
+ {
+ if (columns is null)
+ {
+ throw new ArgumentNullException(nameof(columns));
+ }
+
+ if (Initialized)
+ throw new InvalidOperationException("Init has already been called and can only be called once.");
+ Initialized = true;
+
+ return _rowTarget.InitAsync(columns);
+ }
+
+ public Task WriteRowAsync(object[] values)
+ {
+ if (values is null)
+ {
+ throw new ArgumentNullException(nameof(values));
+ }
+
+ if (!Initialized)
+ throw new InvalidOperationException("Init must be called before WriteRows");
+ if (Completed)
+ throw new InvalidOperationException("Complete already called. Not allowed to write more rows");
+
+ return _rowTarget.WriteRowAsync(values);
+ }
+ }
+}
diff --git a/src/Rowbot/Execution/AsyncSourceGuards.cs b/src/Rowbot/Execution/AsyncSourceGuards.cs
new file mode 100644
index 0000000..87d9635
--- /dev/null
+++ b/src/Rowbot/Execution/AsyncSourceGuards.cs
@@ -0,0 +1,75 @@
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Rowbot.Execution
+{
+ ///
+ /// This helper class wraps a source and handles all input and output parameter validation as well as ensuring methods not called out of order.
+ ///
+ public sealed class AsyncSourceGuards : IAsyncRowSource, IDisposable
+ {
+ private bool Initialized { get; set; } = false;
+ private bool Completed { get; set; } = false;
+ private int _columnCount = -1;
+ private bool? _previousReadResult = null;
+ private readonly IAsyncRowSource _rowSource;
+
+ public AsyncSourceGuards(IAsyncRowSource rowSource)
+ {
+ _rowSource = rowSource ?? throw new ArgumentNullException(nameof(rowSource));
+ }
+
+ public async Task InitAndGetColumnsAsync()
+ {
+ if (Initialized)
+ throw new InvalidOperationException("Already initialized");
+ Initialized = true;
+ var columns = await _rowSource.InitAndGetColumnsAsync();
+ if (columns == null)
+ throw new InvalidOperationException("Null was returned by OnInitAndGetColumns() which is not valid.");
+ if (columns.Any(c => c == null))
+ throw new InvalidOperationException("Returned columns array from OnInitAndGetColumns() contains one or more null values which is not allowed.");
+
+ _columnCount = columns.Length;
+ return columns;
+ }
+
+ public async Task ReadRowAsync(object[] values)
+ {
+ if (values is null)
+ {
+ throw new ArgumentNullException(nameof(values));
+ }
+
+ if (!Initialized)
+ throw new InvalidOperationException("Not initialized");
+
+ if (values.Length != _columnCount)
+ throw new InvalidOperationException($"Provided values object[] buffer contains {values.Length} slots, but column count returned earlier container {_columnCount} columns. These counts must match.");
+
+ if (_previousReadResult == false)
+ throw new InvalidOperationException("It is not allowed to call Read after the method has already returned false in a previous call.");
+
+ var readResult = await _rowSource.ReadRowAsync(values);
+ _previousReadResult = readResult;
+
+ return readResult;
+ }
+
+ public Task CompleteAsync()
+ {
+ if (!Initialized)
+ throw new InvalidOperationException("Not initialized");
+ if (Completed)
+ throw new InvalidOperationException("Already completed");
+ Completed = true;
+ return _rowSource.CompleteAsync();
+ }
+
+ public void Dispose()
+ {
+ (_rowSource as IDisposable)?.Dispose();
+ }
+ }
+}
diff --git a/src/Rowbot/Execution/AsyncTargetGuards.cs b/src/Rowbot/Execution/AsyncTargetGuards.cs
new file mode 100644
index 0000000..2377086
--- /dev/null
+++ b/src/Rowbot/Execution/AsyncTargetGuards.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Rowbot.Execution
+{
+ public sealed class AsyncTargetGuards : IAsyncRowTarget, IDisposable
+ {
+ private readonly IAsyncRowTarget _rowTarget;
+
+ private bool Completed { get; set; } = false;
+ private bool Initialized { get; set; } = false;
+
+ public AsyncTargetGuards(IAsyncRowTarget rowTarget)
+ {
+ _rowTarget = rowTarget ?? throw new ArgumentNullException(nameof(rowTarget));
+ }
+
+ public Task CompleteAsync()
+ {
+ if (!Initialized)
+ throw new InvalidOperationException("Init must be called before Complete()");
+ if (Completed)
+ throw new InvalidOperationException("Complete already called and can only be called once.");
+ Completed = true;
+
+ return _rowTarget.CompleteAsync();
+ }
+
+ public void Dispose()
+ {
+ (_rowTarget as IDisposable)?.Dispose();
+ }
+
+ public Task InitAsync(ColumnInfo[] columns)
+ {
+ if (columns is null)
+ {
+ throw new ArgumentNullException(nameof(columns));
+ }
+
+ if (Initialized)
+ throw new InvalidOperationException("Init has already been called and can only be called once.");
+ Initialized = true;
+
+ return _rowTarget.InitAsync(columns);
+ }
+
+ public Task WriteRowAsync(object[] values)
+ {
+ if (values is null)
+ {
+ throw new ArgumentNullException(nameof(values));
+ }
+
+ if (!Initialized)
+ throw new InvalidOperationException("Init must be called before WriteRows");
+ if (Completed)
+ throw new InvalidOperationException("Complete already called. Not allowed to write more rows");
+
+ return _rowTarget.WriteRowAsync(values);
+ }
+ }
+}
diff --git a/src/Rowbot/Execution/RowbotAsyncExecutor.cs b/src/Rowbot/Execution/RowbotAsyncExecutor.cs
new file mode 100644
index 0000000..9e707c8
--- /dev/null
+++ b/src/Rowbot/Execution/RowbotAsyncExecutor.cs
@@ -0,0 +1,111 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace Rowbot.Execution
+{
+ public sealed class RowbotAsyncExecutor : IDisposable
+ {
+ private readonly AsyncSourceGuards _source;
+ private readonly AsyncTargetGuards _target;
+
+ public RowbotAsyncExecutor(IAsyncRowSource source, IAsyncRowTarget target)
+ {
+ _source = new AsyncSourceGuards(source);
+ _target = new AsyncTargetGuards(target);
+ }
+
+ public void Dispose()
+ {
+#pragma warning disable S2486 // Generic exceptions should not be ignored
+#pragma warning disable S108 // Nested blocks of code should not be left empty
+ try
+ {
+ _source.Dispose();
+ }
+ catch { }
+
+ try
+ {
+ _target.Dispose();
+ }
+ catch { }
+#pragma warning restore S108 // Nested blocks of code should not be left empty
+#pragma warning restore S2486 // Generic exceptions should not be ignored
+ }
+
+ public async Task ExecuteAsync()
+ {
+ // Columns
+ var columnNames = await _source.InitAndGetColumnsAsync();
+ await _target.InitAsync(columns: columnNames);
+
+ // Rows
+ var valuesBuffer = new object[columnNames.Length];
+ while (await _source.ReadRowAsync(valuesBuffer))
+ {
+ await _target.WriteRowAsync(valuesBuffer);
+ }
+
+ await _source.CompleteAsync();
+ await _target.CompleteAsync();
+
+ Dispose();
+ }
+ }
+
+ public sealed class RowbotAsyncEnumerableExecutor : IDisposable
+ {
+ private readonly AsyncSourceGuards _source;
+ private readonly AsyncEnumerableTargetGuards _target;
+
+ public RowbotAsyncEnumerableExecutor(IAsyncRowSource source, IAsyncEnumerableRowTarget target)
+ {
+ _source = new AsyncSourceGuards(source);
+ _target = new AsyncEnumerableTargetGuards(target);
+ }
+
+ public Task ExecuteAsync(Func, Task> consumer)
+ {
+ return consumer(ExecuteInternal());
+ }
+
+ private async IAsyncEnumerable ExecuteInternal()
+ {
+ // Columns
+ var columnNames = await _source.InitAndGetColumnsAsync();
+ await _target.InitAsync(columns: columnNames);
+
+ // Rows
+ var valuesBuffer = new object[columnNames.Length];
+ while (await _source.ReadRowAsync(valuesBuffer))
+ {
+ yield return await _target.WriteRowAsync(valuesBuffer);
+ }
+
+ await _source.CompleteAsync();
+ await _target.CompleteAsync();
+
+ Dispose();
+ }
+
+ public void Dispose()
+ {
+#pragma warning disable S2486 // Generic exceptions should not be ignored
+#pragma warning disable S108 // Nested blocks of code should not be left empty
+ try
+ {
+ _source.Dispose();
+ }
+ catch { }
+
+ try
+ {
+ _target.Dispose();
+ }
+ catch { }
+#pragma warning restore S108 // Nested blocks of code should not be left empty
+#pragma warning restore S2486 // Generic exceptions should not be ignored
+ }
+ }
+}
diff --git a/src/Rowbot/Execution/RowbotAsyncExecutorBuilder.cs b/src/Rowbot/Execution/RowbotAsyncExecutorBuilder.cs
new file mode 100644
index 0000000..aa93a07
--- /dev/null
+++ b/src/Rowbot/Execution/RowbotAsyncExecutorBuilder.cs
@@ -0,0 +1,113 @@
+using CsvHelper.Configuration;
+using Rowbot.CsvHelper;
+using Rowbot.Sources;
+using Rowbot.Targets;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.IO;
+
+namespace Rowbot.Execution
+{
+ public class RowbotAsyncExecutorBuilder
+ {
+ private IAsyncRowSource _rowSource;
+
+ public RowbotAsyncExecutorBuilder()
+ {
+ }
+
+ public RowbotAsyncExecutorBuilder FromDataTable(DataTable dataTable)
+ {
+ return SetSource(new AsyncDataReaderSource(dataTable.CreateDataReader()));
+ }
+ public RowbotAsyncExecutorBuilder FromDataReader(IDataReader dataReader)
+ {
+ return SetSource(new AsyncDataReaderSource(dataReader));
+ }
+ public RowbotAsyncExecutorBuilder FromObjects(IAsyncEnumerable objects)
+ {
+ return SetSource(AsyncPropertyReflectionSource.Create(objects));
+ }
+
+ public RowbotAsyncExecutorBuilder FromDynamic(IAsyncEnumerable objects)
+ {
+ return SetSource(new AsyncDynamicObjectSource(objects));
+ }
+
+ public RowbotAsyncExecutorBuilder FromCsvByCsvHelper(Stream inputStream, CsvConfiguration csvConfiguration, bool readFirstLineAsHeaders)
+ {
+ return SetSource(new AsyncCsvHelperSource(stream: inputStream, configuration: csvConfiguration, readFirstLineAsHeaders: readFirstLineAsHeaders));
+ }
+ public RowbotAsyncExecutorBuilder FromCsvByCsvHelper(string filepath, CsvConfiguration csvConfiguration, bool readFirstLineAsHeaders)
+ {
+ var fs = File.Create(filepath);
+ return SetSource(new AsyncCsvHelperSource(stream: fs, configuration: csvConfiguration, readFirstLineAsHeaders: readFirstLineAsHeaders));
+ }
+ public RowbotAsyncExecutorBuilder From(IAsyncRowSource customRowSource)
+ {
+ return SetSource(customRowSource);
+ }
+
+ private RowbotAsyncExecutorBuilder SetSource(IAsyncRowSource rowSource)
+ {
+ if (rowSource is null)
+ {
+ throw new ArgumentNullException(nameof(rowSource));
+ }
+
+ if (_rowSource != null)
+ throw new ArgumentException("Source already defined in this builder");
+
+ _rowSource = rowSource;
+ return this;
+ }
+
+ public RowbotAsyncExecutor ToCsvUsingCsvHelper(Stream outputStream, CsvConfiguration config, bool writeHeaders, bool leaveOpen = false)
+ {
+ return To(new AsyncCsvHelperTarget(stream: outputStream, configuration: config, writeHeaders: writeHeaders, leaveOpen: leaveOpen));
+ }
+
+ public RowbotAsyncExecutor ToCsvUsingCsvHelper(string filepath, CsvConfiguration config, bool writeHeaders)
+ {
+ var fs = File.Create(filepath);
+ return To(new AsyncCsvHelperTarget(stream: fs, configuration: config, writeHeaders: writeHeaders, leaveOpen: false));
+ }
+
+ public RowbotAsyncExecutor ToExcel(Stream outputStream, string sheetName, bool writeHeaders, bool leaveOpen = false)
+ {
+ return To(new AsyncExcelTarget(outputStream: outputStream, sheetName: sheetName, writeHeaders: writeHeaders, leaveOpen: leaveOpen));
+ }
+
+ public RowbotAsyncExecutor ToExcel(string filepath, string sheetName, bool writeHeaders)
+ {
+ var fs = File.Create(filepath);
+ return To(new AsyncExcelTarget(outputStream: fs, sheetName: sheetName, writeHeaders: writeHeaders, leaveOpen: false));
+ }
+
+ public RowbotAsyncExecutor ToDataTable(DataTable tableToFill)
+ {
+ return To(new AsyncDataTableTarget(tableToFill));
+ }
+
+ public RowbotAsyncExecutor ToDataReader()
+ {
+ throw new NotImplementedException();
+ }
+
+ public RowbotAsyncEnumerableExecutor ToObjects() where TObjectType : new()
+ {
+ return ToCustomTarget(new AsyncPropertyReflectionTarget());
+ }
+
+ public RowbotAsyncExecutor To(IAsyncRowTarget target)
+ {
+ return new RowbotAsyncExecutor(_rowSource, target);
+ }
+
+ public RowbotAsyncEnumerableExecutor ToCustomTarget(IAsyncEnumerableRowTarget target)
+ {
+ return new RowbotAsyncEnumerableExecutor(source: _rowSource, target: target);
+ }
+ }
+}
diff --git a/src/Rowbot/IAsyncEnumerableRowTarget.cs b/src/Rowbot/IAsyncEnumerableRowTarget.cs
new file mode 100644
index 0000000..c1b9ec3
--- /dev/null
+++ b/src/Rowbot/IAsyncEnumerableRowTarget.cs
@@ -0,0 +1,11 @@
+using System.Threading.Tasks;
+
+namespace Rowbot
+{
+ public interface IAsyncEnumerableRowTarget
+ {
+ Task InitAsync(ColumnInfo[] columns);
+ Task WriteRowAsync(object[] values);
+ Task CompleteAsync();
+ }
+}
diff --git a/src/Rowbot/IAsyncRowSource.cs b/src/Rowbot/IAsyncRowSource.cs
new file mode 100644
index 0000000..5b1cc83
--- /dev/null
+++ b/src/Rowbot/IAsyncRowSource.cs
@@ -0,0 +1,11 @@
+using System.Threading.Tasks;
+
+namespace Rowbot
+{
+ public interface IAsyncRowSource
+ {
+ Task InitAndGetColumnsAsync();
+ Task ReadRowAsync(object[] values);
+ Task CompleteAsync();
+ }
+}
diff --git a/src/Rowbot/IAsyncRowTarget.cs b/src/Rowbot/IAsyncRowTarget.cs
new file mode 100644
index 0000000..a0ab6f8
--- /dev/null
+++ b/src/Rowbot/IAsyncRowTarget.cs
@@ -0,0 +1,11 @@
+using System.Threading.Tasks;
+
+namespace Rowbot
+{
+ public interface IAsyncRowTarget
+ {
+ Task InitAsync(ColumnInfo[] columns);
+ Task WriteRowAsync(object[] values);
+ Task CompleteAsync();
+ }
+}
diff --git a/src/Rowbot/Rowbot.csproj b/src/Rowbot/Rowbot.csproj
index d0d3cf2..5471375 100644
--- a/src/Rowbot/Rowbot.csproj
+++ b/src/Rowbot/Rowbot.csproj
@@ -9,6 +9,7 @@
Stephan Moeller
https://github.com/StephanMoeller/Rowbot
Fastest possible excel-writer with extremely low memory consumption.
+ 8
diff --git a/src/Rowbot/Sources/AsyncDataReaderSource.cs b/src/Rowbot/Sources/AsyncDataReaderSource.cs
new file mode 100644
index 0000000..c90e864
--- /dev/null
+++ b/src/Rowbot/Sources/AsyncDataReaderSource.cs
@@ -0,0 +1,65 @@
+using System;
+using System.Linq;
+using System.Data;
+using System.Threading.Tasks;
+
+namespace Rowbot.Sources
+{
+ public sealed class AsyncDataReaderSource : IAsyncRowSource, IDisposable
+ {
+ private readonly IDataReader _dataReader;
+ private readonly bool _leaveOpen;
+
+ public AsyncDataReaderSource(IDataReader dataReader, bool leaveOpen = false)
+ {
+ _dataReader = dataReader ?? throw new ArgumentNullException(nameof(dataReader));
+ _leaveOpen = leaveOpen;
+ }
+
+ public Task CompleteAsync()
+ {
+ if (!_leaveOpen)
+ {
+ _dataReader.Close();
+ }
+
+ return Task.CompletedTask;
+ }
+
+ public void Dispose()
+ {
+ if (!_leaveOpen)
+ {
+ _dataReader.Dispose();
+ }
+ }
+
+ public Task InitAndGetColumnsAsync()
+ {
+ var columnInfos = _dataReader.GetSchemaTable().Rows.Cast().Select(row => new ColumnInfo(name: (string)row["ColumnName"], valueType: (Type)row["DataType"])).ToArray();
+ return Task.FromResult(columnInfos);
+ }
+
+ public Task ReadRowAsync(object[] values)
+ {
+ if (_dataReader.Read())
+ {
+ _dataReader.GetValues(values);
+
+ // Replace DBNull with null
+ for (var i = 0; i < values.Length; i++)
+ {
+ if (values[i] == DBNull.Value)
+ {
+ values[i] = null;
+ }
+ }
+ return Task.FromResult(true);
+ }
+ else
+ {
+ return Task.FromResult(false);
+ }
+ }
+ }
+}
diff --git a/src/Rowbot/Sources/AsyncDynamicObjectSource.cs b/src/Rowbot/Sources/AsyncDynamicObjectSource.cs
new file mode 100644
index 0000000..4579116
--- /dev/null
+++ b/src/Rowbot/Sources/AsyncDynamicObjectSource.cs
@@ -0,0 +1,83 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Rowbot.Sources
+{
+ public class AsyncDynamicObjectSource : IAsyncRowSource, IDisposable
+ {
+ private readonly IAsyncEnumerator _enumerator;
+ private string[] _keys;
+ private bool _hasCurrentItem = true;
+ public AsyncDynamicObjectSource(IAsyncEnumerable objects)
+ {
+ if (objects is null)
+ {
+ throw new ArgumentNullException(nameof(objects));
+ }
+
+ this._enumerator = objects.GetAsyncEnumerator();
+ }
+
+ public async Task InitAndGetColumnsAsync()
+ {
+ _hasCurrentItem = await _enumerator.MoveNextAsync();
+ if (!_hasCurrentItem)
+ {
+ // There are no rows at all - return an empty column list as there are no dynamic objects available to reveal the properties
+ _keys = Array.Empty();
+ return Array.Empty();
+ }
+
+ var current = (IDictionary)_enumerator.Current;
+ _keys = current.Keys.ToArray();
+ return _keys.Select(k => new ColumnInfo(name: k, valueType: typeof(object))).ToArray();
+ }
+
+ public async Task ReadRowAsync(object[] values)
+ {
+ if (!_hasCurrentItem)
+ return false;
+
+ var current = (IDictionary)_enumerator.Current;
+
+ AssertEqualsExpectedKeysOrThrow(current.Keys);
+
+ for (var i = 0; i < _keys.Length; i++)
+ {
+ values[i] = current[_keys[i]];
+ }
+
+ _hasCurrentItem = await _enumerator.MoveNextAsync();
+ return true;
+ }
+
+ private void AssertEqualsExpectedKeysOrThrow(ICollection keys)
+ {
+ if (keys.Count != _keys.Length)
+ throw new DynamicObjectsNotIdenticalException($"Two dynamic objects was not identical in collection. One had keys: [{string.Join(",", _keys)}] and another one had keys: [{string.Join(", ", keys)}]");
+
+ int i = 0;
+ foreach(var key in keys)
+ {
+ if (key != _keys[i])
+ {
+ throw new DynamicObjectsNotIdenticalException($"Two dynamic objects was not identical in collection. One had keys: [{string.Join(",", _keys)}] and another one had keys: [{string.Join(", ", keys)}]");
+ }
+ i++;
+ }
+ }
+
+ public Task CompleteAsync()
+ {
+ // Nothing to complete
+ return Task.CompletedTask;
+ }
+
+ public void Dispose()
+ {
+ // Nothing to dispose
+ }
+ }
+}
diff --git a/src/Rowbot/Sources/AsyncPropertyReflectionSource.cs b/src/Rowbot/Sources/AsyncPropertyReflectionSource.cs
new file mode 100644
index 0000000..631973c
--- /dev/null
+++ b/src/Rowbot/Sources/AsyncPropertyReflectionSource.cs
@@ -0,0 +1,91 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Threading.Tasks;
+
+namespace Rowbot.Sources
+{
+ public class AsyncPropertyReflectionSource : IAsyncRowSource
+ {
+ private readonly IAsyncEnumerator _elements;
+ private readonly PropertyInfo[] _properties;
+ private readonly ColumnInfo[] _columns;
+ private AsyncPropertyReflectionSource(IAsyncEnumerator elements, Type elementType)
+ {
+ if (elements is null)
+ {
+ throw new ArgumentNullException(nameof(elements));
+ }
+
+ if (elementType is null)
+ {
+ throw new ArgumentNullException(nameof(elementType));
+ }
+
+ _elements = elements;
+ _properties = elementType.GetProperties();
+ _columns = _properties.Select(p => new ColumnInfo(name: p.Name, valueType: p.PropertyType)).ToArray();
+ }
+
+ public static AsyncPropertyReflectionSource Create(IAsyncEnumerable elements)
+ {
+ var t = typeof(T);
+ if (t == typeof(object)) // Good enough. This will cover the dynamic case but also if someone adds raw object elements which would make no sense anyway.
+ throw new ArgumentException("Dynamic objects not supported in " + nameof(PropertyReflectionSource) + ".");
+
+ if (elements is null)
+ {
+ throw new ArgumentNullException(nameof(elements));
+ }
+
+ return new AsyncPropertyReflectionSource(elements.GetAsyncEnumerator(), typeof(T));
+ }
+
+ public static AsyncPropertyReflectionSource Create(IAsyncEnumerator elements)
+ {
+ if (elements is null)
+ {
+ throw new ArgumentNullException(nameof(elements));
+ }
+
+ return new AsyncPropertyReflectionSource(elements, typeof(T));
+ }
+
+
+ public void Dispose()
+ {
+ // Nothing to dispose
+ }
+
+ public Task CompleteAsync()
+ {
+ // Nothing to complete in this source
+ return Task.CompletedTask;
+ }
+
+ public Task InitAndGetColumnsAsync()
+ {
+ return Task.FromResult(_columns);
+ }
+
+ public async Task ReadRowAsync(object[] values)
+ {
+ if (values.Length != _properties.Length)
+ throw new ArgumentException($"Object array of size {values.Length} provided, but {_properties.Length} columns exist");
+
+ if (await _elements.MoveNextAsync())
+ {
+ for (var i = 0; i < _properties.Length; i++)
+ {
+ values[i] = _properties[i].GetValue(_elements.Current);
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+}
diff --git a/src/Rowbot/Sources/DynamicObjectSource.cs b/src/Rowbot/Sources/DynamicObjectSource.cs
index 447d5d6..2daabfd 100644
--- a/src/Rowbot/Sources/DynamicObjectSource.cs
+++ b/src/Rowbot/Sources/DynamicObjectSource.cs
@@ -79,9 +79,4 @@ public void Dispose()
// Nothing to dispose
}
}
-
- public class DynamicObjectsNotIdenticalException : Exception
- {
- public DynamicObjectsNotIdenticalException(string message) : base(message) { }
- }
}
diff --git a/src/Rowbot/Sources/DynamicObjectsNotIdenticalException.cs b/src/Rowbot/Sources/DynamicObjectsNotIdenticalException.cs
new file mode 100644
index 0000000..20344ef
--- /dev/null
+++ b/src/Rowbot/Sources/DynamicObjectsNotIdenticalException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace Rowbot.Sources
+{
+ public class DynamicObjectsNotIdenticalException : Exception
+ {
+ public DynamicObjectsNotIdenticalException(string message) : base(message) { }
+ }
+}
\ No newline at end of file
diff --git a/src/Rowbot/Targets/AsyncDataTableTarget.cs b/src/Rowbot/Targets/AsyncDataTableTarget.cs
new file mode 100644
index 0000000..6a94347
--- /dev/null
+++ b/src/Rowbot/Targets/AsyncDataTableTarget.cs
@@ -0,0 +1,45 @@
+using System;
+using System.Data;
+using System.Threading.Tasks;
+
+namespace Rowbot.Targets
+{
+ public class AsyncDataTableTarget : IAsyncRowTarget
+ {
+ private readonly DataTable _table = null;
+
+ public AsyncDataTableTarget(DataTable tableToFill)
+ {
+ if (tableToFill.Columns.Count > 0 || tableToFill.Rows.Count > 0)
+ throw new ArgumentException("Provided table must be empty. Columns and/or rows found.");
+ _table = tableToFill;
+ }
+
+ public Task CompleteAsync()
+ {
+ return Task.CompletedTask;
+ }
+
+ public Task InitAsync(ColumnInfo[] columns)
+ {
+ foreach (var columnInfo in columns)
+ {
+ _table.Columns.Add(new DataColumn(columnName: columnInfo.Name, dataType: columnInfo.ValueType));
+ }
+
+ return Task.CompletedTask;
+ }
+
+ public Task WriteRowAsync(object[] values)
+ {
+ var row = _table.NewRow();
+ for (var i = 0; i < values.Length; i++)
+ {
+ row[i] = values[i] ?? DBNull.Value;
+ }
+ _table.Rows.Add(row);
+
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/src/Rowbot/Targets/AsyncDynamicObjectTarget.cs b/src/Rowbot/Targets/AsyncDynamicObjectTarget.cs
new file mode 100644
index 0000000..eb6b671
--- /dev/null
+++ b/src/Rowbot/Targets/AsyncDynamicObjectTarget.cs
@@ -0,0 +1,44 @@
+using System;
+using System.Collections.Generic;
+using System.Dynamic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Rowbot.Targets
+{
+ public class AsyncDynamicObjectTarget : IDisposable // Not possible: : IEnumerableRowTarget
+ {
+ private string[] _columnNames;
+
+ public AsyncDynamicObjectTarget()
+ {
+ }
+
+ public void Dispose()
+ {
+ }
+
+ public Task CompleteAsync()
+ {
+ return Task.CompletedTask;
+ }
+
+ public Task InitAsync(ColumnInfo[] columns)
+ {
+ _columnNames = columns.Select(c => c.Name).ToArray();
+
+ return Task.CompletedTask;
+ }
+
+ public Task WriteRowAsync(object[] values)
+ {
+ IDictionary newObject = new ExpandoObject();
+ for (var i = 0; i < values.Length; i++)
+ {
+ newObject.Add(_columnNames[i], values[i]);
+ }
+ return Task.FromResult(newObject);
+ }
+ }
+}
diff --git a/src/Rowbot/Targets/AsyncExcelTarget.cs b/src/Rowbot/Targets/AsyncExcelTarget.cs
new file mode 100644
index 0000000..64971ec
--- /dev/null
+++ b/src/Rowbot/Targets/AsyncExcelTarget.cs
@@ -0,0 +1,338 @@
+using ICSharpCode.SharpZipLib.Zip;
+using System;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Rowbot.Targets
+{
+ public class AsyncExcelTarget : IAsyncRowTarget, IDisposable
+ {
+ private readonly Stream _outputStream;
+ private readonly string _sheetName;
+ private readonly bool _writeHeaders;
+ private readonly bool _leaveOpen;
+ private ColumnInfo[] _columns;
+ private readonly ZipOutputStream _zipOutputStream;
+ private readonly UTF8Encoding _utf8;
+ private byte[] _buffer = new byte[1024];
+ private int _bufferIndex = 0;
+ private static readonly string _columnChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ private string[] _excelColumnNames = null;
+ private int _rowIndex = 0;
+ private string _cache_minMaxColString;
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// A number from 0-9 where 0 means no compression and 9 means max. The higher the number, the smaller output size, but the more execution time.
+ ///
+ public AsyncExcelTarget(Stream outputStream, string sheetName, bool writeHeaders, bool leaveOpen = false, int compressionLevel = 1)
+ {
+ if (string.IsNullOrEmpty(sheetName))
+ {
+ throw new ArgumentException($"'{nameof(sheetName)}' cannot be null or empty.", nameof(sheetName));
+ }
+
+ _zipOutputStream = new ZipOutputStream(baseOutputStream: outputStream, bufferSize: 8_000_000); // 8MB buffer chosen out of blue air
+ _zipOutputStream.SetLevel(compressionLevel);
+ _zipOutputStream.IsStreamOwner = !leaveOpen;
+ _outputStream = outputStream;
+ _sheetName = sheetName;
+ _writeHeaders = writeHeaders;
+ _leaveOpen = leaveOpen;
+ _utf8 = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
+ }
+
+ public void Dispose()
+ {
+ _zipOutputStream?.Dispose();
+ if (!_leaveOpen)
+ {
+ _outputStream.Dispose();
+ }
+ }
+
+ public static string GetColumnName(int oneBasedColumnIndex)
+ {
+ StringBuilder sb = new StringBuilder();
+ var remainder = oneBasedColumnIndex;
+ while (remainder > 0)
+ {
+ var nextIndex = (remainder - 1) % 26;
+ sb.Insert(0, _columnChars[nextIndex]);
+ remainder -= nextIndex + 1;
+ remainder /= 26;
+ }
+ return sb.ToString();
+ }
+
+ public async Task InitAsync(ColumnInfo[] columns)
+ {
+ _columns = columns;
+ await WriteStaticFilesToArchiveAsync();
+
+ await _zipOutputStream.PutNextEntryAsync(new ZipEntry("xl/worksheets/sheet1.xml"));
+
+ WriteSheetStartToSheetStream();
+ _excelColumnNames = new string[columns.Length];
+
+ for (int i = 0; i < columns.Length; i++)
+ {
+ _excelColumnNames[i] = GetColumnName(oneBasedColumnIndex: i + 1);
+ };
+
+ _rowIndex = 1;
+
+ int minCol = 1;
+ int maxCol = _columns.Length;
+ _cache_minMaxColString = $"{minCol}:{maxCol}";
+
+ if (_writeHeaders)
+ {
+ await WriteRowAsync(columns.Select(c => c.Name).Cast