Skip to content

Commit

Permalink
Add some unit tests for ParquetSink behavior (#138)
Browse files Browse the repository at this point in the history
* Add some unit tests for ParquetSink behavior

* Minor test fixes
  • Loading branch information
s-vitaliy authored Oct 25, 2024
1 parent d6cff29 commit 2c20385
Showing 1 changed file with 91 additions and 3 deletions.
94 changes: 91 additions & 3 deletions test/SinkTests/ParquetSinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public async Task RemovesEmptyStreamMetadata()
true,
false);

await Source.From(Enumerable.Range(0, 10).Select(_ => columns.ToList())).RunWith(sink, this.akkaFixture.Materializer);
await Source.From(Enumerable.Range(0, 10).Select(_ => columns.ToList()))
.RunWith(sink, this.akkaFixture.Materializer);

this.mockBlobStorageService.Verify(m => m.RemoveBlob($"{basePath}/metadata", "v0/partitions.json"), Times.Once);
}
Expand Down Expand Up @@ -129,11 +130,13 @@ public async Task OverwritesExistingSchemaMetadata()
true,
false);

await Source.From(Enumerable.Range(0, 10).Select(_ => columns.ToList())).RunWith(sink, this.akkaFixture.Materializer);
await Source.From(Enumerable.Range(0, 10).Select(_ => columns.ToList()))
.RunWith(sink, this.akkaFixture.Materializer);

var expectedMetadata =
"""[{"description":"date_month","field_name":"my_column_with_date","field_format":"datetime","field_expression":null,"is_date_partition":false},{"description":"date_month","field_name":"","field_format":"","field_expression":"date_format(cast(\u0027test\u0027 as date), \u0027yyyMM\u0027)","is_date_partition":true}]""";
this.mockBlobStorageService.Verify(m => m.SaveTextAsBlob(expectedMetadata, $"{basePath}/metadata", "v0/partitions.json"), Times.Once);
this.mockBlobStorageService.Verify(
m => m.SaveTextAsBlob(expectedMetadata, $"{basePath}/metadata", "v0/partitions.json"), Times.Once);
}

[Fact]
Expand All @@ -160,4 +163,89 @@ public async Task HandleSchemaFailures()

await Assert.ThrowsAsync<Exception>(async () => await source.RunWith(sink, this.akkaFixture.Materializer));
}

/// <summary>
/// Checks that when a blob upload throws, the stream run fails with that exception and no blob
/// whose name ends with <c>.COMPLETED</c> is saved under this test's path.
/// </summary>
/// <param name="dropCompletionToken">
/// Value forwarded to the <c>dropCompletionToken</c> argument of <c>ParquetSink.Create</c>;
/// the same expectation is asserted for both <c>true</c> and <c>false</c>.
/// </param>
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task ParquetSinkDoesNotDropCompletionTokenOnFail(bool dropCompletionToken)
{
    var columns = Enumerable.Range(0, 10)
        .Select(ixCol => new DataColumn(new DataField<int?>(ixCol.ToString()), Enumerable.Range(0, 10).ToArray()))
        .ToArray();

    // A unique path fragment scopes the mock setup and the verification below to calls made for this test.
    var pathString = Guid.NewGuid().ToString();
    var schema = new Schema(columns.Select(c => c.Field).ToList());

    // The fourth matching SaveBytesAsBlob call (callCount == 3) throws; every other call returns a new UploadedBlob.
    var callCount = 0;
    this.mockBlobStorageService.Setup(mb => mb.SaveBytesAsBlob(It.IsAny<BinaryData>(),
            It.Is<string>(p => p.Contains(pathString)), It.IsAny<string>(), It.IsAny<bool>()))
        .ReturnsAsync(() =>
        {
            if (callCount++ == 3)
            {
                throw new Exception("expected exception");
            }

            return new UploadedBlob();
        });

    // NOTE(review): the positional arguments `4` and `true` are passed as in the original test;
    // their meaning is defined by ParquetSink.Create, which is not visible here.
    var sink = ParquetSink.Create(
        schema,
        this.mockBlobStorageService.Object,
        $"tmp@{pathString}",
        new StreamMetadata(Option<StreamPartition[]>.None),
        4,
        true,
        dropCompletionToken: dropCompletionToken);

    var ex = await Assert.ThrowsAsync<Exception>(async () => await Source
        .Repeat(columns.ToList())
        .Take(10)
        .RunWith(sink, this.akkaFixture.Materializer)
    );

    Assert.Equal("expected exception", ex.Message);

    // Ordinal comparison: the ".COMPLETED" suffix is a fixed identifier, so the match must not depend on culture.
    this.mockBlobStorageService.Verify(
        mb => mb.SaveBytesAsBlob(
            It.IsAny<BinaryData>(),
            It.Is<string>(path => path.Contains(pathString)),
            It.Is<string>(fn => fn.EndsWith(".COMPLETED", StringComparison.Ordinal)),
            It.IsAny<bool>()),
        Times.Never);
}

