From adbe7bff5c4fbbd3a6277eae672e29a6ab136f3e Mon Sep 17 00:00:00 2001 From: Dmytro Rakhmanov Date: Tue, 12 Sep 2023 05:34:11 +0300 Subject: [PATCH] Adding facade to enable session processing (#1396) * Adding facade to enable session processing * fixes --- .../AzureServiceBusConsumerClient.cs | 42 +++++--- .../AzureServiceBusConsumerCommitInput.cs | 18 +++- .../ServiceBusProcessorFacade.cs | 95 +++++++++++++++++++ 3 files changed, 142 insertions(+), 13 deletions(-) create mode 100644 src/DotNetCore.CAP.AzureServiceBus/ServiceBusProcessorFacade.cs diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index 9c15f05c5..de5a07c86 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -26,7 +26,7 @@ internal sealed class AzureServiceBusConsumerClient : IConsumerClient private ServiceBusAdministrationClient? _administrationClient; private ServiceBusClient? _serviceBusClient; - private ServiceBusProcessor? _serviceBusProcessor; + private ServiceBusProcessorFacade? _serviceBusProcessor; public AzureServiceBusConsumerClient( ILogger logger, @@ -100,6 +100,15 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { ConnectAsync().GetAwaiter().GetResult(); + if (_serviceBusProcessor!.IsSessionProcessor) + { + _serviceBusProcessor!.ProcessSessionMessageAsync += _serviceBusProcessor_ProcessSessionMessageAsync; + } + else + { + _serviceBusProcessor!.ProcessMessageAsync += _serviceBusProcessor_ProcessMessageAsync; + } + _serviceBusProcessor!.ProcessMessageAsync += _serviceBusProcessor_ProcessMessageAsync; _serviceBusProcessor.ProcessErrorAsync += _serviceBusProcessor_ProcessErrorAsync; @@ -110,8 +119,7 @@ public void Commit(object? sender) { var commitInput = (AzureServiceBusConsumerCommitInput)sender!; if (_serviceBusProcessor?.AutoCompleteMessages ?? false) - commitInput.ProcessMessageArgs.CompleteMessageAsync(commitInput.ProcessMessageArgs.Message).GetAwaiter() - .GetResult(); + commitInput.CompleteMessageAsync().GetAwaiter().GetResult(); } public void Reject(object? sender) @@ -150,6 +158,13 @@ private async Task _serviceBusProcessor_ProcessMessageAsync(ProcessMessageEventA await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg)); } + private async Task _serviceBusProcessor_ProcessSessionMessageAsync(ProcessSessionMessageEventArgs arg) + { + var context = ConvertMessage(arg.Message); + + await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg)); + } + public async Task ConnectAsync() { if (_serviceBusProcessor != null) return; @@ -200,15 +215,18 @@ public async Task ConnectAsync() } } - _serviceBusProcessor = _serviceBusClient.CreateProcessor(_asbOptions.TopicPath, _subscriptionName, - _asbOptions.EnableSessions - ? new ServiceBusProcessorOptions - { - AutoCompleteMessages = _asbOptions.AutoCompleteMessages, - MaxConcurrentCalls = _asbOptions.MaxConcurrentCalls, - MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(30) - } - : null); + _serviceBusProcessor = !_asbOptions.EnableSessions + ? new ServiceBusProcessorFacade( + _serviceBusClient.CreateProcessor(_asbOptions.TopicPath, _subscriptionName)) + : new ServiceBusProcessorFacade( + serviceBusSessionProcessor: _serviceBusClient.CreateSessionProcessor(_asbOptions.TopicPath, + _subscriptionName, + new ServiceBusSessionProcessorOptions + { + AutoCompleteMessages = _asbOptions.AutoCompleteMessages, + MaxConcurrentCallsPerSession = _asbOptions.MaxConcurrentCalls, + MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(30), + })); } } finally diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs index 1af67bab8..af20a2f3b 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerCommitInput.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace DotNetCore.CAP.AzureServiceBus; @@ -12,5 +13,20 @@ public AzureServiceBusConsumerCommitInput(ProcessMessageEventArgs processMessage ProcessMessageArgs = processMessageEventArgs; } - public ProcessMessageEventArgs ProcessMessageArgs { get; set; } + public AzureServiceBusConsumerCommitInput(ProcessSessionMessageEventArgs processSessionMessageArgs) + { + ProcessSessionMessageArgs = processSessionMessageArgs; + } + + private ProcessMessageEventArgs? ProcessMessageArgs { get; } + private ProcessSessionMessageEventArgs? ProcessSessionMessageArgs { get; } + + private ServiceBusReceivedMessage Message => ProcessMessageArgs?.Message ?? ProcessSessionMessageArgs!.Message; + + public Task CompleteMessageAsync() + { + return ProcessMessageArgs != null + ? ProcessMessageArgs.CompleteMessageAsync(Message) + : ProcessSessionMessageArgs!.CompleteMessageAsync(Message); + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/ServiceBusProcessorFacade.cs b/src/DotNetCore.CAP.AzureServiceBus/ServiceBusProcessorFacade.cs new file mode 100644 index 000000000..bcdee8571 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/ServiceBusProcessorFacade.cs @@ -0,0 +1,95 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace DotNetCore.CAP.AzureServiceBus; + +public class ServiceBusProcessorFacade : IAsyncDisposable +{ + private readonly ServiceBusProcessor? _serviceBusProcessor; + private readonly ServiceBusSessionProcessor? _serviceBusSessionProcessor; + + public bool IsSessionProcessor { get; } + + public bool IsProcessing => IsSessionProcessor + ? _serviceBusSessionProcessor!.IsProcessing + : _serviceBusProcessor!.IsProcessing; + + public bool AutoCompleteMessages => IsSessionProcessor + ? _serviceBusSessionProcessor!.AutoCompleteMessages + : _serviceBusProcessor!.AutoCompleteMessages; + + public ServiceBusProcessorFacade(ServiceBusProcessor? serviceBusProcessor = null, + ServiceBusSessionProcessor? serviceBusSessionProcessor = null) + { + if (serviceBusProcessor is null && serviceBusSessionProcessor is null) + { + throw new ArgumentNullException(nameof(serviceBusProcessor), + "Either serviceBusProcessor or serviceBusSessionProcessor must be provided"); + } + + _serviceBusProcessor = serviceBusProcessor; + _serviceBusSessionProcessor = serviceBusSessionProcessor; + + IsSessionProcessor = _serviceBusSessionProcessor is not null; + } + + public Task StartProcessingAsync(CancellationToken cancellationToken = default) + { + return IsSessionProcessor + ? _serviceBusSessionProcessor!.StartProcessingAsync(cancellationToken) + : _serviceBusProcessor!.StartProcessingAsync(cancellationToken); + } + + public event Func ProcessMessageAsync + { + add => _serviceBusProcessor!.ProcessMessageAsync += value; + + remove => _serviceBusProcessor!.ProcessMessageAsync -= value; + } + + public event Func ProcessSessionMessageAsync + { + add => _serviceBusSessionProcessor!.ProcessMessageAsync += value; + + remove => _serviceBusSessionProcessor!.ProcessMessageAsync -= value; + } + + public event Func ProcessErrorAsync + { + add + { + if (IsSessionProcessor) + { + _serviceBusSessionProcessor!.ProcessErrorAsync += value; + } + else + { + _serviceBusProcessor!.ProcessErrorAsync += value; + } + } + + remove + { + if (IsSessionProcessor) + { + _serviceBusSessionProcessor!.ProcessErrorAsync -= value; + } + else + { + _serviceBusProcessor!.ProcessErrorAsync -= value; + } + } + } + + + public async ValueTask DisposeAsync() + { + if (_serviceBusProcessor is not null) await _serviceBusProcessor.DisposeAsync(); + if (_serviceBusSessionProcessor is not null) await _serviceBusSessionProcessor.DisposeAsync(); + } +} \ No newline at end of file