diff --git a/README.md b/README.md index 6da565674..c160edfbe 100644 --- a/README.md +++ b/README.md @@ -247,8 +247,7 @@ public void ShowTime2(DateTime datetime) } ``` -`ShowTime1` and `ShowTime2` will be called one after another because all received messages are processed linear. -You can change that behaviour to set `UseDispatchingPerGroup` true. +`ShowTime1` and `ShowTime2` will be called at the same time. BTW, You can specify the default group name in the configuration: diff --git a/README.zh-cn.md b/README.zh-cn.md index cd5bb9595..eb3d24f5a 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -248,7 +248,7 @@ public void ShowTime2(DateTime datetime) ``` -`ShowTime1` 和 `ShowTime2` 处于不同的组,他们在默认情况下被线性的接连调用,你可以通过设置`UseDispatchingPerGroup`为true来使两者互不影响的同时调用。 +`ShowTime1` 和 `ShowTime2` 将被同时调用。 PS,你可以通过下面的方式来指定默认的消费者组名称: diff --git a/build/version.props b/build/version.props index 4a408b33b..ce4d41834 100644 --- a/build/version.props +++ b/build/version.props @@ -1,7 +1,7 @@ 8 - 2 + 3 0 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/docs/content/user-guide/en/monitoring/kubernetes.md b/docs/content/user-guide/en/monitoring/kubernetes.md index e2e3d43d5..2db2e40c7 100644 --- a/docs/content/user-guide/en/monitoring/kubernetes.md +++ b/docs/content/user-guide/en/monitoring/kubernetes.md @@ -18,6 +18,25 @@ services.AddCap(x => ``` +## UseK8sDiscovery Configuration + +This configuration option is used to configure the Dashboard/Nodes to list every K8s `service` by default. If this is set to `True` then only services with the `dotnetcore.cap.visibility: show` label will be listed. More information on labels will be found on the **Kubernetes Labels Configuration** section. + +* ShowOnlyExplicitVisibleNodes + +> Default :false + + +```cs +services.AddCap(x => +{ + // ... + x.UseK8sDiscovery(opt=>{ + opt.ShowOnlyExplicitVisibleNodes = true; + }); +}); +``` + The component will automatically detect whether it is inside the cluster. If it is inside the cluster, the Pod must be granted Kubernetes Api permissions. Refer to the next section. ## Assign Pod Access to Kubernetes Api @@ -90,6 +109,67 @@ spec: targetPort: 80 ``` +From version `8.3.0` and onwards you can use a `Role` instead of `ClusterRole` to allow discovery of services only inside the namespace that the dashboard is running. Kubernetes Roles has limited jurisdiction inside the namespace. In the above example just remove ClusterRole and ClusterRoleBinding and instead use the following: + +``` +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: ns-svc-reader +rules: +- apiGroups: [""] + resources: ["services"] + verbs: ["get", "watch", "list"] + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: read-pods +subjects: +- kind: ServiceAccount + name: api-access + namespace: default +roleRef: + kind: ClusterRole + name: ns-svc-reader + apiGroup: rbac.authorization.k8s.io + +``` + +## Kubernetes Labels Configuration + +The list of Nodes showed in the dashboard can be controlled by adding labels to the to your kubernetes services. + + +- `dotnetcore.cap.visibility` label is used to show or hide a service from the list. + + > Allowed Values: show | hide + + > Examples: `dotnetcore.cap.visibility: show` or `dotnetcore.cap.visibility: hide` + +By default every k8s service is listed with the first port found in the service. However if more ports are present on the service you can select the wanted by using the following labels: + +- `dotnetcore.cap.portName` label is used to filter the wanted port of the service. + + > Allowed Values: string + + > Examples: `dotnetcore.cap.portName: grpc` or `dotnetcore.cap.portName: http` + +If not found any port with the given name, it will try to match the next label portIndex + +- `dotnetcore.cap.portIndex` label is used to filter the wanted port of the service. This filter is taken into consideration only if no label portName is set or a non matching portName is set. + + > Allowed Values: number represented as string ex: '2' or '14' + + > Examples: `dotnetcore.cap.portIndex: '1'` or `dotnetcore.cap.portIndex: '3'` + + If the provided index is outside of bounds then it will fallback to the first port (index:0) + + + + + ## Using Dashboard Standalone You can use the Dashboard standalone without configuring CAP, in this case, the Dashboard can be deployed as a separate Pod in the Kubernetes cluster just for data viewing. The service to be viewed no longer needs to configure the `cap.UseK8sDiscovery()` option. diff --git a/docs/content/user-guide/en/transport/rabbitmq.md b/docs/content/user-guide/en/transport/rabbitmq.md index 7ac13c14c..bfd1285e1 100644 --- a/docs/content/user-guide/en/transport/rabbitmq.md +++ b/docs/content/user-guide/en/transport/rabbitmq.md @@ -50,6 +50,7 @@ VirtualHost | Broker virtual host | string | / Port | Port | int | -1 ExchangeName | Default exchange name | string | cap.default.topic QueueArguments | Extra queue `x-arguments` | QueueArgumentsOptions | N/A +QueueOptions | Change Options for created queue | QueueRabbitOptions | { Durable=true, Exclusive=false, AutoDelete=false } ConnectionFactoryOptions | RabbitMQClient native connection options | ConnectionFactory | N/A CustomHeadersBuilder | Custom subscribe headers | See the blow | N/A PublishConfirms | Enable [publish confirms](https://www.rabbitmq.com/confirms.html#publisher-confirms) | bool | false diff --git a/docs/content/user-guide/zh/monitoring/kubernetes.md b/docs/content/user-guide/zh/monitoring/kubernetes.md index b0fa1c5d4..b153bebdb 100644 --- a/docs/content/user-guide/zh/monitoring/kubernetes.md +++ b/docs/content/user-guide/zh/monitoring/kubernetes.md @@ -18,6 +18,25 @@ services.AddCap(x => ``` +## 使用K8sDiscovery配置 + +此配置选项用于配置仪表板/节点以默认列出每个 K8s `service` 。如果将此设置为 `true`,则只会列出带有`dotnetcore.cap.visibility: show` 标签的服务。有关标签的更多信息可以在 **Kubernetes 标签配置** 部分找到。 + +* ShowOnlyExplicitVisibleNodes + +> 默认值:false + + +```cs +services.AddCap(x => +{ + // ... + x.UseK8sDiscovery(opt=>{ + opt.ShowOnlyExplicitVisibleNodes = true; + }); +}); +``` + 组件将会自动检测是否处于集群内部,如果处于集群内部在需要赋予Pod Kubernetes Api 的权限。参考下一章节。 ## 分配 Pod 访问 Kubernetes Api @@ -90,6 +109,65 @@ spec: targetPort: 80 ``` +从版本 `8.3.0` 及更高版本,您可以使用 `Role` 而不是 `ClusterRole`,以允许仅在仪表板运行的命名空间内发现服务。 Kubernetes 角色在命名空间内拥有有限的管辖权。在上面的示例中,只需删除 ClusterRole 和 ClusterRoleBinding 并改为使用以下内容 + +``` +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: ns-svc-reader +rules: +- apiGroups: [""] + resources: ["services"] + verbs: ["get", "watch", "list"] + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: read-pods +subjects: +- kind: ServiceAccount + name: api-access + namespace: default +roleRef: + kind: ClusterRole + name: ns-svc-reader + apiGroup: rbac.authorization.k8s.io + +``` + +## Kubernetes 标签配置 + +可以通过向 kubernetes 服务添加标签来控制仪表板中显示的节点列表。 + + +- `dotnetcore.cap.visibility` 标签用于显示或隐藏列表中的服务。 + + > 可能的值: show | hide + + > 示例: `dotnetcore.cap.visibility: show` or `dotnetcore.cap.visibility: hide` + +默认情况下,每个 k8s 服务都会列出该服务中找到的第一个端口。但是,如果服务上存在更多端口,您可以使用以下标签选择所需的端口: + +- `dotnetcore.cap.portName` 标签用于过滤需要的服务端口。 + + > 可能的值: string + + > 示例: `dotnetcore.cap.portName: grpc` or `dotnetcore.cap.portName: http` + +If not found any port with the given name, it will try to match the next label portIndex + +- `dotnetcore.cap.portIndex` 标签用于过滤需要的服务端口。 仅当未设置标签 portName 或设置不匹配的 portName 时,才会考虑此过滤器。 + + > 可能的值: 数字表示为字符串 ex: '2' or '14' + + > 示例: `dotnetcore.cap.portIndex: '1'` or `dotnetcore.cap.portIndex: '3'` + + 如果提供的索引超出范围,那么它将回退到第一个端口(索引:0) + + + ## 独立使用 Dashboard 你可以独立使用 Dashboard 而不需要配置CAP,此时相当于 Dashboard 可作为单独的 Pod 部署到 Kubernetes 集群中仅用作查看数据,待查看的服务不再需要配置 `cap.UseK8sDiscovery()` 配置项。 diff --git a/docs/content/user-guide/zh/transport/rabbitmq.md b/docs/content/user-guide/zh/transport/rabbitmq.md index 111fb8a38..678d879ca 100644 --- a/docs/content/user-guide/zh/transport/rabbitmq.md +++ b/docs/content/user-guide/zh/transport/rabbitmq.md @@ -51,6 +51,7 @@ VirtualHost | 虚拟主机 | string | / Port | 端口号 | int | -1 ExchangeName | CAP默认Exchange名称 | string | cap.default.topic QueueArguments | 队列额外参数 x-arguments | QueueArgumentsOptions | N/A +QueueOptions | 更改已创建队列的选项 | QueueRabbitOptions | { Durable=true, Exclusive=false, AutoDelete=false } ConnectionFactoryOptions | RabbitMQClient原生参数 | ConnectionFactory | N/A CustomHeadersBuilder | 订阅者自定义头信息 | 见下文 | N/A PublishConfirms | 是否启用[发布确认](https://www.rabbitmq.com/confirms.html#publisher-confirms) | bool | false diff --git a/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj b/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj index 25af0b408..e27aef573 100644 --- a/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj +++ b/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj @@ -12,8 +12,8 @@ - - + + diff --git a/src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj b/src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj index 67129eb56..a3541a96b 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj +++ b/src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj @@ -17,7 +17,7 @@ - + diff --git a/src/DotNetCore.CAP.Dashboard.K8s/DotNetCore.CAP.Dashboard.K8s.csproj b/src/DotNetCore.CAP.Dashboard.K8s/DotNetCore.CAP.Dashboard.K8s.csproj index 6bda5a9d7..cc6c6fa70 100644 --- a/src/DotNetCore.CAP.Dashboard.K8s/DotNetCore.CAP.Dashboard.K8s.csproj +++ b/src/DotNetCore.CAP.Dashboard.K8s/DotNetCore.CAP.Dashboard.K8s.csproj @@ -5,7 +5,7 @@ enable - + diff --git a/src/DotNetCore.CAP.Dashboard.K8s/K8sDiscoveryOptions.cs b/src/DotNetCore.CAP.Dashboard.K8s/K8sDiscoveryOptions.cs index d22098262..426ab23c3 100644 --- a/src/DotNetCore.CAP.Dashboard.K8s/K8sDiscoveryOptions.cs +++ b/src/DotNetCore.CAP.Dashboard.K8s/K8sDiscoveryOptions.cs @@ -14,7 +14,14 @@ public class K8sDiscoveryOptions public K8sDiscoveryOptions() { K8SClientConfig = KubernetesClientConfiguration.BuildDefaultConfig(); + ShowOnlyExplicitVisibleNodes = true; } public KubernetesClientConfiguration K8SClientConfig { get; set; } + + /// + /// If this is set to TRUE will make all nodes hidden by default. Only kubernetes services + /// with label "dotnetcore.cap.visibility:show" will be listed in the nodes section. + /// + public bool ShowOnlyExplicitVisibleNodes { get; set; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Dashboard.K8s/K8sNodeDiscoveryProvider.cs b/src/DotNetCore.CAP.Dashboard.K8s/K8sNodeDiscoveryProvider.cs index 42bc1efa2..576aa616a 100644 --- a/src/DotNetCore.CAP.Dashboard.K8s/K8sNodeDiscoveryProvider.cs +++ b/src/DotNetCore.CAP.Dashboard.K8s/K8sNodeDiscoveryProvider.cs @@ -11,20 +11,21 @@ namespace DotNetCore.CAP.Dashboard.K8s; // ReSharper disable once InconsistentNaming public class K8sNodeDiscoveryProvider : INodeDiscoveryProvider { + const string TagPrefix = "dotnetcore.cap"; private readonly ILogger _logger; - private readonly KubernetesClientConfiguration _options; + private readonly K8sDiscoveryOptions _options; public K8sNodeDiscoveryProvider(ILoggerFactory logger, K8sDiscoveryOptions options) { _logger = logger.CreateLogger(); - _options = options.K8SClientConfig; + _options = options; } public async Task GetNode(string svcName, string ns, CancellationToken cancellationToken = default) { try { - var client = new Kubernetes(_options); + var client = new Kubernetes(_options.K8SClientConfig); var service = await client.CoreV1.ReadNamespacedServiceAsync(svcName, ns, cancellationToken: cancellationToken); @@ -50,24 +51,11 @@ public async Task> GetNodes(string? ns, CancellationToken cancellati { try { - if (ns == null) return new List(); + ns = _options.K8SClientConfig.Namespace; - var client = new Kubernetes(_options); - var services = await client.CoreV1.ListNamespacedServiceAsync(ns, cancellationToken: cancellationToken); + if (ns == null) return new List(); - var nodes = new List(); - foreach (var service in services.Items) - { - nodes.Add(new Node - { - Id = service.Uid(), - Name = service.Name(), - Address = "http://" + service.Metadata.Name + "." + ns, - Port = service.Spec.Ports?[0].Port ?? 0, - Tags = string.Join(',', - service.Labels()?.Select(x => x.Key + ":" + x.Value) ?? Array.Empty()) - }); - } + var nodes = await ListServices(ns); CapCache.Global.AddOrUpdate("cap.nodes.count", nodes.Count, TimeSpan.FromSeconds(60), true); @@ -85,29 +73,243 @@ public async Task> GetNodes(string? ns, CancellationToken cancellati public async Task> GetNamespaces(CancellationToken cancellationToken) { - var client = new Kubernetes(_options); - var namespaces = await client.ListNamespaceAsync(cancellationToken: cancellationToken); - return namespaces.Items.Select(x => x.Name()).ToList(); + var client = new Kubernetes(_options.K8SClientConfig); + try + { + var namespaces = await client.ListNamespaceAsync(cancellationToken: cancellationToken); + return namespaces.Items.Select(x => x.Name()).ToList(); + } + catch (Exception) + { + if (string.IsNullOrEmpty(_options.K8SClientConfig.Namespace)) + { + return new List(); + } + + return new List() { _options.K8SClientConfig.Namespace }; + } } public async Task> ListServices(string? ns = null) { - var client = new Kubernetes(_options); + var client = new Kubernetes(_options.K8SClientConfig); var services = await client.CoreV1.ListNamespacedServiceAsync(ns); var result = new List(); foreach (var service in services.Items) { + IDictionary tags = service.Labels(); + + var filterResult = FilterNodesByTags(tags); + + if (filterResult.hideNode) + { + continue; + } + + int port = GetPortByNameOrIndex(service, filterResult.filteredPortName, filterResult.filteredPortIndex); + result.Add(new Node { Id = service.Uid(), Name = service.Name(), Address = "http://" + service.Metadata.Name + "." + ns, - Port = service.Spec.Ports?[0].Port ?? 0, + Port = port, Tags = string.Join(',', service.Labels()?.Select(x => x.Key + ":" + x.Value) ?? Array.Empty()) }); } return result; } + + + /// + /// Given the filters (filterPortName and filterPortIndex) this method will try to find the port + /// filterPortName is checked first and if no port is found by that name filterPortIndex is checked + /// Returns 0 if service is null or no port specified in the service + /// Returns the portNumber of the matched port if something is found + /// + /// + /// + /// + /// + private static int GetPortByNameOrIndex(V1Service? service, string filterPortName, int filterPortIndex) + { + if (service is null) + { + return 0; + } + + if (service.Spec.Ports is null) + { + return 0; + } + + var result = GetPortByName(service.Spec.Ports, filterPortName); + if (result > 0) + { + return result; + } + + result = GetPortByIndex(service.Spec.Ports, filterPortIndex); + if (result > 0) + { + return result; + } + + return service.Spec.Ports[0]?.Port ?? 0; + } + + /// + /// This method will try to find a port with the specified Index + /// Will Return 0 if index is not found + /// Returns: port number or 0 if not found + /// + /// + /// + /// + private static int GetPortByIndex(IList servicePorts, int filterIndex) + { + + var portByIndex = servicePorts.ElementAtOrDefault(filterIndex); + if (portByIndex is null) + { + return 0; + } + + return portByIndex.Port; + } + + /// + /// This method will try to find a port with the specified name + /// Will Return 0 if none found + /// Returns: port number or 0 if not found + /// + /// + /// + /// + private static int GetPortByName(IList servicePorts, string portName) + { + if (!string.IsNullOrEmpty(portName)) + { + return 0; + } + + var portByName = servicePorts.FirstOrDefault(p => p.Name == portName); + if (portByName is null) + { + return 0; + } + + return portByName.Port; + } + + private record TagFilterResult(bool hideNode, int filteredPortIndex, string filteredPortName); + + private TagFilterResult FilterNodesByTags(IDictionary tags) + { + var isNodeHidden = _options.ShowOnlyExplicitVisibleNodes; + var filteredPortIndex = 0; //this the default port index + var filteredPortName = string.Empty; //this the default port index + + if (tags == null) + { + + return new TagFilterResult(isNodeHidden, filteredPortIndex, filteredPortName); + } + + + foreach (var tag in tags) + { + //look out for dotnetcore.cap tags + //based on value will do conditions + var isCapTag = tag.Key.StartsWith(TagPrefix, StringComparison.InvariantCultureIgnoreCase); + + if (!isCapTag) + { + continue; + } + + string capTagScope = GetTagScope(tag); + + //check for hide Tag + if (IsNodeHidden(tag, capTagScope)) + { + return new TagFilterResult(true, filteredPortIndex, filteredPortName); + } + else + { + isNodeHidden = false; + } + + //check for portIndex-X tag. + //If multiple tags with portIndex are found only the last has power + var hasNewPort = CheckFilterPortIndex(tag, capTagScope); + if (hasNewPort.HasValue) + { + filteredPortIndex = hasNewPort.Value; + } + + //check for portName-X tag. + //If multiple tags with portName are found only the last has power + if (capTagScope.Equals("portName", StringComparison.InvariantCultureIgnoreCase)) + { + filteredPortName = tag.Value; + } + + } + + return new TagFilterResult(isNodeHidden, filteredPortIndex, filteredPortName); + } + + private int? CheckFilterPortIndex(KeyValuePair tag, string capTagScope) + { + if (!capTagScope.Equals("portIndex", StringComparison.InvariantCultureIgnoreCase)) + { + return null; + } + + var hasPort = int.TryParse(tag.Value, out int filterPort); + if (!hasPort) + { + return null; + } + + return filterPort; + } + + private bool IsNodeHidden(KeyValuePair tag, string capTagScope) + { + if (!capTagScope.Equals("visibility", StringComparison.InvariantCultureIgnoreCase)) + { + return false; + } + + //We will not show the node if the tag value is "dotnetcore.cap.visibility:hide" + if (tag.Value.Equals("hide", StringComparison.InvariantCultureIgnoreCase)) + { + return true; + } + + //We will not show the node if the K8s Dashboard option is + //ShowOnlyExplicitVisibleNodes=True + //and the tag value is NOT "dotnetcore.cap.visibility:show" + if (!_options.ShowOnlyExplicitVisibleNodes) + { + return false; + } + + return !tag.Value.Equals("show", StringComparison.InvariantCultureIgnoreCase); + } + + private string GetTagScope(KeyValuePair tag) + { + var capTagScope = tag.Key.Replace(TagPrefix, "", StringComparison.InvariantCultureIgnoreCase); + if (capTagScope.StartsWith(".")) + { + capTagScope = capTagScope.Substring(1); + } + + return capTagScope; + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj b/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj index 6313ec61e..68e25511c 100644 --- a/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj +++ b/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index f396ce3f4..d615b8806 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj b/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj index 489e85016..feea187e6 100644 --- a/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj +++ b/src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj index bc8a97ec3..11183e2f9 100644 --- a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj +++ b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs index a7cdf1252..9ed695be5 100644 --- a/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -7,26 +7,25 @@ using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace -namespace DotNetCore.CAP +namespace DotNetCore.CAP; + +internal sealed class NATSCapOptionsExtension : ICapOptionsExtension { - internal sealed class NATSCapOptionsExtension : ICapOptionsExtension - { - private readonly Action _configure; + private readonly Action _configure; - public NATSCapOptionsExtension(Action configure) - { - _configure = configure; - } + public NATSCapOptionsExtension(Action configure) + { + _configure = configure; + } - public void AddServices(IServiceCollection services) - { - services.AddSingleton(new CapMessageQueueMakerService("NATS JetStream")); + public void AddServices(IServiceCollection services) + { + services.AddSingleton(new CapMessageQueueMakerService("NATS JetStream")); - services.Configure(_configure); + services.Configure(_configure); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - } + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs index b9482e4f4..c2651d202 100644 --- a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -7,43 +7,42 @@ using NATS.Client.JetStream; // ReSharper disable once CheckNamespace -namespace DotNetCore.CAP +namespace DotNetCore.CAP; + +/// +/// Provides programmatic configuration for the CAP NATS project. +/// +public class NATSOptions { /// - /// Provides programmatic configuration for the CAP NATS project. + /// Gets or sets the server url/urls used to connect to the NATs server. + /// + /// This may contain username/password information. + public string Servers { get; set; } = "nats://127.0.0.1:4222"; + + /// + /// connection pool size, default is 10 + /// + public int ConnectionPoolSize { get; set; } = 10; + + /// + /// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true. + /// + public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true; + + /// + /// Used to setup all NATs client options + /// + public Options? Options { get; set; } + + public Action? StreamOptions { get; set; } + + public Action? ConsumerOptions { get; set; } + + /// + /// If you need to get additional native delivery args, you can use this function to write into . /// - public class NATSOptions - { - /// - /// Gets or sets the server url/urls used to connect to the NATs server. - /// - /// This may contain username/password information. - public string Servers { get; set; } = "nats://127.0.0.1:4222"; - - /// - /// connection pool size, default is 10 - /// - public int ConnectionPoolSize { get; set; } = 10; - - /// - /// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true. - /// - public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true; - - /// - /// Used to setup all NATs client options - /// - public Options? Options { get; set; } - - public Action? StreamOptions { get; set; } - - public Action? ConsumerOptions { get; set; } - - /// - /// If you need to get additional native delivery args, you can use this function to write into . - /// - public Func>>? CustomHeadersBuilder { get; set; } - - public Func NormalizeStreamName { get; set; } = origin => origin.Split('.')[0]; - } + public Func>>? CustomHeadersBuilder { get; set; } + + public Func NormalizeStreamName { get; set; } = origin => origin.Split('.')[0]; } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs index c6be74b24..1068f7ab3 100644 --- a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs @@ -1,44 +1,43 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; using DotNetCore.CAP; // ReSharper disable once CheckNamespace -namespace Microsoft.Extensions.DependencyInjection +namespace Microsoft.Extensions.DependencyInjection; + +public static class CapOptionsExtensions { - public static class CapOptionsExtensions + /// + /// Configuration to use NATS in CAP. + /// + /// CAP configuration options + /// NATS bootstrap server urls. + public static CapOptions UseNATS(this CapOptions options, string? bootstrapServers = null) { - /// - /// Configuration to use NATS in CAP. - /// - /// CAP configuration options - /// NATS bootstrap server urls. - public static CapOptions UseNATS(this CapOptions options, string? bootstrapServers = null) + return options.UseNATS(opt => { - return options.UseNATS(opt => - { - if (bootstrapServers != null) - opt.Servers = bootstrapServers; - }); - } + if (bootstrapServers != null) + opt.Servers = bootstrapServers; + }); + } - /// - /// Configuration to use NATS in CAP. - /// - /// CAP configuration options - /// Provides programmatic configuration for the NATS. - /// - public static CapOptions UseNATS(this CapOptions options, Action configure) + /// + /// Configuration to use NATS in CAP. + /// + /// CAP configuration options + /// Provides programmatic configuration for the NATS. + /// + public static CapOptions UseNATS(this CapOptions options, Action configure) + { + if (configure == null) { - if (configure == null) - { - throw new ArgumentNullException(nameof(configure)); - } + throw new ArgumentNullException(nameof(configure)); + } - options.RegisterExtension(new NATSCapOptionsExtension(configure)); + options.RegisterExtension(new NATSCapOptionsExtension(configure)); - return options; - } + return options; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj index 192463c66..f584457b2 100644 --- a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj +++ b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj @@ -7,7 +7,7 @@ - + diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs index 361b959a7..0af8a8720 100644 --- a/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs +++ b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -8,78 +8,77 @@ using Microsoft.Extensions.Options; using NATS.Client; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +public class ConnectionPool : IConnectionPool, IDisposable { - public class ConnectionPool : IConnectionPool, IDisposable - { - private readonly NATSOptions _options; - private readonly ConcurrentQueue _connectionPool; + private readonly NATSOptions _options; + private readonly ConcurrentQueue _connectionPool; - private readonly ConnectionFactory _connectionFactory; - private int _pCount; - private int _maxSize; + private readonly ConnectionFactory _connectionFactory; + private int _pCount; + private int _maxSize; - public ConnectionPool(ILogger logger, IOptions options) - { - _options = options.Value; - _connectionPool = new ConcurrentQueue(); - _connectionFactory = new ConnectionFactory(); - _maxSize = _options.ConnectionPoolSize; + public ConnectionPool(ILogger logger, IOptions options) + { + _options = options.Value; + _connectionPool = new ConcurrentQueue(); + _connectionFactory = new ConnectionFactory(); + _maxSize = _options.ConnectionPoolSize; - logger.LogDebug("NATS configuration: {0}", options.Value.Options); - } + logger.LogDebug("NATS configuration: {0}", options.Value.Options); + } - public string ServersAddress => _options.Servers; + public string ServersAddress => _options.Servers; - public IConnection RentConnection() + public IConnection RentConnection() + { + if (_connectionPool.TryDequeue(out var connection)) { - if (_connectionPool.TryDequeue(out var connection)) - { - Interlocked.Decrement(ref _pCount); - - return connection; - } - - if (_options.Options != null) - { - _options.Options.Url = _options.Servers; - connection = _connectionFactory.CreateConnection(_options.Options); - } - else - { - connection = _connectionFactory.CreateConnection(_options.Servers); - } + Interlocked.Decrement(ref _pCount); return connection; } - public bool Return(IConnection connection) + if (_options.Options != null) { - if (Interlocked.Increment(ref _pCount) <= _maxSize && connection.State == ConnState.CONNECTED) - { - _connectionPool.Enqueue(connection); - - return true; - } + _options.Options.Url = _options.Servers; + connection = _connectionFactory.CreateConnection(_options.Options); + } + else + { + connection = _connectionFactory.CreateConnection(_options.Servers); + } - if (!connection.IsReconnecting()) - { - connection.Dispose(); - } + return connection; + } - Interlocked.Decrement(ref _pCount); + public bool Return(IConnection connection) + { + if (Interlocked.Increment(ref _pCount) <= _maxSize && connection.State == ConnState.CONNECTED) + { + _connectionPool.Enqueue(connection); - return false; + return true; } - public void Dispose() + if (!connection.IsReconnecting()) { - _maxSize = 0; + connection.Dispose(); + } + + Interlocked.Decrement(ref _pCount); - while (_connectionPool.TryDequeue(out var context)) - { - context.Dispose(); - } + return false; + } + + public void Dispose() + { + _maxSize = 0; + + while (_connectionPool.TryDequeue(out var context)) + { + context.Dispose(); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.cs index e6d78e6f8..b43004527 100644 --- a/src/DotNetCore.CAP.NATS/IConnectionPool.cs +++ b/src/DotNetCore.CAP.NATS/IConnectionPool.cs @@ -1,16 +1,15 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using NATS.Client; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +public interface IConnectionPool { - public interface IConnectionPool - { - string ServersAddress { get; } + string ServersAddress { get; } - IConnection RentConnection(); + IConnection RentConnection(); - bool Return(IConnection connection); - } + bool Return(IConnection connection); } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs index cdb07c63a..1049989e4 100644 --- a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs +++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -10,61 +10,60 @@ using NATS.Client; using NATS.Client.JetStream; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +internal class NATSTransport : ITransport { - internal class NATSTransport : ITransport + private readonly IConnectionPool _connectionPool; + private readonly ILogger _logger; + private readonly JetStreamOptions _jetStreamOptions; + + public NATSTransport(ILogger logger, IConnectionPool connectionPool) { - private readonly IConnectionPool _connectionPool; - private readonly ILogger _logger; - private readonly JetStreamOptions _jetStreamOptions; + _logger = logger; + _connectionPool = connectionPool; - public NATSTransport(ILogger logger, IConnectionPool connectionPool) - { - _logger = logger; - _connectionPool = connectionPool; + _jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build(); + } - _jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build(); - } + public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress); - public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress); + public async Task SendAsync(TransportMessage message) + { + var connection = _connectionPool.RentConnection(); - public async Task SendAsync(TransportMessage message) + try { - var connection = _connectionPool.RentConnection(); - - try + var msg = new Msg(message.GetName(), message.Body.ToArray()); + foreach (var header in message.Headers) { - var msg = new Msg(message.GetName(), message.Body.ToArray()); - foreach (var header in message.Headers) - { - msg.Header[header.Key] = header.Value; - } - - var js = connection.CreateJetStreamContext(_jetStreamOptions); + msg.Header[header.Key] = header.Value; + } - var builder = PublishOptions.Builder().WithMessageId(message.GetId()); + var js = connection.CreateJetStreamContext(_jetStreamOptions); - var resp = await js.PublishAsync(msg, builder.Build()); + var builder = PublishOptions.Builder().WithMessageId(message.GetId()); - if (resp.Seq > 0) - { - _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); + var resp = await js.PublishAsync(msg, builder.Build()); - return OperateResult.Success; - } - - throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); - } - catch (Exception ex) + if (resp.Seq > 0) { - var warpEx = new PublisherSentFailedException(ex.Message, ex); + _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); - return OperateResult.Failed(warpEx); - } - finally - { - _connectionPool.Return(connection); + return OperateResult.Success; } + + throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); + } + catch (Exception ex) + { + var warpEx = new PublisherSentFailedException(ex.Message, ex); + + return OperateResult.Failed(warpEx); + } + finally + { + _connectionPool.Return(connection); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs index f8f2dc31f..0e0216d66 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -13,241 +13,240 @@ using NATS.Client; using NATS.Client.JetStream; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +internal sealed class NATSConsumerClient : IConsumerClient { - internal sealed class NATSConsumerClient : IConsumerClient - { - private static readonly object ConnectionLock = new(); + private static readonly object ConnectionLock = new(); - private readonly string _groupName; - private readonly byte _groupConcurrent; - private readonly IServiceProvider _serviceProvider; - private readonly NATSOptions _natsOptions; - private readonly SemaphoreSlim _semaphore; - private IConnection? _consumerClient; + private readonly string _groupName; + private readonly byte _groupConcurrent; + private readonly IServiceProvider _serviceProvider; + private readonly NATSOptions _natsOptions; + private readonly SemaphoreSlim _semaphore; + private IConnection? _consumerClient; - public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions options, IServiceProvider serviceProvider) - { - _groupName = groupName; - _groupConcurrent = groupConcurrent; - _serviceProvider = serviceProvider; - _semaphore = new SemaphoreSlim(groupConcurrent); - _natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); - } + public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions options, IServiceProvider serviceProvider) + { + _groupName = groupName; + _groupConcurrent = groupConcurrent; + _serviceProvider = serviceProvider; + _semaphore = new SemaphoreSlim(groupConcurrent); + _natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); + } - public Func? OnMessageCallback { get; set; } + public Func? OnMessageCallback { get; set; } - public Action? OnLogCallback { get; set; } + public Action? OnLogCallback { get; set; } - public BrokerAddress BrokerAddress => new("NATS", _natsOptions.Servers); + public BrokerAddress BrokerAddress => new("NATS", _natsOptions.Servers); - public ICollection FetchTopics(IEnumerable topicNames) + public ICollection FetchTopics(IEnumerable topicNames) + { + if (_natsOptions.EnableSubscriberClientStreamAndSubjectCreation) { - if (_natsOptions.EnableSubscriberClientStreamAndSubjectCreation) - { - Connect(); + Connect(); - var jsm = _consumerClient!.CreateJetStreamManagementContext(); + var jsm = _consumerClient!.CreateJetStreamManagementContext(); - var streamSubjectsGroups = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x)); + var streamSubjectsGroups = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x)); - foreach (var streamSubjectsGroup in streamSubjectsGroups) - { - var builder = StreamConfiguration.Builder() - .WithName(streamSubjectsGroup.Key) - .WithNoAck(false) - .WithStorageType(StorageType.Memory) - .WithSubjects(streamSubjectsGroup.ToList()); + foreach (var streamSubjectsGroup in streamSubjectsGroups) + { + var builder = StreamConfiguration.Builder() + .WithName(streamSubjectsGroup.Key) + .WithNoAck(false) + .WithStorageType(StorageType.Memory) + .WithSubjects(streamSubjectsGroup.ToList()); - _natsOptions.StreamOptions?.Invoke(builder); + _natsOptions.StreamOptions?.Invoke(builder); + try + { + jsm.GetStreamInfo(streamSubjectsGroup.Key); // this throws if the stream does not exist + + jsm.UpdateStream(builder.Build()); + } + catch (NATSJetStreamException) + { try { - jsm.GetStreamInfo(streamSubjectsGroup.Key); // this throws if the stream does not exist - - jsm.UpdateStream(builder.Build()); + jsm.AddStream(builder.Build()); } - catch (NATSJetStreamException) + catch { - try - { - jsm.AddStream(builder.Build()); - } - catch - { - // ignored - } + // ignored } } } - - return topicNames.ToList(); } - public void Subscribe(IEnumerable topics) + return topicNames.ToList(); + } + + public void Subscribe(IEnumerable topics) + { + if (topics == null) { - if (topics == null) - { - throw new ArgumentNullException(nameof(topics)); - } + throw new ArgumentNullException(nameof(topics)); + } - Connect(); + Connect(); - var js = _consumerClient!.CreateJetStreamContext(); - var streamGroup = topics.GroupBy(x => _natsOptions.NormalizeStreamName(x)); + var js = _consumerClient!.CreateJetStreamContext(); + var streamGroup = topics.GroupBy(x => _natsOptions.NormalizeStreamName(x)); - lock (ConnectionLock) + lock (ConnectionLock) + { + foreach (var subjectStream in streamGroup) { - foreach (var subjectStream in streamGroup) + var groupName = Helper.Normalized(_groupName); + + foreach (var subject in subjectStream) { - var groupName = Helper.Normalized(_groupName); + try + { + var consumerConfig = ConsumerConfiguration.Builder() + .WithDurable(Helper.Normalized(groupName + "-" + subject)) + .WithDeliverPolicy(DeliverPolicy.New) + .WithAckWait(30000) + .WithAckPolicy(AckPolicy.Explicit); + + _natsOptions.ConsumerOptions?.Invoke(consumerConfig); - foreach (var subject in subjectStream) + var pso = PushSubscribeOptions.Builder() + .WithStream(subjectStream.Key) + .WithConfiguration(consumerConfig.Build()) + .Build(); + + js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); + } + catch (Exception e) { - try + OnLogCallback!(new LogMessageEventArgs() { - var consumerConfig = ConsumerConfiguration.Builder() - .WithDurable(Helper.Normalized(groupName + "-" + subject)) - .WithDeliverPolicy(DeliverPolicy.New) - .WithAckWait(30000) - .WithAckPolicy(AckPolicy.Explicit); - - _natsOptions.ConsumerOptions?.Invoke(consumerConfig); - - var pso = PushSubscribeOptions.Builder() - .WithStream(subjectStream.Key) - .WithConfiguration(consumerConfig.Build()) - .Build(); - - js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); - } - catch (Exception e) - { - OnLogCallback!(new LogMessageEventArgs() - { - LogType = MqLogType.ConnectError, - Reason = $"An error was encountered when attempting to subscribe to subject: {subject}.{Environment.NewLine}" + - $"{e.Message}" - }); - } + LogType = MqLogType.ConnectError, + Reason = $"An error was encountered when attempting to subscribe to subject: {subject}.{Environment.NewLine}" + + $"{e.Message}" + }); } } } } + } - public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + while (true) { - while (true) - { - cancellationToken.ThrowIfCancellationRequested(); - cancellationToken.WaitHandle.WaitOne(timeout); - } - // ReSharper disable once FunctionNeverReturns + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); } + // ReSharper disable once FunctionNeverReturns + } - private void SubscriptionMessageHandler(object? sender, MsgHandlerEventArgs e) + private void SubscriptionMessageHandler(object? sender, MsgHandlerEventArgs e) + { + if (_groupConcurrent > 0) { - if (_groupConcurrent > 0) - { - _semaphore.Wait(); - Task.Run(() => Consume()).ConfigureAwait(false); - } - else - { - Consume().GetAwaiter().GetResult(); - } + _semaphore.Wait(); + Task.Run(() => Consume()).ConfigureAwait(false); + } + else + { + Consume().GetAwaiter().GetResult(); + } - Task Consume() - { - var headers = new Dictionary(); + Task Consume() + { + var headers = new Dictionary(); - foreach (string h in e.Message.Header.Keys) - { - headers.Add(h, e.Message.Header[h]); - } + foreach (string h in e.Message.Header.Keys) + { + headers.Add(h, e.Message.Header[h]); + } - headers.Add(Headers.Group, _groupName); + headers.Add(Headers.Group, _groupName); - if (_natsOptions.CustomHeadersBuilder != null) + if (_natsOptions.CustomHeadersBuilder != null) + { + var customHeaders = _natsOptions.CustomHeadersBuilder(e, _serviceProvider); + foreach (var customHeader in customHeaders) { - var customHeaders = _natsOptions.CustomHeadersBuilder(e, _serviceProvider); - foreach (var customHeader in customHeaders) - { - headers[customHeader.Key] = customHeader.Value; - } + headers[customHeader.Key] = customHeader.Value; } - - return OnMessageCallback!(new TransportMessage(headers, e.Message.Data), e.Message); } + + return OnMessageCallback!(new TransportMessage(headers, e.Message.Data), e.Message); } + } - public void Commit(object? sender) + public void Commit(object? sender) + { + if (sender is Msg msg) { - if (sender is Msg msg) - { - msg.Ack(); - } - _semaphore.Release(); + msg.Ack(); } + _semaphore.Release(); + } - public void Reject(object? sender) + public void Reject(object? sender) + { + if (sender is Msg msg) { - if (sender is Msg msg) - { - msg.Nak(); - } - _semaphore.Release(); + msg.Nak(); } + _semaphore.Release(); + } + + public void Dispose() + { + _consumerClient?.Dispose(); + } - public void Dispose() + public void Connect() + { + if (_consumerClient != null) { - _consumerClient?.Dispose(); + return; } - public void Connect() + lock (ConnectionLock) { - if (_consumerClient != null) + if (_consumerClient == null) { - return; - } - - lock (ConnectionLock) - { - if (_consumerClient == null) - { - var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); - opts.Url ??= _natsOptions.Servers; - opts.DisconnectedEventHandler = DisconnectedEventHandler; - opts.AsyncErrorEventHandler = AsyncErrorEventHandler; - opts.Timeout = 5000; - opts.AllowReconnect = false; - opts.NoEcho = true; - - _consumerClient = new ConnectionFactory().CreateConnection(opts); - } + var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); + opts.Url ??= _natsOptions.Servers; + opts.DisconnectedEventHandler = DisconnectedEventHandler; + opts.AsyncErrorEventHandler = AsyncErrorEventHandler; + opts.Timeout = 5000; + opts.AllowReconnect = false; + opts.NoEcho = true; + + _consumerClient = new ConnectionFactory().CreateConnection(opts); } } + } - private void DisconnectedEventHandler(object? sender, ConnEventArgs e) - { - if (e.Error is null) return; + private void DisconnectedEventHandler(object? sender, ConnEventArgs e) + { + if (e.Error is null) return; - var logArgs = new LogMessageEventArgs - { - LogType = MqLogType.ConnectError, - Reason = e.Error.ToString() - }; - OnLogCallback!(logArgs); - } + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.ConnectError, + Reason = e.Error.ToString() + }; + OnLogCallback!(logArgs); + } - private void AsyncErrorEventHandler(object? sender, ErrEventArgs e) + private void AsyncErrorEventHandler(object? sender, ErrEventArgs e) + { + var logArgs = new LogMessageEventArgs { - var logArgs = new LogMessageEventArgs - { - LogType = MqLogType.AsyncErrorEvent, - Reason = e.Error - }; - OnLogCallback!(logArgs); - } + LogType = MqLogType.AsyncErrorEvent, + Reason = e.Error + }; + OnLogCallback!(logArgs); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs index 2e1386664..ced7043e2 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs @@ -1,35 +1,34 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Options; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +internal sealed class NATSConsumerClientFactory : IConsumerClientFactory { - internal sealed class NATSConsumerClientFactory : IConsumerClientFactory + private readonly IOptions _natsOptions; + private readonly IServiceProvider _serviceProvider; + + public NATSConsumerClientFactory(IOptions natsOptions, IServiceProvider serviceProvider) { - private readonly IOptions _natsOptions; - private readonly IServiceProvider _serviceProvider; + _natsOptions = natsOptions; + _serviceProvider = serviceProvider; + } - public NATSConsumerClientFactory(IOptions natsOptions, IServiceProvider serviceProvider) + public IConsumerClient Create(string groupName, byte groupConcurrent) + { + try { - _natsOptions = natsOptions; - _serviceProvider = serviceProvider; + var client = new NATSConsumerClient(groupName, groupConcurrent, _natsOptions, _serviceProvider); + client.Connect(); + return client; } - - public IConsumerClient Create(string groupName, byte groupConcurrent) + catch (System.Exception e) { - try - { - var client = new NATSConsumerClient(groupName, groupConcurrent, _natsOptions, _serviceProvider); - client.Connect(); - return client; - } - catch (System.Exception e) - { - throw new BrokerConnectionException(e); - } + throw new BrokerConnectionException(e); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj b/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj index 1eba8d625..fa2cd07f3 100644 --- a/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj +++ b/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj b/src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj index c30e4b4f3..000f22528 100644 --- a/src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj +++ b/src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj @@ -12,11 +12,11 @@ - + - + diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs index 06fd7d6a6..44cae5b9d 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs @@ -80,6 +80,13 @@ public class RabbitMQOptions /// public QueueArgumentsOptions QueueArguments { get; set; } = new(); + + /// + /// Optional queue arguments, also known as "x-arguments" because of their field name in the AMQP 0-9-1 protocol, + /// is a map (dictionary) of arbitrary key/value pairs that can be provided by clients when a queue is declared. + /// + public QueueRabbitOptions QueueOptions { get; set; } = new(); + /// /// If you need to get additional native delivery args, you can use this function to write into . /// @@ -155,4 +162,12 @@ public BasicQos(ushort prefetchCount, bool global = false) /// public bool Global { get; } } -} \ No newline at end of file + + public class QueueRabbitOptions + { + public bool Durable { get; set; } = true; + public bool Exclusive { get; set; } = false; + public bool AutoDelete { get; set; } = false; + } +} + diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index 39c74a9a9..66815e16d 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -11,146 +11,145 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; -namespace DotNetCore.CAP.RabbitMQ +namespace DotNetCore.CAP.RabbitMQ; + +public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer { - public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer + private readonly SemaphoreSlim _semaphore; + private readonly string _groupName; + private readonly bool _usingTaskRun; + private readonly Func _msgCallback; + private readonly Action _logCallback; + private readonly Func>>? _customHeadersBuilder; + private readonly IServiceProvider _serviceProvider; + + public RabbitMQBasicConsumer(IModel? model, + byte concurrent, string groupName, + Func msgCallback, + Action logCallback, + Func>>? customHeadersBuilder, + IServiceProvider serviceProvider) + : base(model) { - private readonly SemaphoreSlim _semaphore; - private readonly string _groupName; - private readonly bool _usingTaskRun; - private readonly Func _msgCallback; - private readonly Action _logCallback; - private readonly Func>>? _customHeadersBuilder; - private readonly IServiceProvider _serviceProvider; - - public RabbitMQBasicConsumer(IModel? model, - byte concurrent, string groupName, - Func msgCallback, - Action logCallback, - Func>>? customHeadersBuilder, - IServiceProvider serviceProvider) - : base(model) - { - _semaphore = new SemaphoreSlim(concurrent); - _groupName = groupName; - _usingTaskRun = concurrent > 0; - _msgCallback = msgCallback; - _logCallback = logCallback; - _customHeadersBuilder = customHeadersBuilder; - _serviceProvider = serviceProvider; - } + _semaphore = new SemaphoreSlim(concurrent); + _groupName = groupName; + _usingTaskRun = concurrent > 0; + _msgCallback = msgCallback; + _logCallback = logCallback; + _customHeadersBuilder = customHeadersBuilder; + _serviceProvider = serviceProvider; + } - public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, - string routingKey, IBasicProperties properties, ReadOnlyMemory body) + public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, + string routingKey, IBasicProperties properties, ReadOnlyMemory body) + { + if (_usingTaskRun) { - if (_usingTaskRun) - { - await _semaphore.WaitAsync(); + await _semaphore.WaitAsync(); - _ = Task.Run(Consume).ConfigureAwait(false); - } - else - { - await Consume().ConfigureAwait(false); - } + _ = Task.Run(Consume).ConfigureAwait(false); + } + else + { + await Consume().ConfigureAwait(false); + } - Task Consume() - { - var headers = new Dictionary(); + Task Consume() + { + var headers = new Dictionary(); - if (properties.Headers != null) - foreach (var header in properties.Headers) - { - if (header.Value is byte[] val) - headers.Add(header.Key, Encoding.UTF8.GetString(val)); - else - headers.Add(header.Key, header.Value?.ToString()); - } + if (properties.Headers != null) + foreach (var header in properties.Headers) + { + if (header.Value is byte[] val) + headers.Add(header.Key, Encoding.UTF8.GetString(val)); + else + headers.Add(header.Key, header.Value?.ToString()); + } - headers.Add(Messages.Headers.Group, _groupName); + headers.Add(Messages.Headers.Group, _groupName); - if (_customHeadersBuilder != null) + if (_customHeadersBuilder != null) + { + var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + var customHeaders = _customHeadersBuilder(e, _serviceProvider); + foreach (var customHeader in customHeaders) { - var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - var customHeaders = _customHeadersBuilder(e, _serviceProvider); - foreach (var customHeader in customHeaders) - { - headers[customHeader.Key] = customHeader.Value; - } + headers[customHeader.Key] = customHeader.Value; } + } - var message = new TransportMessage(headers, body); + var message = new TransportMessage(headers, body); - return _msgCallback(message, deliveryTag); - } + return _msgCallback(message, deliveryTag); } + } - public void BasicAck(ulong deliveryTag) - { - if (Model.IsOpen) - Model.BasicAck(deliveryTag, false); + public void BasicAck(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicAck(deliveryTag, false); - _semaphore.Release(); - } + _semaphore.Release(); + } - public void BasicReject(ulong deliveryTag) - { - if (Model.IsOpen) - Model.BasicReject(deliveryTag, true); + public void BasicReject(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicReject(deliveryTag, true); - _semaphore.Release(); - } + _semaphore.Release(); + } - public override async Task OnCancel(params string[] consumerTags) + public override async Task OnCancel(params string[] consumerTags) + { + await base.OnCancel(consumerTags); + + var args = new LogMessageEventArgs { - await base.OnCancel(consumerTags); + LogType = MqLogType.ConsumerCancelled, + Reason = string.Join(",", consumerTags) + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerCancelled, - Reason = string.Join(",", consumerTags) - }; + _logCallback(args); + } - _logCallback(args); - } + public override async Task HandleBasicCancelOk(string consumerTag) + { + await base.HandleBasicCancelOk(consumerTag); - public override async Task HandleBasicCancelOk(string consumerTag) + var args = new LogMessageEventArgs { - await base.HandleBasicCancelOk(consumerTag); + LogType = MqLogType.ConsumerUnregistered, + Reason = consumerTag + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerUnregistered, - Reason = consumerTag - }; + _logCallback(args); + } - _logCallback(args); - } + public override async Task HandleBasicConsumeOk(string consumerTag) + { + await base.HandleBasicConsumeOk(consumerTag); - public override async Task HandleBasicConsumeOk(string consumerTag) + var args = new LogMessageEventArgs { - await base.HandleBasicConsumeOk(consumerTag); + LogType = MqLogType.ConsumerRegistered, + Reason = consumerTag + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerRegistered, - Reason = consumerTag - }; + _logCallback(args); + } - _logCallback(args); - } + public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason) + { + await base.HandleModelShutdown(model, reason); - public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason) + var args = new LogMessageEventArgs { - await base.HandleModelShutdown(model, reason); + LogType = MqLogType.ConsumerShutdown, + Reason = reason.ReplyText + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerShutdown, - Reason = reason.ReplyText - }; - - _logCallback(args); - } + _logCallback(args); } } diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index b1fc83423..42f4665aa 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -132,7 +132,7 @@ public void Connect() try { - _channel.QueueDeclare(_queueName, true, false, false, arguments); + _channel.QueueDeclare(_queueName, _rabbitMQOptions.QueueOptions.Durable, _rabbitMQOptions.QueueOptions.Exclusive, _rabbitMQOptions.QueueOptions.AutoDelete, arguments); } catch (TimeoutException ex) { diff --git a/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj b/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj index c52f376e1..0e8a3eec2 100644 --- a/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj +++ b/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj index 4e859ad96..4986a8c94 100644 --- a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj +++ b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs b/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs index 8739b56d0..bd186d054 100644 --- a/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs +++ b/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs @@ -1,87 +1,86 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; using System.Diagnostics.Tracing; -namespace DotNetCore.CAP.Diagnostics +namespace DotNetCore.CAP.Diagnostics; + +[EventSource(Name = CapDiagnosticListenerNames.MetricListenerName)] +public class CapEventCounterSource : EventSource { - [EventSource(Name = CapDiagnosticListenerNames.MetricListenerName)] - public class CapEventCounterSource : EventSource - { - public static readonly CapEventCounterSource Log = new(); + public static readonly CapEventCounterSource Log = new(); - private IncrementingEventCounter? _publishPerSecondCounter; - private IncrementingEventCounter? _consumePerSecondCounter; - private IncrementingEventCounter? _subscriberInvokePerSecondCounter; + private IncrementingEventCounter? _publishPerSecondCounter; + private IncrementingEventCounter? _consumePerSecondCounter; + private IncrementingEventCounter? _subscriberInvokePerSecondCounter; - private EventCounter? _invokeCounter; + private EventCounter? _invokeCounter; - private CapEventCounterSource() { } + private CapEventCounterSource() { } - protected override void OnEventCommand(EventCommandEventArgs args) + protected override void OnEventCommand(EventCommandEventArgs args) + { + if (args.Command == EventCommand.Enable) { - if (args.Command == EventCommand.Enable) + _publishPerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.PublishedPerSec, this) { - _publishPerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.PublishedPerSec, this) - { - DisplayName = "Publish Rate", - DisplayRateTimeScale = TimeSpan.FromSeconds(1) - }; - - _consumePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.ConsumePerSec, this) - { - DisplayName = "Consume Rate", - DisplayRateTimeScale = TimeSpan.FromSeconds(1) - }; - - _subscriberInvokePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.InvokeSubscriberPerSec, this) - { - DisplayName = "Invoke Subscriber Rate", - DisplayRateTimeScale = TimeSpan.FromSeconds(1) - }; - - _invokeCounter ??= new EventCounter(CapDiagnosticListenerNames.InvokeSubscriberElapsedMs, this) - { - DisplayName = "Invoke Subscriber Elapsed Time", - DisplayUnits = "ms" - }; - } - } + DisplayName = "Publish Rate", + DisplayRateTimeScale = TimeSpan.FromSeconds(1) + }; - public void WritePublishMetrics() - { - _publishPerSecondCounter?.Increment(); - } + _consumePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.ConsumePerSec, this) + { + DisplayName = "Consume Rate", + DisplayRateTimeScale = TimeSpan.FromSeconds(1) + }; - public void WriteConsumeMetrics() - { - _consumePerSecondCounter?.Increment(); - } + _subscriberInvokePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.InvokeSubscriberPerSec, this) + { + DisplayName = "Invoke Subscriber Rate", + DisplayRateTimeScale = TimeSpan.FromSeconds(1) + }; - public void WriteInvokeMetrics() - { - _subscriberInvokePerSecondCounter?.Increment(); + _invokeCounter ??= new EventCounter(CapDiagnosticListenerNames.InvokeSubscriberElapsedMs, this) + { + DisplayName = "Invoke Subscriber Elapsed Time", + DisplayUnits = "ms" + }; } + } - public void WriteInvokeTimeMetrics(double elapsedMs) - { - _invokeCounter?.WriteMetric(elapsedMs); - } + public void WritePublishMetrics() + { + _publishPerSecondCounter?.Increment(); + } - protected override void Dispose(bool disposing) - { - _publishPerSecondCounter?.Dispose(); - _consumePerSecondCounter?.Dispose(); - _subscriberInvokePerSecondCounter?.Dispose(); - _invokeCounter?.Dispose(); + public void WriteConsumeMetrics() + { + _consumePerSecondCounter?.Increment(); + } + + public void WriteInvokeMetrics() + { + _subscriberInvokePerSecondCounter?.Increment(); + } - _publishPerSecondCounter = null; - _consumePerSecondCounter = null; - _subscriberInvokePerSecondCounter = null; - _invokeCounter = null; + public void WriteInvokeTimeMetrics(double elapsedMs) + { + _invokeCounter?.WriteMetric(elapsedMs); + } - base.Dispose(disposing); - } + protected override void Dispose(bool disposing) + { + _publishPerSecondCounter?.Dispose(); + _consumePerSecondCounter?.Dispose(); + _subscriberInvokePerSecondCounter?.Dispose(); + _invokeCounter?.Dispose(); + + _publishPerSecondCounter = null; + _consumePerSecondCounter = null; + _subscriberInvokePerSecondCounter = null; + _invokeCounter = null; + + base.Dispose(disposing); } } diff --git a/src/DotNetCore.CAP/ICapTransaction.Base.cs b/src/DotNetCore.CAP/ICapTransaction.Base.cs index ad778074b..2bf90a2fd 100644 --- a/src/DotNetCore.CAP/ICapTransaction.Base.cs +++ b/src/DotNetCore.CAP/ICapTransaction.Base.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; +using System.Globalization; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Messages; @@ -55,7 +56,7 @@ protected virtual void Flush() if (isDelayMessage) { - _dispatcher.EnqueueToScheduler(message, DateTime.Parse(message.Origin.Headers[Headers.SentTime]!)).ConfigureAwait(false); + _dispatcher.EnqueueToScheduler(message, DateTime.Parse(message.Origin.Headers[Headers.SentTime]!, CultureInfo.InvariantCulture)).ConfigureAwait(false); } else diff --git a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs index 238239ff2..4a0fdffd0 100644 --- a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Globalization; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Diagnostics; @@ -142,11 +143,11 @@ private async Task PublishInternalAsync(string name, T? value, IDictionary