From 1d87dd0e407dc8586206931d67375e94749f0827 Mon Sep 17 00:00:00 2001 From: Vitalii Savitskii Date: Thu, 25 Apr 2024 15:12:05 +0200 Subject: [PATCH] Add StreamStatusService class and IStreamGraphBuilder interface --- src/Contracts/Annotations.cs | 22 +++++++++++++++++++++ src/Services/Base/IStreamGraphBuilder.cs | 20 +++++++++++++++++++ src/Services/Base/IStreamStatusService.cs | 17 ++++++++++++++++ src/Services/StreamStatusService.cs | 24 +++++++++++++++++++++++ 4 files changed, 83 insertions(+) create mode 100644 src/Contracts/Annotations.cs create mode 100644 src/Services/Base/IStreamGraphBuilder.cs create mode 100644 src/Services/Base/IStreamStatusService.cs create mode 100644 src/Services/StreamStatusService.cs diff --git a/src/Contracts/Annotations.cs b/src/Contracts/Annotations.cs new file mode 100644 index 0000000..c14a2b0 --- /dev/null +++ b/src/Contracts/Annotations.cs @@ -0,0 +1,22 @@ +namespace Arcane.Framework.Contracts; + +/// +/// Streaming job annotation keys that can be used by a StreamRunner and a Stream Operator +/// This class should be removed in future and replaced with Stream-class based contracts, see #23 +/// +public class Annotations +{ + /// + /// Annotation that signals to StreamOperator to start backfill process + /// If the streaming job was marked with this annotation, StreamOperator stops the current stream and starts + /// a backfill job. + /// + public const string STATE_ANNOTATION_KEY = "arcane.streaming.sneaksanddata.com/state"; + + /// + /// Annotation value signals to StreamOperator to start backfill process + /// If the streaming job was marked with this annotation, StreamOperator stops the current stream and starts + /// a backfill job. + /// + public const string SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE = "schema-mismatch"; +} diff --git a/src/Services/Base/IStreamGraphBuilder.cs b/src/Services/Base/IStreamGraphBuilder.cs new file mode 100644 index 0000000..2f25c34 --- /dev/null +++ b/src/Services/Base/IStreamGraphBuilder.cs @@ -0,0 +1,20 @@ +using System.Threading.Tasks; +using Akka.Streams; +using Akka.Streams.Dsl; + +namespace Arcane.Framework.Services.Base; + +/// +/// Base interface for classes that build Akka.NET stream graphs. +/// This interface is intended to be implemented by the streaming plugin. +/// +/// Type of the StreamContext class +public interface IStreamGraphBuilder where TStreamContext : IStreamContext +{ + /// + /// Build an instance of the runnable Akka.NET graph. + /// + /// StreamContext instance. + /// Runnable graph that materialized to a UniqueKillSwitch and a Task. + public IRunnableGraph<(UniqueKillSwitch, Task)> BuildGraph(TStreamContext context); +} diff --git a/src/Services/Base/IStreamStatusService.cs b/src/Services/Base/IStreamStatusService.cs new file mode 100644 index 0000000..50cb5ff --- /dev/null +++ b/src/Services/Base/IStreamStatusService.cs @@ -0,0 +1,17 @@ +using System.Threading.Tasks; + +namespace Arcane.Framework.Services.Base; + +/// +/// A service that allows to report stream status events to the maintainer service. +/// It can be useful to report a stream status that triggers the maintainer service to take action (schema mismatch, etc.) +/// +public interface IStreamStatusService +{ + /// + /// Report that schema mismatch occured to the maintainer service. + /// + /// Id of the current stream. + Task ReportSchemaMismatch(string streamId); +} + diff --git a/src/Services/StreamStatusService.cs b/src/Services/StreamStatusService.cs new file mode 100644 index 0000000..a244784 --- /dev/null +++ b/src/Services/StreamStatusService.cs @@ -0,0 +1,24 @@ +using System.Threading.Tasks; +using Arcane.Framework.Contracts; +using Arcane.Framework.Services.Base; +using Snd.Sdk.Kubernetes.Base; + +namespace Arcane.Framework.Services; + +internal class StreamStatusService: IStreamStatusService +{ + private readonly IKubeCluster kubernetesService; + public StreamStatusService(IKubeCluster kubernetesService) + { + this.kubernetesService = kubernetesService; + } + + public Task ReportSchemaMismatch(string streamId) + { + var nameSpace = this.kubernetesService.GetCurrentNamespace(); + return this.kubernetesService.AnnotateJob(streamId, + nameSpace, + Annotations.STATE_ANNOTATION_KEY, + Annotations.SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE); + } +}