Skip to content

Commit

Permalink
rework subjects
Browse files Browse the repository at this point in the history
  • Loading branch information
rickdotnet committed Sep 14, 2024
1 parent 5f18e94 commit b76e9ec
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 150 deletions.
2 changes: 1 addition & 1 deletion demo/ConsoleDemo/Demo/AsbDemo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static async ValueTask Demo()
{
var anonConfig = new EndpointConfig
{
EndpointSubject = "topic-test", // topic name
Subject = "topic-test", // topic name
ConsumerName = "Sandbox.Test", // subscription name
EndpointName = "Topic Test", // display only when subject is sent
};
Expand Down
46 changes: 23 additions & 23 deletions demo/ConsoleDemo/Demo/HostDemo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,43 @@ public static class HostDemo
public static async Task Demo(bool useNats = false)
{
var endpointConfig = new EndpointConfig { ConsumerName = "endpoint", EndpointName = "Demo" };
var anonConfig = new EndpointConfig { ConsumerName = "anon", EndpointSubject = "demo.testevent" };
var anonConfig = new EndpointConfig { ConsumerName = "anon", Subject = "demo" };

int count = 1; // thread-safe when in sync mode
var builder = Host.CreateApplicationBuilder();
builder.Services
.AddApollo(
apolloBuilder =>
{
apolloBuilder
.AddEndpoint<TestEndpoint>(endpointConfig)
.AddHandler(anonConfig, (_, _) =>
builder.Services.AddApollo(
ab =>
{
ab
.AddEndpoint<TestEndpoint>(endpointConfig)
.AddHandler(anonConfig, (_, _) =>
{
Console.WriteLine($"Anonymous handler received: {count++}");
return Task.CompletedTask;
});
}
);
if (useNats)
{
apolloBuilder.AddNatsProvider(
opts => opts with
if (useNats)
{
ab.AddNatsProvider(
opts => opts with
{
Url = "nats://localhost:4222",
AuthOpts = new NatsAuthOpts
{
Url = "nats://localhost:4222",
AuthOpts = new NatsAuthOpts
{
Username = "apollo",
Password = "demo"
}
Username = "apollo",
Password = "demo"
}
);
}
}
);
}
);
}
);

var host = builder.Build();
var hostTask = host.RunAsync();

await Task.Delay(8000);
await Task.Delay(3000);
using var scope = host.Services.CreateScope();
var serviceProvider = scope.ServiceProvider;
var apollo = serviceProvider.GetRequiredService<ApolloClient>();
Expand Down
2 changes: 1 addition & 1 deletion demo/ConsoleDemo/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@

//await OriginalDemo.Demo();
//await Direct.Demo();
await HostDemo.Demo(true);
await HostDemo.Demo(useNats: true);
//await AsbDemo.Demo();
25 changes: 17 additions & 8 deletions src/Apollo.Providers.NATS/NatsCoreSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ internal class NatsCoreSubscription : ISubscription
private readonly Func<ApolloContext, CancellationToken, Task> handler;
private readonly string endpointSubject;
private readonly Dictionary<string, Type> subjectTypeMapping;
private readonly DefaultSubjectTypeMapper subjectTypeMapper;

public NatsCoreSubscription(
INatsConnection connection,
Expand All @@ -26,24 +27,28 @@ Func<ApolloContext, CancellationToken, Task> handler
this.config = config;
this.handler = handler;

endpointSubject = Utils.GetSubject(config);

var trimmedSubject = endpointSubject.TrimWildEnds();
subjectTypeMapping = config.MessageTypes.ToDictionary(x => $"{trimmedSubject}.{x.Name.ToLower()}", x => x);
subjectTypeMapper = DefaultSubjectTypeMapper.From(config);
endpointSubject = subjectTypeMapper.EndpointSubject;
subjectTypeMapping = subjectTypeMapper.SubjectTypeMapping;
}

public async Task Subscribe(CancellationToken cancellationToken)
{
try
{
logger.LogInformation("Subscribing to {Subject}", endpointSubject);
logger.LogInformation("Subscribing to {Endpoint} - {Subject}", config.EndpointName, endpointSubject);
await foreach (var msg in connection.SubscribeAsync<byte[]>(endpointSubject)
.WithCancellation(cancellationToken))
{
var handlerOnly = config.EndpointType == null;
try
{
if (handlerOnly || subjectTypeMapping.ContainsKey(msg.Subject))
// type mapping is for endpoint types only
var subjectMapping = "";
if (msg.Headers != null && msg.Headers.TryGetValue(ApolloHeader.MessageType, out var apolloType))
subjectMapping = apolloType.First() ?? "";

if (handlerOnly || subjectTypeMapping.ContainsKey(subjectMapping))
await ProcessMessage(msg);
else
logger.LogWarning(
Expand Down Expand Up @@ -78,8 +83,12 @@ Task ProcessMessage(NatsMsg<byte[]> natsMsg)
Data = natsMsg.Data,
};

subjectTypeMapping.TryGetValue(message.Subject, out var messageType);
message.MessageType = messageType; // ?? typeof(byte[]);
if (message.Headers.TryGetValue(ApolloHeader.MessageType, out var headerType)
&& headerType.Count > 0)
{
message.MessageType =
subjectTypeMapper.TypeFromApolloMessageType(headerType.First()!); // ?? typeof(byte[]);
}

var replyFunc = natsMsg.ReplyTo != null
? new Func<byte[], CancellationToken, Task>(
Expand Down
25 changes: 17 additions & 8 deletions src/Apollo.Providers.NATS/NatsJetStreamSubscription.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Apollo.Abstractions;
using Apollo.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
Expand All @@ -13,6 +14,7 @@ internal class NatsJetStreamSubscription : ISubscription
private readonly ILogger<NatsJetStreamSubscription> logger;
private readonly SubscriptionConfig config;
private readonly Func<ApolloContext, CancellationToken, Task> handler;
private readonly DefaultSubjectTypeMapper subjectTypeMapper;
private readonly string endpointSubject;
private readonly Dictionary<string, Type> subjectTypeMapping;

Expand All @@ -28,10 +30,9 @@ Func<ApolloContext, CancellationToken, Task> handler
this.config = config;
this.handler = handler;

endpointSubject = Utils.GetSubject(config);

var trimmedSubject = endpointSubject.TrimWildEnds();
subjectTypeMapping = config.MessageTypes.ToDictionary(x => $"{trimmedSubject}.{x.Name.ToLower()}", x => x);
subjectTypeMapper = DefaultSubjectTypeMapper.From(config);
endpointSubject = subjectTypeMapper.EndpointSubject;
subjectTypeMapping = subjectTypeMapper.SubjectTypeMapping;
}

public async Task Subscribe(CancellationToken cancellationToken)
Expand Down Expand Up @@ -66,7 +67,11 @@ await js.CreateStreamAsync(
var handlerOnly = config.EndpointType == null;
try
{
if (handlerOnly || subjectTypeMapping.ContainsKey(msg.Subject))
var subjectMapping = "";
if (msg.Headers != null && msg.Headers.TryGetValue(ApolloHeader.MessageType, out var apolloType))
subjectMapping = apolloType.First() ?? "";

if (handlerOnly || subjectTypeMapping.ContainsKey(subjectMapping))
{
await ProcessMessage(msg);
await msg.AckAsync(cancellationToken: cancellationToken);
Expand All @@ -75,7 +80,7 @@ await js.CreateStreamAsync(
{
logger.LogWarning(
"No handler found for {Subject} in endpoint ({Endpoint})",
msg.Subject,
subjectMapping,
config.EndpointName);

await msg.AckTerminateAsync(cancellationToken: cancellationToken);
Expand Down Expand Up @@ -107,8 +112,12 @@ Task ProcessMessage(NatsJSMsg<byte[]> natsMsg)
Headers = natsMsg.Headers ?? new NatsHeaders(),
Data = natsMsg.Data,
};

subjectTypeMapping.TryGetValue(message.Subject, out var messageType);

var subjectMapping = "";
if (natsMsg.Headers != null && natsMsg.Headers.TryGetValue(ApolloHeader.MessageType, out var apolloType))
subjectMapping = apolloType.First() ?? "";

subjectTypeMapping.TryGetValue(subjectMapping, out var messageType);
message.MessageType = messageType ?? typeof(byte[]);

var replyFunc = natsMsg.ReplyTo != null
Expand Down
32 changes: 21 additions & 11 deletions src/Apollo.Providers.NATS/NatsPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Apollo.Abstractions;
using Apollo.Configuration;
using Microsoft.Extensions.Primitives;
using NATS.Client.Core;

namespace Apollo.Providers.NATS;
Expand All @@ -12,22 +13,31 @@ public NatsPublisher(INatsConnection connection)
{
this.connection = connection;
}

public Task Publish(PublishConfig publishConfig, ApolloMessage message, CancellationToken cancellationToken)
{
var subject = Utils.GetSubject(publishConfig).TrimEnd('>').TrimEnd('*').TrimEnd('.');
if(message.MessageType != null)
subject = $"{subject}.{message.MessageType.Name.ToLower()}";

return connection.PublishAsync($"{subject}", message.Data, cancellationToken: cancellationToken).AsTask();
var subject = DefaultSubjectTypeMapper.From(publishConfig).EndpointSubject;

return connection.PublishAsync(
$"{subject}",
message.Data,
headers: new NatsHeaders((Dictionary<string, StringValues>)message.Headers),
cancellationToken: cancellationToken).AsTask();
}

public async Task<byte[]> Request(PublishConfig publishConfig, ApolloMessage message, CancellationToken cancellationToken)
public async Task<byte[]> Request(
PublishConfig publishConfig,
ApolloMessage message,
CancellationToken cancellationToken)
{
var subject = Utils.GetSubject(publishConfig).TrimEnd('>').TrimEnd('*').TrimEnd('.');
if(message.MessageType != null)
subject = $"{subject}.{message.MessageType.Name.ToLower()}";

var response = await connection.RequestAsync<byte[], byte[]>($"{subject}", message.Data, cancellationToken: cancellationToken).AsTask();
var subject = DefaultSubjectTypeMapper.From(publishConfig).EndpointSubject;

var response = await connection
.RequestAsync<byte[], byte[]>(
$"{subject}",
message.Data,
headers: new NatsHeaders((Dictionary<string, StringValues>)message.Headers),
cancellationToken: cancellationToken).AsTask();
return response.Data!;
}
}
2 changes: 1 addition & 1 deletion src/Apollo/Abstractions/ApolloMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Apollo.Abstractions;

public record ApolloMessage
public sealed record ApolloMessage
{
public string Subject { get; set; } = string.Empty;
public IDictionary<string, StringValues> Headers { get; set; } = new Dictionary<string, StringValues>();
Expand Down
10 changes: 10 additions & 0 deletions src/Apollo/ApolloHeader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Apollo;

public static class ApolloHeader
{
public const string MessageType = "apollo-message-type";
public const string MessageClrType = "apollo-message-clr-type";
public const string MessageAction = "apollo-message-action";
public const string ResponseType = "apollo-response-type";
public const string ResponseClrType = "apollo-response-clr-type";
}
45 changes: 38 additions & 7 deletions src/Apollo/Configuration/EndpointConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,44 @@ namespace Apollo.Configuration;

public record EndpointConfig
{
public string? InstanceId { get; internal set; } // set by apollo
public string? ConsumerName { get; set;} // set to ApolloConfig.DefaultConsumerName if not provided
public string? Namespace { get; set; } // set to ApolloConfig.DefaultNamespace if not provided; always prefixes the subject if provided
internal Type? EndpointType { get; set; } // set by ApolloClient
public string? EndpointName { get; init; } // endpoint name or subject must be provided
public string? EndpointSubject { get; set; } // endpoint name or subject must be provided
/// <summary>
/// Defaults to ApolloConfig.InstanceId
/// </summary>
public string? InstanceId { get; internal set; }

/// <summary>
/// Consumer name is required for durable subscriptions and load balancing
/// </summary>
public string? ConsumerName { get; set;}

/// <summary>
/// Optional namespace for isolation. Defaults to ApolloConfig.DefaultNamespace
/// </summary>
public string? Namespace { get; set; }

/// <summary>
/// Set by ApolloClient
/// </summary>
internal Type? EndpointType { get; set; }

/// <summary>
/// Used to create a unique endpoint name if no subject is provided
/// </summary>
public string? EndpointName { get; init; }

/// <summary>
/// The subject to use for the endpoint. If not provided, the endpoint name will be slugified. If neither are provided, the namespace will be used.
/// </summary>
public string? Subject { get; set; } // endpoint name or subject must be provided

/// <summary>
/// Indicates to the subscription provider that the endpoint should be created as a durable subscription
/// </summary>
public bool IsDurable { get; set;}

/// <summary>
/// Provides create/update/delete permissions for resources
/// </summary>
public bool CreateMissingResources { get; set; }

/// <summary>
Expand All @@ -37,7 +68,7 @@ public static PublishConfig ToPublishConfig(this EndpointConfig config)
{
Namespace = config.Namespace,
EndpointName = config.EndpointName,
EndpointSubject = config.EndpointSubject
EndpointSubject = config.Subject
};
}
}
2 changes: 1 addition & 1 deletion src/Apollo/Configuration/SubscriptionConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static SubscriptionConfig ForEndpoint(EndpointConfig endpointConfig, Type
ConsumerName = endpointConfig.ConsumerName ?? "", // TODO: revist this
Namespace = endpointConfig.Namespace,
EndpointName = endpointConfig.EndpointName,
EndpointSubject = endpointConfig.EndpointSubject,
EndpointSubject = endpointConfig.Subject,
IsDurable = endpointConfig.IsDurable,
CreateMissingResources = endpointConfig.CreateMissingResources,
EndpointType = endpointType,
Expand Down
Loading

0 comments on commit b76e9ec

Please sign in to comment.