From 8ffdc749f044d544605a2bcd7faee8d84b5e59f6 Mon Sep 17 00:00:00 2001 From: Vitalii Savitskii Date: Fri, 25 Oct 2024 12:25:37 +0200 Subject: [PATCH] Add interruption token to ParquetSink (#139) Part of #128 Implemented: - The ParquetSink now has InterruptionToken as a parameter. It will not save completion token if the stream was interrupted. --- src/Services/Base/IInterruptionToken.cs | 12 ++++ src/Services/Base/IStreamLifetimeService.cs | 2 +- src/Services/StreamLifetimeService.cs | 3 + src/Sinks/Parquet/ParquetSink.cs | 46 ++++++++++++--- test/SinkTests/ParquetSinkTests.cs | 62 ++++++++++++++++++++- 5 files changed, 115 insertions(+), 10 deletions(-) create mode 100644 src/Services/Base/IInterruptionToken.cs diff --git a/src/Services/Base/IInterruptionToken.cs b/src/Services/Base/IInterruptionToken.cs new file mode 100644 index 0000000..00e0abc --- /dev/null +++ b/src/Services/Base/IInterruptionToken.cs @@ -0,0 +1,12 @@ +namespace Arcane.Framework.Services.Base; + +/// +/// Provides information about a stream interruption +/// +public interface IInterruptionToken +{ + /// + /// Returns true if the stream was interrupted + /// + public bool IsInterrupted { get; } +} diff --git a/src/Services/Base/IStreamLifetimeService.cs b/src/Services/Base/IStreamLifetimeService.cs index 34c0673..f457dab 100644 --- a/src/Services/Base/IStreamLifetimeService.cs +++ b/src/Services/Base/IStreamLifetimeService.cs @@ -6,7 +6,7 @@ namespace Arcane.Framework.Services.Base; /// /// Service to manage the lifetime of a stream runner /// -public interface IStreamLifetimeService: IDisposable +public interface IStreamLifetimeService: IDisposable, IInterruptionToken { /// /// Add a signal to listen for to stop the stream diff --git a/src/Services/StreamLifetimeService.cs b/src/Services/StreamLifetimeService.cs index 10a84ad..0d7369f 100644 --- a/src/Services/StreamLifetimeService.cs +++ b/src/Services/StreamLifetimeService.cs @@ -41,6 +41,9 @@ public void 
AddStreamTerminationSignal(PosixSignal posixSignal) /// > public bool IsStopRequested { get; private set; } + /// > + public bool IsInterrupted => this.IsStopRequested; + /// /// Stops the stream and sets the stop requested flag. /// diff --git a/src/Sinks/Parquet/ParquetSink.cs b/src/Sinks/Parquet/ParquetSink.cs index 8805828..b1caddd 100644 --- a/src/Sinks/Parquet/ParquetSink.cs +++ b/src/Sinks/Parquet/ParquetSink.cs @@ -6,12 +6,14 @@ using Akka.Actor; using Akka.Event; using Akka.Streams; +using Akka.Streams.Dsl; using Akka.Streams.Stage; using Arcane.Framework.Sinks.Extensions; using Arcane.Framework.Sinks.Models; using Arcane.Framework.Sinks.Services.Base; using Akka.Util; using Akka.Util.Extensions; +using Arcane.Framework.Services.Base; using Parquet; using Parquet.Data; using Snd.Sdk.Storage.Base; @@ -37,13 +39,23 @@ public class ParquetSink : GraphStageWithMaterializedValue /// Creates a new instance of /// - private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, string parquetFilePath, - int rowGroupsPerFile, bool createSchemaFile, bool partitionByDate, string dataSinkPathSegment, - string schemaSinkPathSegment, bool dropCompletionToken, StreamMetadata streamMetadata, string metadataSinkPathSegment) + private ParquetSink(Schema parquetSchema, + IBlobStorageWriter storageWriter, + string parquetFilePath, + int rowGroupsPerFile, + bool createSchemaFile, + bool partitionByDate, + string dataSinkPathSegment, + string schemaSinkPathSegment, + bool dropCompletionToken, + StreamMetadata streamMetadata, + string metadataSinkPathSegment, + IInterruptionToken interruptionToken) { this.parquetSchema = parquetSchema; this.storageWriter = storageWriter; @@ -59,6 +71,7 @@ private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, stri this.metadataSinkPathSegment = metadataSinkPathSegment; this.dropCompletionToken = dropCompletionToken; this.metadata = streamMetadata; + this.interruptionToken = interruptionToken; this.Shape = new 
SinkShape>(this.In); } @@ -88,15 +101,24 @@ private ParquetSink(Schema parquetSchema, IBlobStorageWriter storageWriter, stri /// True if sink should drop a file when complete. /// Metadata that describes data produced by the stream /// Folder name to emit metadata + /// Provides information about streaming container interruption /// - public static ParquetSink Create(Schema parquetSchema, IBlobStorageWriter storageWriter, string parquetFilePath, - StreamMetadata streamMetadata, int rowGroupsPerFile = 1, bool createSchemaFile = false, - bool partitionByDate = false, string dataSinkPathSegment = "data", string schemaSinkPathSegment = "schema", - bool dropCompletionToken = false, string metadataSinkPathSegment = "metadata") + public static ParquetSink Create(Schema parquetSchema, + IBlobStorageWriter storageWriter, + IInterruptionToken interruptionToken, + string parquetFilePath, + StreamMetadata streamMetadata, + int rowGroupsPerFile = 1, + bool createSchemaFile = false, + bool partitionByDate = false, + string dataSinkPathSegment = "data", + string schemaSinkPathSegment = "schema", + bool dropCompletionToken = false, + string metadataSinkPathSegment = "metadata") { return new ParquetSink(parquetSchema, storageWriter, parquetFilePath, rowGroupsPerFile, createSchemaFile, partitionByDate, dataSinkPathSegment, schemaSinkPathSegment, dropCompletionToken, streamMetadata, - metadataSinkPathSegment); + metadataSinkPathSegment, interruptionToken); } /// @@ -118,6 +140,7 @@ private sealed class SinkLogic : GraphStageLogic private bool writeInProgress; private readonly IMetadataWriter metadataWriter; + private readonly IInterruptionToken interruptionToken; public SinkLogic(ParquetSink sink, TaskCompletionSource taskCompletion) : base(sink.Shape) @@ -132,6 +155,8 @@ public SinkLogic(ParquetSink sink, TaskCompletionSource taskCompletion) _ => Directive.Stop }); this.writeInProgress = false; + this.interruptionToken = sink.interruptionToken; + this.SetHandler(sink.In, () => 
this.WriteRowGroup(this.Grab(sink.In)), @@ -218,6 +243,11 @@ private Task> SavePart() private Task> SaveCompletionToken() { + if (this.interruptionToken.IsInterrupted) + { + this.Log.Info("Stream was interrupted, not saving completion token"); + return Task.FromResult(Option.None); + } if (this.sink.dropCompletionToken) // there seems to be an issue with Moq library and how it serializes BinaryData type // in order to have consistent behaviour between units and actual runs we write byte 0 to the file diff --git a/test/SinkTests/ParquetSinkTests.cs b/test/SinkTests/ParquetSinkTests.cs index 5a0e658..4541fd9 100644 --- a/test/SinkTests/ParquetSinkTests.cs +++ b/test/SinkTests/ParquetSinkTests.cs @@ -1,8 +1,12 @@ using System; +using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; +using Akka.Streams; using Akka.Streams.Dsl; using Akka.Util; +using Arcane.Framework.Services.Base; using Arcane.Framework.Sinks.Models; using Arcane.Framework.Sinks.Parquet; using Arcane.Framework.Tests.Fixtures; @@ -47,7 +51,7 @@ public async Task ParquetSinkWrites(int blocks, int rowGroupsPerBlock, bool crea .ReturnsAsync(new UploadedBlob()); await Source.From(Enumerable.Range(0, blocks).Select(_ => columns.ToList())).RunWith( - ParquetSink.Create(schema, this.mockBlobStorageService.Object, $"tmp@{pathString}", + ParquetSink.Create(schema, this.mockBlobStorageService.Object, Mock.Of(), $"tmp@{pathString}", new StreamMetadata(Option.None), rowGroupsPerBlock, createSchemaFile, dropCompletionToken: dropCompletionToken), this.akkaFixture.Materializer); @@ -82,6 +86,7 @@ public async Task RemovesEmptyStreamMetadata() var sink = ParquetSink.Create(schema, this.mockBlobStorageService.Object, + Mock.Of(), basePath, new StreamMetadata(Option.None), 5, @@ -124,6 +129,7 @@ public async Task OverwritesExistingSchemaMetadata() }); var sink = ParquetSink.Create(schema, this.mockBlobStorageService.Object, + Mock.Of(), basePath, metadata, 5, @@ 
-155,6 +161,7 @@ public async Task HandleSchemaFailures() var source = Source.From(Enumerable.Range(0, 10).Select(_ => columns.ToList())); var sink = ParquetSink.Create(parquetSchema: schema, + interruptionToken: Mock.Of(), storageWriter: this.mockBlobStorageService.Object, parquetFilePath: "s3a://bucket/object", streamMetadata: new StreamMetadata(Option.None), @@ -192,6 +199,7 @@ public async Task ParquetSinkDoesNotDropCompletionTokenOnFail(bool dropCompletio var sink = ParquetSink.Create( schema, this.mockBlobStorageService.Object, + Mock.Of(), $"tmp@{pathString}", new StreamMetadata(Option.None), 4, @@ -229,6 +237,7 @@ public async Task ParquetSinkDoesNotDropCompletionTokenOnUpstreamFail(bool dropC var sink = ParquetSink.Create( schema, this.mockBlobStorageService.Object, + Mock.Of(), $"tmp@{pathString}", new StreamMetadata(Option.None), 4, @@ -248,4 +257,55 @@ public async Task ParquetSinkDoesNotDropCompletionTokenOnUpstreamFail(bool dropC mb => mb.SaveBytesAsBlob(It.IsAny(), It.Is(path => path.Contains(pathString)), It.Is(fn => fn.EndsWith(".COMPLETED")), It.IsAny()), Times.Never); } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ParquetSinkDoesNotDropCompletionTokenOnBackfillCompletion(bool dropCompletionToken) + { + var columns = Enumerable.Range(0, 10) + .Select(ixCol => new DataColumn(new DataField(ixCol.ToString()), Enumerable.Range(0, 10).ToArray())) + .ToArray(); + + var pathString = Guid.NewGuid().ToString(); + var schema = new Schema(columns.Select(c => c.Field).ToList()); + + var cts = new CancellationTokenSource(); + var callCount = 0; + this.mockBlobStorageService.Setup(mb => mb.SaveBytesAsBlob(It.IsAny(), + It.Is(p => p.Contains(pathString)), It.IsAny(), It.IsAny())) + .ReturnsAsync(() => + { + if (callCount++ == 3) + { + cts.Cancel(); + } + return new UploadedBlob(); + }); + + var interruptionMock = new Mock(); + interruptionMock.Setup(i => i.IsInterrupted).Returns(true); + var sink = ParquetSink.Create( + schema, + 
this.mockBlobStorageService.Object, + interruptionMock.Object, + $"tmp@{pathString}", + new StreamMetadata(Option.None), + 4, + true, + dropCompletionToken: dropCompletionToken); + + var graph = Source.Repeat(columns.ToList()) + .ViaMaterialized(KillSwitches.Single>(), Keep.Right) + .ToMaterialized(sink, Keep.Both); + + var (ks, task) = graph.Run(this.akkaFixture.Materializer); + await Task.Delay(5 * 1000); + ks.Shutdown(); + await task; + + this.mockBlobStorageService.Verify( + mb => mb.SaveBytesAsBlob(It.IsAny(), It.Is(path => path.Contains(pathString)), + It.Is(fn => fn.EndsWith(".COMPLETED")), It.IsAny()), Times.Never); + } }