Skip to content

Commit

Permalink
Adding facade to enable session processing (#1396)
Browse files Browse the repository at this point in the history
* Adding facade to enable session processing

* fixes
  • Loading branch information
demorgi authored Sep 12, 2023
1 parent 778d8f1 commit adbe7bf
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 13 deletions.
42 changes: 30 additions & 12 deletions src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal sealed class AzureServiceBusConsumerClient : IConsumerClient

private ServiceBusAdministrationClient? _administrationClient;
private ServiceBusClient? _serviceBusClient;
private ServiceBusProcessor? _serviceBusProcessor;
private ServiceBusProcessorFacade? _serviceBusProcessor;

public AzureServiceBusConsumerClient(
ILogger logger,
Expand Down Expand Up @@ -100,6 +100,15 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
ConnectAsync().GetAwaiter().GetResult();

if (_serviceBusProcessor!.IsSessionProcessor)
{
_serviceBusProcessor!.ProcessSessionMessageAsync += _serviceBusProcessor_ProcessSessionMessageAsync;
}
else
{
_serviceBusProcessor!.ProcessMessageAsync += _serviceBusProcessor_ProcessMessageAsync;
}

_serviceBusProcessor!.ProcessMessageAsync += _serviceBusProcessor_ProcessMessageAsync;
_serviceBusProcessor.ProcessErrorAsync += _serviceBusProcessor_ProcessErrorAsync;

Expand All @@ -110,8 +119,7 @@ public void Commit(object? sender)
{
var commitInput = (AzureServiceBusConsumerCommitInput)sender!;
if (_serviceBusProcessor?.AutoCompleteMessages ?? false)
commitInput.ProcessMessageArgs.CompleteMessageAsync(commitInput.ProcessMessageArgs.Message).GetAwaiter()
.GetResult();
commitInput.CompleteMessageAsync().GetAwaiter().GetResult();
}

public void Reject(object? sender)
Expand Down Expand Up @@ -150,6 +158,13 @@ private async Task _serviceBusProcessor_ProcessMessageAsync(ProcessMessageEventA
await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg));
}

private async Task _serviceBusProcessor_ProcessSessionMessageAsync(ProcessSessionMessageEventArgs arg)
{
var context = ConvertMessage(arg.Message);

await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg));
}

public async Task ConnectAsync()
{
if (_serviceBusProcessor != null) return;
Expand Down Expand Up @@ -200,15 +215,18 @@ public async Task ConnectAsync()
}
}

_serviceBusProcessor = _serviceBusClient.CreateProcessor(_asbOptions.TopicPath, _subscriptionName,
_asbOptions.EnableSessions
? new ServiceBusProcessorOptions
{
AutoCompleteMessages = _asbOptions.AutoCompleteMessages,
MaxConcurrentCalls = _asbOptions.MaxConcurrentCalls,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(30)
}
: null);
_serviceBusProcessor = !_asbOptions.EnableSessions
? new ServiceBusProcessorFacade(
_serviceBusClient.CreateProcessor(_asbOptions.TopicPath, _subscriptionName))
: new ServiceBusProcessorFacade(
serviceBusSessionProcessor: _serviceBusClient.CreateSessionProcessor(_asbOptions.TopicPath,
_subscriptionName,
new ServiceBusSessionProcessorOptions
{
AutoCompleteMessages = _asbOptions.AutoCompleteMessages,
MaxConcurrentCallsPerSession = _asbOptions.MaxConcurrentCalls,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(30),
}));
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace DotNetCore.CAP.AzureServiceBus;
Expand All @@ -12,5 +13,20 @@ public AzureServiceBusConsumerCommitInput(ProcessMessageEventArgs processMessage
ProcessMessageArgs = processMessageEventArgs;
}

public ProcessMessageEventArgs ProcessMessageArgs { get; set; }
public AzureServiceBusConsumerCommitInput(ProcessSessionMessageEventArgs processSessionMessageArgs)
{
ProcessSessionMessageArgs = processSessionMessageArgs;
}

private ProcessMessageEventArgs? ProcessMessageArgs { get; }
private ProcessSessionMessageEventArgs? ProcessSessionMessageArgs { get; }

private ServiceBusReceivedMessage Message => ProcessMessageArgs?.Message ?? ProcessSessionMessageArgs!.Message;

public Task CompleteMessageAsync()
{
return ProcessMessageArgs != null
? ProcessMessageArgs.CompleteMessageAsync(Message)
: ProcessSessionMessageArgs!.CompleteMessageAsync(Message);
}
}
95 changes: 95 additions & 0 deletions src/DotNetCore.CAP.AzureServiceBus/ServiceBusProcessorFacade.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace DotNetCore.CAP.AzureServiceBus;

public class ServiceBusProcessorFacade : IAsyncDisposable
{
private readonly ServiceBusProcessor? _serviceBusProcessor;
private readonly ServiceBusSessionProcessor? _serviceBusSessionProcessor;

public bool IsSessionProcessor { get; }

public bool IsProcessing => IsSessionProcessor
? _serviceBusSessionProcessor!.IsProcessing
: _serviceBusProcessor!.IsProcessing;

public bool AutoCompleteMessages => IsSessionProcessor
? _serviceBusSessionProcessor!.AutoCompleteMessages
: _serviceBusProcessor!.AutoCompleteMessages;

public ServiceBusProcessorFacade(ServiceBusProcessor? serviceBusProcessor = null,
ServiceBusSessionProcessor? serviceBusSessionProcessor = null)
{
if (serviceBusProcessor is null && serviceBusSessionProcessor is null)
{
throw new ArgumentNullException(nameof(serviceBusProcessor),
"Either serviceBusProcessor or serviceBusSessionProcessor must be provided");
}

_serviceBusProcessor = serviceBusProcessor;
_serviceBusSessionProcessor = serviceBusSessionProcessor;

IsSessionProcessor = _serviceBusSessionProcessor is not null;
}

public Task StartProcessingAsync(CancellationToken cancellationToken = default)
{
return IsSessionProcessor
? _serviceBusSessionProcessor!.StartProcessingAsync(cancellationToken)
: _serviceBusProcessor!.StartProcessingAsync(cancellationToken);
}

public event Func<ProcessMessageEventArgs, Task> ProcessMessageAsync
{
add => _serviceBusProcessor!.ProcessMessageAsync += value;

remove => _serviceBusProcessor!.ProcessMessageAsync -= value;
}

public event Func<ProcessSessionMessageEventArgs, Task> ProcessSessionMessageAsync
{
add => _serviceBusSessionProcessor!.ProcessMessageAsync += value;

remove => _serviceBusSessionProcessor!.ProcessMessageAsync -= value;
}

public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync
{
add
{
if (IsSessionProcessor)
{
_serviceBusSessionProcessor!.ProcessErrorAsync += value;
}
else
{
_serviceBusProcessor!.ProcessErrorAsync += value;
}
}

remove
{
if (IsSessionProcessor)
{
_serviceBusSessionProcessor!.ProcessErrorAsync -= value;
}
else
{
_serviceBusProcessor!.ProcessErrorAsync -= value;
}
}
}


public async ValueTask DisposeAsync()
{
if (_serviceBusProcessor is not null) await _serviceBusProcessor.DisposeAsync();
if (_serviceBusSessionProcessor is not null) await _serviceBusSessionProcessor.DisposeAsync();
}
}

0 comments on commit adbe7bf

Please sign in to comment.