/// <summary>
/// Checks that when an upstream stage throws, the stream run fails with that exception and no blob
/// whose name ends with <c>.COMPLETED</c> is saved under this test's path.
/// </summary>
/// <param name="dropCompletionToken">
/// Value forwarded to the <c>dropCompletionToken</c> argument of <c>ParquetSink.Create</c>;
/// the same expectation is asserted for both <c>true</c> and <c>false</c>.
/// </param>
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task ParquetSinkDoesNotDropCompletionTokenOnUpstreamFail(bool dropCompletionToken)
{
    var columns = Enumerable.Range(0, 10)
        .Select(ixCol => new DataColumn(new DataField<int?>(ixCol.ToString()), Enumerable.Range(0, 10).ToArray()))
        .ToArray();

    // A unique path fragment scopes the mock setup and the verification below to calls made for this test.
    var pathString = Guid.NewGuid().ToString();
    var schema = new Schema(columns.Select(c => c.Field).ToList());

    // Every matching upload succeeds here; the failure is injected upstream of the sink instead.
    this.mockBlobStorageService.Setup(mb
            => mb.SaveBytesAsBlob(It.IsAny<BinaryData>(), It.Is<string>(p => p.Contains(pathString)), It.IsAny<string>(), It.IsAny<bool>()))
        .ReturnsAsync(() => new UploadedBlob());

    // NOTE(review): the positional arguments `4` and `true` are passed as in the original test;
    // their meaning is defined by ParquetSink.Create, which is not visible here.
    var sink = ParquetSink.Create(
        schema,
        this.mockBlobStorageService.Object,
        $"tmp@{pathString}",
        new StreamMetadata(Option<StreamPartition[]>.None),
        4,
        true,
        dropCompletionToken: dropCompletionToken);

    // The Select stage throws on the sixth element it sees (callCount == 5) and passes all others through unchanged.
    var callCount = 0;
    var ex = await Assert.ThrowsAsync<Exception>(async () => await Source
        .Repeat(columns.ToList())
        .Take(10)
        .Select(c => callCount++ == 5 ? throw new Exception("expected exception") : c)
        .RunWith(sink, this.akkaFixture.Materializer)
    );

    Assert.Equal("expected exception", ex.Message);

    // Ordinal comparison: the ".COMPLETED" suffix is a fixed identifier, so the match must not depend on culture.
    this.mockBlobStorageService.Verify(
        mb => mb.SaveBytesAsBlob(
            It.IsAny<BinaryData>(),
            It.Is<string>(path => path.Contains(pathString)),
            It.Is<string>(fn => fn.EndsWith(".COMPLETED", StringComparison.Ordinal)),
            It.IsAny<bool>()),
        Times.Never);
}
}

0 comments on commit 2c20385

Please sign in to comment.