Skip to content

Commit

Permalink
Add blob storage source (#75)
Browse files Browse the repository at this point in the history
* Add blob storage source

* Fix docstring

* Fix

* Add unit tests

* Bump Snd.Sdk version

* Update src/Sources/BlobStorage/BlobStorageSource.cs

Co-authored-by: Adelina Mahu <[email protected]>

---------

Co-authored-by: Adelina Mahu <[email protected]>
  • Loading branch information
s-vitaliy and adelinag08 authored Jul 5, 2024
1 parent dffc1b0 commit 9797622
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Arcane.Framework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="SnD.Sdk" Version="1.1.6" />
<PackageReference Include="SnD.Sdk" Version="1.1.13" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.1.5"/>
<PackageReference Include="Parquet.Net" Version="3.9.1"/>
<PackageReference Include="Microsoft.OpenApi" Version="1.6.6"/>
Expand Down
156 changes: 156 additions & 0 deletions src/Sources/BlobStorage/BlobStorageSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Arcane.Framework.Contracts;
using Arcane.Framework.Sources.Base;
using Snd.Sdk.Storage.Base;

namespace Arcane.Framework.Sources.BlobStorage;

/// <summary>
/// Akka source that emits list of blobs in a blob container with a given prefix.
/// The source enumerates a cloud blob storage content (e.g. S3 or Azure Blob Container) and emits the names of
/// the objects that exist in the storage. The listing is repeated once per change capture interval.
/// </summary>
public class BlobStorageSource : GraphStage<SourceShape<string>>, ITaggedSource
{
    private readonly string prefix;
    private readonly IBlobStorageListService blobStorageService;
    private readonly TimeSpan changeCaptureInterval;
    private readonly string blobContainer;

    /// <summary>
    /// Initializes the stage. Instances are created through <see cref="Create"/>.
    /// </summary>
    /// <param name="blobContainer">Container name, reported as the source entity in <see cref="GetDefaultTags"/>.</param>
    /// <param name="prefix">Prefix passed to the listing service on every listing.</param>
    /// <param name="blobStorageService">Service used to list the blobs.</param>
    /// <param name="changeCaptureInterval">Delay between two consecutive listings.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="blobStorageService"/> is null.</exception>
    private BlobStorageSource(string blobContainer, string prefix, IBlobStorageListService blobStorageService,
        TimeSpan changeCaptureInterval)
    {
        this.prefix = prefix;
        // Fail at construction time instead of with a NullReferenceException inside the running stage.
        this.blobStorageService = blobStorageService ?? throw new ArgumentNullException(nameof(blobStorageService));
        this.changeCaptureInterval = changeCaptureInterval;
        this.blobContainer = blobContainer;
        this.Shape = new SourceShape<string>(this.Out);
    }

    /// <inheritdoc cref="GraphStageWithMaterializedValue{TShape,TMaterialized}.InitialAttributes"/>
    protected override Attributes InitialAttributes { get; } = Attributes.CreateName(nameof(BlobStorageSource));

    /// <summary>
    /// Source outlet
    /// </summary>
    public Outlet<string> Out { get; } = new($"{nameof(BlobStorageSource)}.Out");

    /// <inheritdoc cref="GraphStageWithMaterializedValue{TShape,TMaterialized}.Shape"/>
    public override SourceShape<string> Shape { get; }

    /// <inheritdoc cref="GraphStage{TShape}.CreateLogic"/>
    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
    {
        return new SourceLogic(this);
    }

    /// <inheritdoc cref="ITaggedSource.GetDefaultTags"/>
    public SourceTags GetDefaultTags()
    {
        return new SourceTags
        {
            SourceEntity = this.blobContainer,
            SourceLocation = this.prefix
        };
    }

    /// <summary>
    /// Creates a <see cref="Source"/> for a cloud blob storage container.
    /// </summary>
    /// <param name="blobContainer">Container name (Blob storage container, S3 bucket etc...)</param>
    /// <param name="prefix">Filter objects by prefix</param>
    /// <param name="blobStorageService">Blob storage service instance</param>
    /// <param name="changeCaptureInterval">How often check for storage updates</param>
    /// <returns>BlobStorageSource instance</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="blobStorageService"/> is null.</exception>
    [ExcludeFromCodeCoverage(Justification = "Factory method")]
    public static BlobStorageSource Create(
        string blobContainer,
        string prefix,
        IBlobStorageService blobStorageService,
        TimeSpan changeCaptureInterval)
    {
        return new BlobStorageSource(blobContainer, prefix, blobStorageService, changeCaptureInterval);
    }

    /// <summary>
    /// Stage logic: lists the blobs when the stage starts and again on every timer tick, and emits the
    /// names found by the latest listing.
    /// </summary>
    private class SourceLogic : TimerGraphStageLogic
    {
        private const string TimerKey = nameof(SourceLogic);

        private readonly string prefix;
        private readonly IBlobStorageListService blobStorageService;
        private readonly TimeSpan changeCaptureInterval;
        private readonly LocalOnlyDecider decider;
        private readonly BlobStorageSource source;

        // Names found by the latest listing that have not been handed to the outlet yet.
        private IEnumerable<string> blobs;

        public SourceLogic(BlobStorageSource source) : base(source.Shape)
        {
            this.source = source;
            this.prefix = source.prefix;
            this.blobStorageService = source.blobStorageService;
            this.changeCaptureInterval = source.changeCaptureInterval;
            this.blobs = Enumerable.Empty<string>();
            // The match is done on the simple type name, so only exceptions named exactly
            // "TimeoutException" are retried; everything else stops the stage.
            this.decider = Decider.From((ex) => ex.GetType().Name switch
            {
                nameof(TimeoutException) => Directive.Restart,
                _ => Directive.Stop
            });

            this.SetHandler(source.Out, this.OnPull);
        }

        /// <summary>
        /// Performs the initial listing. Failures go through the same decider as failures of the periodic
        /// listings, so a timeout on the very first listing is retried after the change capture interval
        /// instead of failing the stage.
        /// </summary>
        public override void PreStart()
        {
            try
            {
                this.GetBlobs();
            }
            catch (Exception ex)
            {
                this.DecideOnFailure(ex);
            }
        }

        /// <summary>
        /// Timer callback: refreshes the listing and emits whatever was found.
        /// </summary>
        protected override void OnTimer(object timerKey)
        {
            try
            {
                this.GetBlobs();
                this.OnPull();
            }
            catch (Exception ex)
            {
                this.DecideOnFailure(ex);
            }
        }

        /// <summary>
        /// Fails the stage when the decider answers <see cref="Directive.Stop"/>; for any other directive
        /// schedules another listing attempt after the change capture interval.
        /// </summary>
        private void DecideOnFailure(Exception ex)
        {
            switch (this.decider.Decide(ex))
            {
                case Directive.Stop:
                    this.FailStage(ex);
                    break;
                default:
                    this.ScheduleOnce(TimerKey, this.changeCaptureInterval);
                    break;
            }
        }

        /// <summary>
        /// Emits the pending blob names, if any, and schedules the next listing.
        /// </summary>
        private void OnPull()
        {
            if (this.blobs.Any())
            {
                this.EmitMultiple(this.source.Out, this.blobs);
                // The batch has been handed over; reset so it is not emitted again on the next pull.
                this.blobs = Enumerable.Empty<string>();
            }
            this.ScheduleOnce(TimerKey, this.changeCaptureInterval);
        }

        /// <summary>
        /// Lists the storage by prefix and keeps a materialized copy of the blob names.
        /// </summary>
        private void GetBlobs()
        {
            this.blobs = this.blobStorageService.ListBlobsAsEnumerable(this.prefix).Select(s => s.Name).ToList();
        }
    }
}
91 changes: 91 additions & 0 deletions test/Sources/BlobStorageSourceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Arcane.Framework.Sources.BlobStorage;
using Moq;
using Snd.Sdk.Storage.Base;
using Snd.Sdk.Storage.Models;
using Xunit;

namespace Arcane.Framework.Tests.Sources;

/// <summary>
/// Unit tests for <see cref="BlobStorageSource"/>, run against a mocked blob storage service.
/// </summary>
public class BlobStorageSourceTests: IDisposable
{
    private readonly ActorSystem actorSystem = ActorSystem.Create(nameof(BlobStorageSourceTests));
    private readonly Mock<IBlobStorageService> mockBlobStorageService = new();
    private readonly CancellationTokenSource cts = new();

    /// <summary>
    /// A single listed blob must result in exactly one emitted element.
    /// </summary>
    [Fact]
    public async Task TestCanStreamBlobStorageObjectNames()
    {
        // Arrange
        var listedBlobs = new[] { new StoredBlob { Name = "key/value/item.csv", LastModified = DateTimeOffset.UtcNow } };
        this.mockBlobStorageService
            .Setup(service => service.ListBlobsAsEnumerable(It.IsAny<string>()))
            .Returns(listedBlobs);
        var graph = BlobStorageSource.Create(
            "container",
            "",
            this.mockBlobStorageService.Object,
            TimeSpan.FromMinutes(1));

        // Act
        var emitted = await Source.FromGraph(graph)
            .TakeWithin(TimeSpan.FromSeconds(5))
            .RunAggregate(0, (count, _) => count + 1, this.actorSystem.Materializer());

        // Assert
        Assert.Equal(1, emitted);
    }

    /// <summary>
    /// Empty listings must not complete the stream: the source keeps polling until the token is cancelled.
    /// </summary>
    [Fact]
    public async Task TestDoesNotStopOnEmpty()
    {
        // Arrange
        var token = this.cts.Token;
        var callCount = 0;
        this.mockBlobStorageService
            .Setup(service => service.ListBlobsAsEnumerable(It.IsAny<string>()))
            .Returns(() =>
            {
                // Zero-based index of the current listing call.
                var invocation = callCount++;
                if (invocation == 0)
                {
                    return new[] { new StoredBlob { Name = "key/value/item.csv", LastModified = DateTimeOffset.UtcNow } };
                }

                // After several empty listings, end the stream through the cancellation token.
                if (invocation > 3)
                {
                    this.cts.Cancel();
                }

                return Array.Empty<StoredBlob>();
            });
        var graph = BlobStorageSource.Create(
            "container",
            "",
            this.mockBlobStorageService.Object,
            TimeSpan.FromSeconds(1));

        // Act
        var emitted = await Source.FromGraph(graph)
            .TakeWithin(TimeSpan.FromMinutes(1))
            .Via(token.AsFlow<string>(cancelGracefully: true))
            .RunAggregate(0, (count, _) => count + 1, this.actorSystem.Materializer());

        // Assert
        Assert.Equal(1, emitted);
        Assert.True(token.IsCancellationRequested);
        this.mockBlobStorageService
            .Verify(service => service.ListBlobsAsEnumerable(It.IsAny<string>()), Times.AtLeast(4));
    }

    /// <summary>
    /// Releases the actor system and the cancellation token source created for the test.
    /// </summary>
    public void Dispose()
    {
        this.actorSystem?.Dispose();
        this.cts?.Dispose();
    }
}

0 comments on commit 9797622

Please sign in to comment.