Skip to content

Commit

Permalink
Add interruption token to ParquetSink (#139)
Browse files Browse the repository at this point in the history
Part of #128

Implemented:
- The ParquetSink now has InterruptionToken as a parameter. It will not save the completion token if the stream was interrupted.
  • Loading branch information
s-vitaliy authored Oct 25, 2024
1 parent 2c20385 commit 8ffdc74
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 10 deletions.
12 changes: 12 additions & 0 deletions src/Services/Base/IInterruptionToken.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Arcane.Framework.Services.Base;

/// <summary>
/// Provides information about a stream interruption. Consumers (e.g. sinks)
/// can check this token to decide whether finalization artifacts, such as a
/// completion token file, should be emitted when the stream shuts down.
/// </summary>
public interface IInterruptionToken
{
    /// <summary>
    /// Returns true if the stream was interrupted instead of completing normally.
    /// </summary>
    public bool IsInterrupted { get; }
}
2 changes: 1 addition & 1 deletion src/Services/Base/IStreamLifetimeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Arcane.Framework.Services.Base;
/// <summary>
/// Service to manage the lifetime of a stream runner
/// </summary>
public interface IStreamLifetimeService: IDisposable
public interface IStreamLifetimeService: IDisposable, IInterruptionToken
{
/// <summary>
/// Add a signal to listen for to stop the stream
Expand Down
3 changes: 3 additions & 0 deletions src/Services/StreamLifetimeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public void AddStreamTerminationSignal(PosixSignal posixSignal)
/// <inheritdoc cref="IStreamLifetimeService.IsStopRequested"/>
public bool IsStopRequested { get; private set; }

/// <inheritdoc cref="IInterruptionToken.IsInterrupted"/>
public bool IsInterrupted => this.IsStopRequested;

/// <summary>
/// Stops the stream and sets the stop requested flag.
/// </summary>
Expand Down
46 changes: 38 additions & 8 deletions src/Sinks/Parquet/ParquetSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
using Akka.Actor;
using Akka.Event;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Arcane.Framework.Sinks.Extensions;
using Arcane.Framework.Sinks.Models;
using Arcane.Framework.Sinks.Services.Base;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Framework.Services.Base;
using Parquet;
using Parquet.Data;
using Snd.Sdk.Storage.Base;
Expand All @@ -37,13 +39,23 @@ public class ParquetSink : GraphStageWithMaterializedValue<SinkShape<List<Parque
private readonly string metadataSinkPathSegment;
private readonly IBlobStorageWriter storageWriter;
private readonly StreamMetadata metadata;
private readonly IInterruptionToken interruptionToken;

/// <summary>
/// Creates a new instance of <see cref="ParquetSink"/>
/// </summary>
private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, string parquetFilePath,
int rowGroupsPerFile, bool createSchemaFile, bool partitionByDate, string dataSinkPathSegment,
string schemaSinkPathSegment, bool dropCompletionToken, StreamMetadata streamMetadata, string metadataSinkPathSegment)
private ParquetSink(Schema parquetSchema,
IBlobStorageWriter storageWriter,
string parquetFilePath,
int rowGroupsPerFile,
bool createSchemaFile,
bool partitionByDate,
string dataSinkPathSegment,
string schemaSinkPathSegment,
bool dropCompletionToken,
StreamMetadata streamMetadata,
string metadataSinkPathSegment,
IInterruptionToken interruptionToken)
{
this.parquetSchema = parquetSchema;
this.storageWriter = storageWriter;
Expand All @@ -59,6 +71,7 @@ private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, stri
this.metadataSinkPathSegment = metadataSinkPathSegment;
this.dropCompletionToken = dropCompletionToken;
this.metadata = streamMetadata;
this.interruptionToken = interruptionToken;

this.Shape = new SinkShape<List<ParquetColumn>>(this.In);
}
Expand Down Expand Up @@ -88,15 +101,24 @@ private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, stri
/// <param name="dropCompletionToken">True if sink should drop a file when complete.</param>
/// <param name="streamMetadata">Metadata that describes data produced by the stream</param>
/// <param name="metadataSinkPathSegment">Folder name to emit metadata</param>
/// <param name="interruptionToken">Provides information about streaming container interruption</param>
/// <returns></returns>
public static ParquetSink Create(Schema parquetSchema, IBlobStorageWriter storageWriter, string parquetFilePath,
StreamMetadata streamMetadata, int rowGroupsPerFile = 1, bool createSchemaFile = false,
bool partitionByDate = false, string dataSinkPathSegment = "data", string schemaSinkPathSegment = "schema",
bool dropCompletionToken = false, string metadataSinkPathSegment = "metadata")
public static ParquetSink Create(Schema parquetSchema,
    IBlobStorageWriter storageWriter,
    IInterruptionToken interruptionToken,
    string parquetFilePath,
    StreamMetadata streamMetadata,
    int rowGroupsPerFile = 1,
    bool createSchemaFile = false,
    bool partitionByDate = false,
    string dataSinkPathSegment = "data",
    string schemaSinkPathSegment = "schema",
    bool dropCompletionToken = false,
    string metadataSinkPathSegment = "metadata")
{
    // Delegate to the private constructor; named arguments keep the mapping
    // explicit since the constructor's parameter order differs from this factory's.
    return new ParquetSink(
        parquetSchema: parquetSchema,
        storageWriter: storageWriter,
        parquetFilePath: parquetFilePath,
        rowGroupsPerFile: rowGroupsPerFile,
        createSchemaFile: createSchemaFile,
        partitionByDate: partitionByDate,
        dataSinkPathSegment: dataSinkPathSegment,
        schemaSinkPathSegment: schemaSinkPathSegment,
        dropCompletionToken: dropCompletionToken,
        streamMetadata: streamMetadata,
        metadataSinkPathSegment: metadataSinkPathSegment,
        interruptionToken: interruptionToken);
}

/// <inheritdoc cref="GraphStageWithMaterializedValue{TShape,TMaterialized}.CreateLogicAndMaterializedValue"/>
Expand All @@ -118,6 +140,7 @@ private sealed class SinkLogic : GraphStageLogic

private bool writeInProgress;
private readonly IMetadataWriter metadataWriter;
private readonly IInterruptionToken interruptionToken;

public SinkLogic(ParquetSink sink, TaskCompletionSource<NotUsed> taskCompletion) :
base(sink.Shape)
Expand All @@ -132,6 +155,8 @@ public SinkLogic(ParquetSink sink, TaskCompletionSource<NotUsed> taskCompletion)
_ => Directive.Stop
});
this.writeInProgress = false;
this.interruptionToken = sink.interruptionToken;


