Skip to content

Commit

Permalink
Initial work on async support in Rowbot
Browse files Browse the repository at this point in the history
  • Loading branch information
kristho91 committed Mar 12, 2024
1 parent acfb1f4 commit 763b1b6
Show file tree
Hide file tree
Showing 46 changed files with 3,776 additions and 218 deletions.
1 change: 1 addition & 0 deletions src/Rowbot.ClosedXml/Rowbot.ClosedXml.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>8</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
87 changes: 87 additions & 0 deletions src/Rowbot/CsvHelper/AsyncCsvHelperSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using CsvHelper;
using CsvHelper.Configuration;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace Rowbot.CsvHelper
{
public class AsyncCsvHelperSource : IAsyncRowSource
{
private readonly CsvReader _csvReader;
private readonly bool _readFirstLineAsHeaders = false;
private int _readCallCount = 0;

public AsyncCsvHelperSource(Stream stream, CsvConfiguration configuration, bool readFirstLineAsHeaders) : this(new CsvReader(new StreamReader(stream), configuration), readFirstLineAsHeaders: readFirstLineAsHeaders)
{
}

public AsyncCsvHelperSource(CsvReader csvReader, bool readFirstLineAsHeaders)
{
_csvReader = csvReader;
_readFirstLineAsHeaders = readFirstLineAsHeaders;
}

public void Dispose()
{
_csvReader.Dispose();
}

public Task CompleteAsync()
{
// Nothing to complete
return Task.CompletedTask;
}

public async Task<ColumnInfo[]> InitAndGetColumnsAsync()
{
await _csvReader.ReadAsync();
_csvReader.ReadHeader();

if (_readFirstLineAsHeaders)
{
var columns = _csvReader.HeaderRecord.Select(header => new ColumnInfo(name: header, valueType: typeof(string))).ToArray();
return columns;
}
else
{
var columns = new ColumnInfo[_csvReader.HeaderRecord.Length];
for (var i = 0; i < _csvReader.HeaderRecord.Length; i++)
{
columns[i] = new ColumnInfo(name: $"Column{i + 1}", valueType: typeof(string));
}
return columns;
}
}

public async Task<bool> ReadRowAsync(object[] values)
{
_readCallCount++;
if (_readCallCount == 1 && !_readFirstLineAsHeaders)
{
// First line should not be read as headers but as data. Copy from the headers line onto the values buffer
for (var i = 0; i < values.Length; i++)
{
values[i] = _csvReader.HeaderRecord[i];
}
return true;
}
else
{
if (await _csvReader.ReadAsync())
{

for (var i = 0; i < values.Length; i++)
{
values[i] = _csvReader.GetField(i);
}
return true;
}
else
{
return false;
}
}
}
}
}
80 changes: 80 additions & 0 deletions src/Rowbot/CsvHelper/AsyncCsvHelperTarget.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using CsvHelper;
using CsvHelper.Configuration;
using System;
using System.IO;
using System.Threading.Tasks;

namespace Rowbot.CsvHelper
{
public sealed class AsyncCsvHelperTarget : IAsyncRowTarget, IDisposable
{
private readonly CsvWriter _csvWriter;
private readonly bool _writeHeaders;
private int _unflushedRowCount = 0;
private bool _firstWrite = true;

public AsyncCsvHelperTarget(Stream stream, CsvConfiguration configuration, bool writeHeaders = true, bool leaveOpen = false) : this(new CsvWriter(new StreamWriter(stream), configuration, leaveOpen: leaveOpen))
{
_writeHeaders = writeHeaders;
}

public AsyncCsvHelperTarget(CsvWriter csvWriter)
{
_csvWriter = csvWriter;
}

public void Dispose()
{
_csvWriter?.Dispose();
}

public async Task CompleteAsync()
{
await FlushAsync();
_csvWriter?.Dispose();
}

public Task InitAsync(ColumnInfo[] columns)
{
if (_writeHeaders)
{
for (var i = 0; i < columns.Length; i++)
{
_csvWriter.WriteField(columns[i].Name);
}
_firstWrite = false;
}

return Task.CompletedTask;
}

public async Task WriteRowAsync(object[] values)
{
if (!_firstWrite)
{
await _csvWriter.NextRecordAsync();
}
for (var i = 0; i < values.Length; i++)
{
_csvWriter.WriteField(values[i]);
}
_unflushedRowCount++;
_firstWrite = false;
await FlushIfNeeded();
}

private async Task FlushIfNeeded()
{
if (_unflushedRowCount > 1000)
{
await FlushAsync();
}
}

private async Task FlushAsync()
{
await _csvWriter.FlushAsync();
_unflushedRowCount = 0;
}
}
}
63 changes: 63 additions & 0 deletions src/Rowbot/Execution/AsyncEnumerableTargetGuards.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Threading.Tasks;

