Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamStatusService class and IStreamGraphBuilder interface #24

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/Contracts/Annotations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace Arcane.Framework.Contracts;

/// <summary>
/// Streaming job annotation keys that can be used by a StreamRunner and a Stream Operator
/// This class should be removed in future and replaced with Stream-class based contracts, see #23
/// </summary>
public class Annotations
{
/// <summary>
/// Annotation that signals to StreamOperator to start backfill process
/// If the streaming job was marked with this annotation, StreamOperator stops the current stream and starts
/// a backfill job.
/// </summary>
public const string STATE_ANNOTATION_KEY = "arcane.streaming.sneaksanddata.com/state";

/// <summary>
/// Annotation value signals to StreamOperator to start backfill process
/// If the streaming job was marked with this annotation, StreamOperator stops the current stream and starts
/// a backfill job.
/// </summary>
public const string SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE = "schema-mismatch";
}
20 changes: 20 additions & 0 deletions src/Services/Base/IStreamGraphBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;

namespace Arcane.Framework.Services.Base;

/// <summary>
/// Base interface for classes that build Akka.NET stream graphs.
/// This interface is intended to be implemented by the streaming plugin.
/// </summary>
/// <typeparam name="TStreamContext">Type of the StreamContext class</typeparam>
public interface IStreamGraphBuilder<in TStreamContext> where TStreamContext : IStreamContext
{
/// <summary>
/// Build an instance of the runnable Akka.NET graph.
/// </summary>
/// <param name="context">StreamContext instance.</param>
/// <returns>Runnable graph that materialized to a UniqueKillSwitch and a Task.</returns>
public IRunnableGraph<(UniqueKillSwitch, Task)> BuildGraph(TStreamContext context);
}
17 changes: 17 additions & 0 deletions src/Services/Base/IStreamStatusService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Threading.Tasks;

namespace Arcane.Framework.Services.Base;

/// <summary>
/// A service that allows to report stream status events to the maintainer service.
/// It can be useful to report a stream status that triggers the maintainer service to take action (schema mismatch, etc.)
/// </summary>
public interface IStreamStatusService
{
/// <summary>
/// Report that schema mismatch occured to the maintainer service.
/// </summary>
/// <param name="streamId">Id of the current stream.</param>
Task ReportSchemaMismatch(string streamId);
}

24 changes: 24 additions & 0 deletions src/Services/StreamStatusService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Threading.Tasks;
using Arcane.Framework.Contracts;
using Arcane.Framework.Services.Base;
using Snd.Sdk.Kubernetes.Base;

namespace Arcane.Framework.Services;

internal class StreamStatusService: IStreamStatusService
{
private readonly IKubeCluster kubernetesService;
public StreamStatusService(IKubeCluster kubernetesService)
{
this.kubernetesService = kubernetesService;
}

public Task ReportSchemaMismatch(string streamId)
{
var nameSpace = this.kubernetesService.GetCurrentNamespace();
return this.kubernetesService.AnnotateJob(streamId,
nameSpace,
Annotations.STATE_ANNOTATION_KEY,
Annotations.SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE);
}
}
Loading