this.SetHandler(sink.In,
() => this.WriteRowGroup(this.Grab(sink.In)),
Expand Down Expand Up @@ -218,6 +243,11 @@ private Task<Option<UploadedBlob>> SavePart()

private Task<Option<UploadedBlob>> SaveCompletionToken()
{
if (this.interruptionToken.IsInterrupted)
{
this.Log.Info("Stream was interrupted, not saving completion token");
return Task.FromResult(Option<UploadedBlob>.None);
}
if (this.sink.dropCompletionToken)
// there seems to be an issue with Moq library and how it serializes BinaryData type
// in order to have consistent behaviour between units and actual runs we write byte 0 to the file
Expand Down
62 changes: 61 additions & 1 deletion test/SinkTests/ParquetSinkTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using Arcane.Framework.Services.Base;
using Arcane.Framework.Sinks.Models;
using Arcane.Framework.Sinks.Parquet;
using Arcane.Framework.Tests.Fixtures;
Expand Down Expand Up @@ -47,7 +51,7 @@ public async Task ParquetSinkWrites(int blocks, int rowGroupsPerBlock, bool crea
.ReturnsAsync(new UploadedBlob());

await Source.From(Enumerable.Range(0, blocks).Select(_ => columns.ToList())).RunWith(
ParquetSink.Create(schema, this.mockBlobStorageService.Object, $"tmp@{pathString}",
ParquetSink.Create(schema, this.mockBlobStorageService.Object, Mock.Of<IInterruptionToken>(), $"tmp@{pathString}",
new StreamMetadata(Option<StreamPartition[]>.None),
rowGroupsPerBlock, createSchemaFile, dropCompletionToken: dropCompletionToken),
this.akkaFixture.Materializer);
Expand Down Expand Up @@ -82,6 +86,7 @@ public async Task RemovesEmptyStreamMetadata()

var sink = ParquetSink.Create(schema,
this.mockBlobStorageService.Object,
Mock.Of<IInterruptionToken>(),
basePath,
new StreamMetadata(Option<StreamPartition[]>.None),
5,
Expand Down Expand Up @@ -124,6 +129,7 @@ public async Task OverwritesExistingSchemaMetadata()
});
var sink = ParquetSink.Create(schema,
this.mockBlobStorageService.Object,
Mock.Of<IInterruptionToken>(),
basePath,
metadata,
5,
Expand Down Expand Up @@ -155,6 +161,7 @@ public async Task HandleSchemaFailures()
var source = Source.From(Enumerable.Range(0, 10).Select(_ => columns.ToList()));

var sink = ParquetSink.Create(parquetSchema: schema,
interruptionToken: Mock.Of<IInterruptionToken>(),
storageWriter: this.mockBlobStorageService.Object,
parquetFilePath: "s3a://bucket/object",
streamMetadata: new StreamMetadata(Option<StreamPartition[]>.None),
Expand Down Expand Up @@ -192,6 +199,7 @@ public async Task ParquetSinkDoesNotDropCompletionTokenOnFail(bool dropCompletio
var sink = ParquetSink.Create(
schema,
this.mockBlobStorageService.Object,
Mock.Of<IInterruptionToken>(),
$"tmp@{pathString}",
new StreamMetadata(Option<StreamPartition[]>.None),
4,
Expand Down Expand Up @@ -229,6 +237,7 @@ public async Task ParquetSinkDoesNotDropCompletionTokenOnUpstreamFail(bool dropC
var sink = ParquetSink.Create(
schema,
this.mockBlobStorageService.Object,
Mock.Of<IInterruptionToken>(),
$"tmp@{pathString}",
new StreamMetadata(Option<StreamPartition[]>.None),
4,
Expand All @@ -248,4 +257,55 @@ public async Task ParquetSinkDoesNotDropCompletionTokenOnUpstreamFail(bool dropC
mb => mb.SaveBytesAsBlob(It.IsAny<BinaryData>(), It.Is<string>(path => path.Contains(pathString)),
It.Is<string>(fn => fn.EndsWith(".COMPLETED")), It.IsAny<bool>()), Times.Never);
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task ParquetSinkDoesNotDropCompletionTokenOnBackfillCompletion(bool dropCompletionToken)
{
    // Arrange: 10 nullable-int columns with 10 rows each.
    var columns = Enumerable.Range(0, 10)
        .Select(ixCol => new DataColumn(new DataField<int?>(ixCol.ToString()), Enumerable.Range(0, 10).ToArray()))
        .ToArray();

    var pathString = Guid.NewGuid().ToString();
    var schema = new Schema(columns.Select(c => c.Field).ToList());

    // Every blob write succeeds. (The previous version created a
    // CancellationTokenSource and cancelled it after a few writes, but the
    // token was never observed by anything — dead code, removed.)
    this.mockBlobStorageService.Setup(mb => mb.SaveBytesAsBlob(It.IsAny<BinaryData>(),
            It.Is<string>(p => p.Contains(pathString)), It.IsAny<string>(), It.IsAny<bool>()))
        .ReturnsAsync(new UploadedBlob());

    // The interruption token reports the stream as interrupted, so the sink
    // must skip writing the completion token even when dropCompletionToken is true.
    var interruptionMock = new Mock<IInterruptionToken>();
    interruptionMock.Setup(i => i.IsInterrupted).Returns(true);
    var sink = ParquetSink.Create(
        schema,
        this.mockBlobStorageService.Object,
        interruptionMock.Object,
        $"tmp@{pathString}",
        new StreamMetadata(Option<StreamPartition[]>.None),
        4,
        true,
        dropCompletionToken: dropCompletionToken);

    var graph = Source.Repeat(columns.ToList())
        .ViaMaterialized(KillSwitches.Single<List<DataColumn>>(), Keep.Right)
        .ToMaterialized(sink, Keep.Both);

    // Act: let the stream write a few row groups, then shut it down cleanly.
    var (ks, task) = graph.Run(this.akkaFixture.Materializer);
    await Task.Delay(5 * 1000);
    ks.Shutdown();
    await task;

    // Assert: no ".COMPLETED" blob was ever written for an interrupted stream.
    this.mockBlobStorageService.Verify(
        mb => mb.SaveBytesAsBlob(It.IsAny<BinaryData>(), It.Is<string>(path => path.Contains(pathString)),
            It.Is<string>(fn => fn.EndsWith(".COMPLETED")), It.IsAny<bool>()), Times.Never);
}
}

0 comments on commit 8ffdc74

Please sign in to comment.