Skip to content

Commit

Permalink
fix: BrokerAddress property in AzureServiceBus transport (#1576)
Browse files Browse the repository at this point in the history
* fixed getting the broker address from service bus

* fixed namespace extraction
  • Loading branch information
mviegas authored Aug 23, 2024
1 parent 1c765af commit 225a513
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using DotNetCore.CAP.AzureServiceBus.Helpers;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -48,7 +49,7 @@ public AzureServiceBusConsumerClient(

public Action<LogMessageEventArgs>? OnLogCallback { get; set; }

public BrokerAddress BrokerAddress => new("AzureServiceBus", _asbOptions.ConnectionString);
public BrokerAddress BrokerAddress => ServiceBusHelpers.GetBrokerAddress(_asbOptions.ConnectionString, _asbOptions.Namespace);

public void Subscribe(IEnumerable<string> topics)
{
Expand Down
54 changes: 54 additions & 0 deletions src/DotNetCore.CAP.AzureServiceBus/Helpers/ServiceBusHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using DotNetCore.CAP.Transport;

namespace DotNetCore.CAP.AzureServiceBus.Helpers;

public static class ServiceBusHelpers
{
public static BrokerAddress GetBrokerAddress(string? connectionString, string? @namespace)
{
var host = (@namespace, connectionString) switch
{
_ when string.IsNullOrWhiteSpace(@namespace) && string.IsNullOrWhiteSpace(connectionString)
=> throw new ArgumentException("Either connection string or namespace are required."),
_ when string.IsNullOrWhiteSpace(connectionString)
|| (!string.IsNullOrWhiteSpace(@namespace) && !string.IsNullOrWhiteSpace(connectionString))
=> @namespace!,
_ when string.IsNullOrWhiteSpace(@namespace)
=> TryGetEndpointFromConnectionString(connectionString, out var extractedValue)
? extractedValue!
: throw new InvalidOperationException("Unable to extract namespace from connection string.")
};

return new BrokerAddress("AzureServiceBus", host);
}


private static bool TryGetEndpointFromConnectionString(string? connectionString, out string? @namespace)
{
@namespace = string.Empty;

if (string.IsNullOrWhiteSpace(connectionString))
return false;

var keyValuePairs = connectionString.Split(';');

foreach (var kvp in keyValuePairs)
{
if (!kvp.StartsWith("Endpoint", StringComparison.InvariantCultureIgnoreCase)) continue;

var endpointParts = kvp.Split('=');

if (endpointParts.Length != 2) continue;

var uri = new Uri(endpointParts[1]);

// Namespace is the host part without the .servicebus.windows.net
@namespace = uri.ToString();

return true;
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using DotNetCore.CAP.AzureServiceBus.Helpers;
using DotNetCore.CAP.AzureServiceBus.Producer;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
Expand Down Expand Up @@ -51,7 +52,7 @@ public IServiceBusProducerDescriptor CreateProducerForMessage(TransportMessage t
_asbOptions.Value.TopicPath);
}

public BrokerAddress BrokerAddress => new("AzureServiceBus", _asbOptions.Value.ConnectionString);
public BrokerAddress BrokerAddress => ServiceBusHelpers.GetBrokerAddress(_asbOptions.Value.ConnectionString, _asbOptions.Value.Namespace);

public async Task<OperateResult> SendAsync(TransportMessage transportMessage)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using DotNetCore.CAP.AzureServiceBus.Helpers;
using Xunit;

namespace DotNetCore.CAP.AzureServiceBus.Test.Helpers;

public class ServiceBusHelpersTests
{
[Fact]
public void GetBrokerAddress_ShouldThrowArgumentException_WhenBothInputsAreNull()
{
// Arrange
string? connectionString = null;
string? @namespace = null;

// Act & Assert
var ex = Assert.Throws<ArgumentException>(() => ServiceBusHelpers.GetBrokerAddress(connectionString, @namespace));
Assert.Equal("Either connection string or namespace are required.", ex.Message);
}

[Fact]
public void GetBrokerAddress_ShouldReturnNamespace_WhenConnectionStringIsNull()
{
// Arrange
string? connectionString = null;
string? @namespace = "sb://mynamespace.servicebus.windows.net/";

// Act
var result = ServiceBusHelpers.GetBrokerAddress(connectionString, @namespace);

// Assert
Assert.Equal("AzureServiceBus", result.Name);
Assert.Equal("sb://mynamespace.servicebus.windows.net/", result.Endpoint);
}

[Fact]
public void GetBrokerAddress_ShouldReturnExtractedNamespace_WhenNamespaceIsNull()
{
// Arrange
string? connectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=myPolicy;SharedAccessKey=myKey";
string? @namespace = null;

// Act
var result = ServiceBusHelpers.GetBrokerAddress(connectionString, @namespace);

// Assert
Assert.Equal("AzureServiceBus", result.Name);
Assert.Equal("sb://mynamespace.servicebus.windows.net/", result.Endpoint);
}

[Fact]
public void GetBrokerAddress_ShouldThrowInvalidOperationException_WhenNamespaceExtractionFails()
{
// Arrange
string? connectionString = "InvalidConnectionString";
string? @namespace = null;

// Act & Assert
var ex = Assert.Throws<InvalidOperationException>(() => ServiceBusHelpers.GetBrokerAddress(connectionString, @namespace));
Assert.Equal("Unable to extract namespace from connection string.", ex.Message);
}

[Fact]
public void GetBrokerAddress_ShouldReturnNamespace_WhenBothNamespaceAndConnectionStringAreProvided()
{
// Arrange
string? connectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=myPolicy;SharedAccessKey=myKey";
string? @namespace = "anothernamespace";

// Act
var result = ServiceBusHelpers.GetBrokerAddress(connectionString, @namespace);

// Assert
Assert.Equal("AzureServiceBus", result.Name);
Assert.Equal("anothernamespace", result.Endpoint);
}

[Fact]
public void GetBrokerAddress_ShouldReturnExtractedNamespace_WhenConnectionStringIsValidAndNamespaceIsEmpty()
{
// Arrange
string? connectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=myPolicy;SharedAccessKey=myKey";
string? @namespace = "";

// Act
var result = ServiceBusHelpers.GetBrokerAddress(connectionString, @namespace);

// Assert
Assert.Equal("AzureServiceBus", result.Name);
Assert.Equal("sb://mynamespace.servicebus.windows.net/", result.Endpoint);
}

[Fact]
public void GetBrokerAddress_ShouldReturnNamespace_WhenConnectionStringIsEmpty()
{
// Arrange
string? connectionString = "";
string? @namespace = "sb://mynamespace.servicebus.windows.net/";

// Act
var result = ServiceBusHelpers.GetBrokerAddress(connectionString, @namespace);

// Assert
Assert.Equal("AzureServiceBus", result.Name);
Assert.Equal("sb://mynamespace.servicebus.windows.net/", result.Endpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,28 @@ public class ServiceBusTransportTests

public ServiceBusTransportTests()
{
var config = new AzureServiceBusOptions();
var config = new AzureServiceBusOptions()
{
ConnectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=myPolicy;SharedAccessKey=myKey"
};

config.ConfigureCustomProducer<EntityCreated>(cfg => cfg.UseTopic("entity-created").WithSubscription());

_options = Options.Create(config);
}

[Fact]
public void Custom_Producer_Should_Have_Custom_Topic()
public void Transport_ShouldHaveCorrectBrokerAddress()
{
// Given, When
var transport = new AzureServiceBusTransport(NullLogger<AzureServiceBusTransport>.Instance, _options);

// Then
transport.BrokerAddress.Endpoint.ShouldBe("sb://mynamespace.servicebus.windows.net/");
}

[Fact]
public void CustomProducer_ShouldHaveCustomTopic()
{
// Given
var transport = new AzureServiceBusTransport(NullLogger<AzureServiceBusTransport>.Instance, _options);
Expand All @@ -45,7 +59,7 @@ public void Custom_Producer_Should_Have_Custom_Topic()
}

[Fact]
public void Default_Producer_Should_Have_Default_Topic()
public void DefaultProducer_ShouldHaveDefaultTopic()
{
// Given
var transport = new AzureServiceBusTransport(NullLogger<AzureServiceBusTransport>.Instance, _options);
Expand Down

0 comments on commit 225a513

Please sign in to comment.