Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The StreamMetadata interface finalization #125

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/Sinks/Extensions/StreamMetadataExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Akka.Util;
using Arcane.Framework.Sinks.Models;
using Arcane.Framework.Sinks.Services;
using Arcane.Framework.Sinks.Services.Base;
using Arcane.Framework.Sinks.Services.StreamMetadata.V1;
using Arcane.Framework.Sinks.Services.StreamMetadata.V0;
using Snd.Sdk.Storage.Base;

namespace Arcane.Framework.Sinks.Extensions;
Expand All @@ -14,16 +14,17 @@ public static class StreamMetadataExtensions
/// <summary>
/// Creates a new IMetadataWriter instance from the StreamMetadata instance
/// </summary>
/// <param name="streamMetadata">The object that hods the stream metadata</param>
/// <param name="maybeStreamMetadata">The object that hods the stream metadata</param>
/// <param name="writer">Blob Storage writer to be used to perform the write operation.</param>
/// <param name="basePath">The Sink root directory</param>
/// <returns>
/// Metadata writer instance that can write one or more metadata fields to the given root directory.
/// </returns>
public static IMetadataWriter ToStreamMetadataWriter(this StreamMetadata streamMetadata,
public static IMetadataWriter ToStreamMetadataWriter(this Option<StreamMetadata> maybeStreamMetadata,
IBlobStorageWriter writer,
string basePath)
{
return new PartitionsWriter(streamMetadata.Partitions, writer, basePath);
var metadata = maybeStreamMetadata.GetOrElse(new StreamMetadata(Option<StreamPartition[]>.None));
return new PartitionsWriter(metadata.Partitions, writer, basePath);
}
}
11 changes: 5 additions & 6 deletions src/Sinks/Json/MultilineJsonSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Akka.Event;
using Akka.Streams;
using Akka.Streams.Stage;
using Akka.Util;
using Arcane.Framework.Sinks.Extensions;
using Arcane.Framework.Sinks.Models;
using Arcane.Framework.Sinks.Parquet;
Expand All @@ -31,9 +32,8 @@ public class MultilineJsonSink : GraphStageWithMaterializedValue<SinkShape<List<
private readonly string schemaPathSegment;
private readonly Schema sinkSchema;
private readonly IBlobStorageWriter storageWriter;
private readonly StreamMetadata streamMetadata;
private readonly Option<StreamMetadata> streamMetadata;
private readonly string metadataSinkPathSegment;
private readonly StreamMetadata metadata;