namespace Rowbot.Execution
{
public sealed class AsyncEnumerableTargetGuards<T> : IAsyncEnumerableRowTarget<T>, IDisposable
{
private readonly IAsyncEnumerableRowTarget<T> _rowTarget;

private bool Completed { get; set; } = false;
private bool Initialized { get; set; } = false;

public AsyncEnumerableTargetGuards(IAsyncEnumerableRowTarget<T> rowTarget)
{
_rowTarget = rowTarget ?? throw new ArgumentNullException(nameof(rowTarget));
}

public Task CompleteAsync()
{
if (!Initialized)
throw new InvalidOperationException("Init must be called before Complete()");
if (Completed)
throw new InvalidOperationException("Complete already called and can only be called once.");
Completed = true;

return _rowTarget.CompleteAsync();
}

public void Dispose()
{
(_rowTarget as IDisposable)?.Dispose();
}

public Task InitAsync(ColumnInfo[] columns)
{
if (columns is null)
{
throw new ArgumentNullException(nameof(columns));
}

if (Initialized)
throw new InvalidOperationException("Init has already been called and can only be called once.");
Initialized = true;

return _rowTarget.InitAsync(columns);
}

public Task<T> WriteRowAsync(object[] values)
{
if (values is null)
{
throw new ArgumentNullException(nameof(values));
}

if (!Initialized)
throw new InvalidOperationException("Init must be called before WriteRows");
if (Completed)
throw new InvalidOperationException("Complete already called. Not allowed to write more rows");

return _rowTarget.WriteRowAsync(values);
}
}
}
75 changes: 75 additions & 0 deletions src/Rowbot/Execution/AsyncSourceGuards.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Linq;
using System.Threading.Tasks;

namespace Rowbot.Execution
{
/// <summary>
/// This helper class wraps a source and handles all input and output parameter validation as well as ensuring methods not called out of order.
/// </summary>
public sealed class AsyncSourceGuards : IAsyncRowSource, IDisposable
{
private bool Initialized { get; set; } = false;
private bool Completed { get; set; } = false;
private int _columnCount = -1;
private bool? _previousReadResult = null;
private readonly IAsyncRowSource _rowSource;

public AsyncSourceGuards(IAsyncRowSource rowSource)
{
_rowSource = rowSource ?? throw new ArgumentNullException(nameof(rowSource));
}

public async Task<ColumnInfo[]> InitAndGetColumnsAsync()
{
if (Initialized)
throw new InvalidOperationException("Already initialized");
Initialized = true;
var columns = await _rowSource.InitAndGetColumnsAsync();
if (columns == null)
throw new InvalidOperationException("Null was returned by OnInitAndGetColumns() which is not valid.");
if (columns.Any(c => c == null))
throw new InvalidOperationException("Returned columns array from OnInitAndGetColumns() contains one or more null values which is not allowed.");

_columnCount = columns.Length;
return columns;
}

public async Task<bool> ReadRowAsync(object[] values)
{
if (values is null)
{
throw new ArgumentNullException(nameof(values));
}

if (!Initialized)
throw new InvalidOperationException("Not initialized");

if (values.Length != _columnCount)
throw new InvalidOperationException($"Provided values object[] buffer contains {values.Length} slots, but column count returned earlier container {_columnCount} columns. These counts must match.");

if (_previousReadResult == false)
throw new InvalidOperationException("It is not allowed to call Read after the method has already returned false in a previous call.");

var readResult = await _rowSource.ReadRowAsync(values);
_previousReadResult = readResult;

return readResult;
}

public Task CompleteAsync()
{
if (!Initialized)
throw new InvalidOperationException("Not initialized");
if (Completed)
throw new InvalidOperationException("Already completed");
Completed = true;
return _rowSource.CompleteAsync();
}

public void Dispose()
{
(_rowSource as IDisposable)?.Dispose();
}
}
}
63 changes: 63 additions & 0 deletions src/Rowbot/Execution/AsyncTargetGuards.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Threading.Tasks;

namespace Rowbot.Execution
{
public sealed class AsyncTargetGuards : IAsyncRowTarget, IDisposable
{
private readonly IAsyncRowTarget _rowTarget;

private bool Completed { get; set; } = false;
private bool Initialized { get; set; } = false;

public AsyncTargetGuards(IAsyncRowTarget rowTarget)
{
_rowTarget = rowTarget ?? throw new ArgumentNullException(nameof(rowTarget));
}

public Task CompleteAsync()
{
if (!Initialized)
throw new InvalidOperationException("Init must be called before Complete()");
if (Completed)
throw new InvalidOperationException("Complete already called and can only be called once.");
Completed = true;

return _rowTarget.CompleteAsync();
}

public void Dispose()
{
(_rowTarget as IDisposable)?.Dispose();
}

public Task InitAsync(ColumnInfo[] columns)
{
if (columns is null)
{
throw new ArgumentNullException(nameof(columns));
}

if (Initialized)
throw new InvalidOperationException("Init has already been called and can only be called once.");
Initialized = true;

return _rowTarget.InitAsync(columns);
}

public Task WriteRowAsync(object[] values)
{
if (values is null)
{
throw new ArgumentNullException(nameof(values));
}

if (!Initialized)
throw new InvalidOperationException("Init must be called before WriteRows");
if (Completed)
throw new InvalidOperationException("Complete already called. Not allowed to write more rows");

return _rowTarget.WriteRowAsync(values);
}
}
}
Loading

0 comments on commit 763b1b6

Please sign in to comment.