Skip to content

Commit

Permalink
pass apolloContext to handle method
Browse files Browse the repository at this point in the history
  • Loading branch information
rickdotnet committed Oct 15, 2024
1 parent c1738cb commit 9cf321a
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 18 deletions.
4 changes: 2 additions & 2 deletions build/version.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<VersionMajor>0</VersionMajor>
<VersionMinor>4</VersionMinor>
<VersionPatch>4</VersionPatch>
<VersionMinor>5</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality />
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)$(VersionQuality)</VersionPrefix>
</PropertyGroup>
Expand Down
7 changes: 3 additions & 4 deletions demo/ConsoleDemo/Demo/AsbDemo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,14 @@ private static Task Handle(ApolloContext context, CancellationToken token)
{
Log.Warning($"Anonymous handler received: {count++}");
Log.Debug("Headers");
var msg = context.Message;
foreach (var header in msg.Headers)
foreach (var header in context.Headers)
{
Log.Debug("{Key}: {Value}", header.Key, header.Value);
}

Log.Debug("Payload");
if (msg.Data != null)
Log.Debug(Encoding.UTF8.GetString(msg.Data));
if (context.Data != null)
Log.Debug(Encoding.UTF8.GetString(context.Data));


// let me see some messages before they spam through
Expand Down
2 changes: 1 addition & 1 deletion demo/ConsoleDemo/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@

//await OriginalDemo.Demo();
//await Direct.Demo();
await HostDemo.Demo(useNats: true);
await HostDemo.Demo();
//await AsbDemo.Demo();
9 changes: 5 additions & 4 deletions demo/ConsoleDemo/TestEndpoint.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Apollo.Abstractions;
using Apollo;
using Apollo.Abstractions;
using Serilog;

namespace ConsoleDemo;
Expand All @@ -11,22 +12,22 @@ public record TestResponse(string Message);
public class TestEndpoint : IListenFor<TestEvent>, IHandle<TestCommand>, IReplyTo<TestRequest, TestResponse>
{
private static int count = 0;
public Task Handle(TestEvent message, CancellationToken cancellationToken = default)
public Task Handle(TestEvent message, ApolloContext context, CancellationToken cancellationToken = default)
{
count++;
Log.Information("Endpoint: {Message}, Count: {Count}", message, count);
// simulate a delay to demonstrate concurrency
return Task.Delay(500);
}

public Task Handle(TestCommand message, CancellationToken cancellationToken)
public Task Handle(TestCommand message, ApolloContext context, CancellationToken cancellationToken)
{
Log.Information("TestEndpoint Received TestCommand");
Log.Information("Message: {Message}", message);
return Task.CompletedTask;
}

public Task<TestResponse> Handle(TestRequest message, CancellationToken cancellationToken = default)
public Task<TestResponse> Handle(TestRequest message, ApolloContext context, CancellationToken cancellationToken = default)
{
Log.Information("TestEndpoint Received TestRequest");
Log.Information("Message: {Message}", message);
Expand Down
2 changes: 1 addition & 1 deletion src/Apollo/Abstractions/IHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public interface IHandle
}
public interface IHandle<in T> : IHandle where T : ICommand
{
Task Handle(T message, CancellationToken cancellationToken);
Task Handle(T message, ApolloContext context, CancellationToken cancellationToken);
}
2 changes: 1 addition & 1 deletion src/Apollo/Abstractions/IListenFor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public interface IListenFor
}
public interface IListenFor<in TEvent> : IListenFor where TEvent : IEvent
{
public Task Handle(TEvent message, CancellationToken cancellationToken = default);
public Task Handle(TEvent message, ApolloContext context, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/Apollo/Abstractions/IReplyTo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

public interface IReplyTo<in TRequest, TResponse> where TRequest : IRequest<TResponse>
{
public Task<TResponse> Handle(TRequest message, CancellationToken cancellationToken = default);
public Task<TResponse> Handle(TRequest message, ApolloContext context, CancellationToken cancellationToken = default);
}
7 changes: 6 additions & 1 deletion src/Apollo/ApolloContext.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
using Apollo.Abstractions;
using Microsoft.Extensions.Primitives;

namespace Apollo;

public class ApolloContext
{
public ApolloMessage Message { get; }
public IReadOnlyDictionary<string, StringValues> Headers { get; }
public string Subject => Message.Subject;
public byte[]? Data => Message.Data;
internal ApolloMessage Message { get; }
public bool ReplyAvailable => ReplyFunc is not null;
private Func<byte[] , CancellationToken, Task>? ReplyFunc { get; }

public ApolloContext(ApolloMessage message, Func<byte[] , CancellationToken, Task>? replyFunc = null)
{
Message = message;
ReplyFunc = replyFunc;
Headers = Message.Headers.AsReadOnly();
}

public Task Reply(byte[] response, CancellationToken cancellationToken)
Expand Down
6 changes: 3 additions & 3 deletions src/Apollo/Internal/SynchronousEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private async Task InternalHandle(ApolloContext context, CancellationToken cance
if (handleMethod is null)
{
handleMethod = endpointType!.GetMethod("Handle",
[context.Message.MessageType!, typeof(CancellationToken)]);
[context.Message.MessageType!, typeof(ApolloContext), typeof(CancellationToken)]);

if (handleMethod is null)
throw new InvalidOperationException("Handle method not found");
Expand All @@ -130,7 +130,7 @@ private async Task InternalHandle(ApolloContext context, CancellationToken cance
throw new InvalidOperationException("Uh, oh: No reply available");

var response =
await (dynamic)handleMethod.Invoke(endpointInstance, [messageObject, cancellationToken])!;
await (dynamic)handleMethod.Invoke(endpointInstance, [messageObject, context, cancellationToken])!;

// TODO: serialization point
var responseJson = JsonSerializer.Serialize(response);
Expand All @@ -139,7 +139,7 @@ private async Task InternalHandle(ApolloContext context, CancellationToken cance
}
else
{
var result = (Task)handleMethod.Invoke(endpointInstance, [messageObject, cancellationToken])!;
var result = (Task)handleMethod.Invoke(endpointInstance, [messageObject, context, cancellationToken])!;
await result;
}
}
Expand Down

0 comments on commit 9cf321a

Please sign in to comment.