/// <summary>
/// Creates a new instance of <see cref="JsonSink"/>
Expand All @@ -45,7 +45,7 @@ private MultilineJsonSink(
string schemaPathSegment,
Schema sinkSchema,
bool dropCompletionToken,
StreamMetadata streamMetadata,
Option<StreamMetadata> streamMetadata,
string metadataSinkPathSegment)
{
this.storageWriter = storageWriter;
Expand All @@ -56,7 +56,6 @@ private MultilineJsonSink(
this.schemaPathSegment = schemaPathSegment;
this.streamMetadata = streamMetadata;
this.metadataSinkPathSegment = metadataSinkPathSegment;
this.metadata = streamMetadata;

this.Shape = new SinkShape<List<JsonElement>>(this.In);
}
Expand Down Expand Up @@ -88,7 +87,7 @@ public static MultilineJsonSink Create(
IBlobStorageWriter storageWriter,
string jsonSinkPath,
Schema sinkSchema,
StreamMetadata streamMetadata,
Option<StreamMetadata> streamMetadata,
string dataPathSegment = "data",
string schemaPathSegment = "schema",
bool dropCompletionToken = false,
Expand Down Expand Up @@ -128,7 +127,7 @@ public SinkLogic(MultilineJsonSink sink, TaskCompletionSource<NotUsed> taskCompl
{
this.sink = sink;
this.taskCompletion = taskCompletion;
this.metadataWriter = sink.metadata.ToStreamMetadataWriter(this.sink.storageWriter, this.GetMetadataPath());
this.metadataWriter = sink.streamMetadata.ToStreamMetadataWriter(this.sink.storageWriter, this.GetMetadataPath());
this.decider = Decider.From((ex) => ex.GetType().Name switch
{
nameof(ArgumentException) => Directive.Stop,
Expand Down
7 changes: 4 additions & 3 deletions src/Sinks/Parquet/ParquetSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Akka.Event;
using Akka.Streams;
using Akka.Streams.Stage;
using Akka.Util;
using Arcane.Framework.Sinks.Extensions;
using Arcane.Framework.Sinks.Models;
using Arcane.Framework.Sinks.Services.Base;
Expand Down Expand Up @@ -34,14 +35,14 @@ public class ParquetSink : GraphStageWithMaterializedValue<SinkShape<List<Parque
private readonly string schemaSinkPathSegment;
private readonly string metadataSinkPathSegment;
private readonly IBlobStorageWriter storageWriter;
private readonly StreamMetadata metadata;
private readonly Option<StreamMetadata> metadata;

/// <summary>
/// Creates a new instance of <see cref="ParquetSink"/>
/// </summary>
private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, string parquetFilePath,
int rowGroupsPerFile, bool createSchemaFile, bool partitionByDate, string dataSinkPathSegment,
string schemaSinkPathSegment, bool dropCompletionToken, StreamMetadata streamMetadata, string metadataSinkPathSegment)
string schemaSinkPathSegment, bool dropCompletionToken, Option<StreamMetadata> streamMetadata, string metadataSinkPathSegment)
{
this.parquetSchema = parquetSchema;
this.storageWriter = storageWriter;
Expand Down Expand Up @@ -88,7 +89,7 @@ private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, stri
/// <param name="metadataSinkPathSegment">Folder name to emit metadata</param>
/// <returns></returns>
public static ParquetSink Create(Schema parquetSchema, IBlobStorageWriter storageWriter, string parquetFilePath,
StreamMetadata streamMetadata, int rowGroupsPerFile = 1, bool createSchemaFile = false,
Option<StreamMetadata> streamMetadata, int rowGroupsPerFile = 1, bool createSchemaFile = false,
bool partitionByDate = false, string dataSinkPathSegment = "data", string schemaSinkPathSegment = "schema",
bool dropCompletionToken = false, string metadataSinkPathSegment = "metadata")
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Arcane.Framework.Sinks.Services.Base;
using Snd.Sdk.Storage.Base;

namespace Arcane.Framework.Sinks.Services.StreamMetadata.V1;
namespace Arcane.Framework.Sinks.Services.StreamMetadata.V0;

/// <summary>
/// A stream metadata writer that writes partitions metadata to a blob storage
Expand Down
20 changes: 20 additions & 0 deletions test/SinkTests/MultilineJsonSinkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ public async Task RemovesEmptyStreamMetadata()
this.mockBlobStorageService.Verify(m => m.RemoveBlob($"{basePath}/metadata", "v0/partitions.json"), Times.Once);
}

[Fact]
public async Task RemovesIfNoMetadataProvided()
{
var basePath = "s3a://bucket/path";
var mockIn = Enumerable
.Range(0, 10)
.Select(_ => Enumerable.Range(0, 1).Select(ix => JsonSerializer.Deserialize<JsonElement>(JsonSerializer.Serialize(new { Value = ix }))))
.ToList();
var schema = new Schema(new DataField("test", DataType.Int32));

var sink = MultilineJsonSink.Create(this.mockBlobStorageService.Object,
basePath,
schema,
Option<StreamMetadata>.None);

await Source.From(mockIn).Select(v => v.ToList()).RunWith(sink, this.akkaFixture.Materializer);

this.mockBlobStorageService.Verify(m => m.RemoveBlob($"{basePath}/metadata", "v0/partitions.json"), Times.Once);
}

[Fact]
public async Task OverwritesExistingSchemaMetadata()
{
Expand Down
23 changes: 23 additions & 0 deletions test/SinkTests/ParquetSinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,29 @@ public async Task RemovesEmptyStreamMetadata()
this.mockBlobStorageService.Verify(m => m.RemoveBlob($"{basePath}/metadata", "v0/partitions.json"), Times.Once);
}

[Fact]
public async Task RemovesIfNoMetadataProvided()
{
var basePath = "s3a://bucket/path";
var columns = Enumerable
.Range(0, 10)
.Select(col => new DataColumn(new DataField<int?>(col.ToString()), Enumerable.Range(0, 10).ToArray()))
.ToList();
var schema = new Schema(columns.Select(c => c.Field).ToList());

var sink = ParquetSink.Create(schema,
this.mockBlobStorageService.Object,
basePath,
Option<StreamMetadata>.None,
5,
true,
false);

await Source.From(Enumerable.Range(0, 10).Select(_ => columns.ToList())).RunWith(sink, this.akkaFixture.Materializer);

this.mockBlobStorageService.Verify(m => m.RemoveBlob($"{basePath}/metadata", "v0/partitions.json"), Times.Once);
}

[Fact]
public async Task OverwritesExistingSchemaMetadata()
{
Expand Down