From 3ddea59b5defc06ac1e61379dada97270e3d952c Mon Sep 17 00:00:00 2001 From: savorboard Date: Thu, 14 Dec 2023 20:41:16 +0800 Subject: [PATCH] Update docs & code cleanup. --- .../user-guide/en/cap/configuration.md | 8 +- .../user-guide/zh/cap/configuration.md | 10 +- .../IDataStorage.InMemory.cs | 8 +- .../IDataStorage.MongoDB.cs | 8 +- .../IDataStorage.MySql.cs | 12 +- .../IDataStorage.PostgreSql.cs | 12 +- .../IDataStorage.SqlServer.cs | 12 +- src/DotNetCore.CAP/CAP.Options.cs | 6 +- .../Persistence/IDataStorage.cs | 4 +- .../Processor/IProcessor.NeedRetry.cs | 252 +++++++++--------- 10 files changed, 170 insertions(+), 162 deletions(-) diff --git a/docs/content/user-guide/en/cap/configuration.md b/docs/content/user-guide/en/cap/configuration.md index d47858fa5..adf26a37b 100644 --- a/docs/content/user-guide/en/cap/configuration.md +++ b/docs/content/user-guide/en/cap/configuration.md @@ -60,7 +60,7 @@ Add unified prefixes for topic/queue name. https://github.com/dotnetcore/CAP/pu > Default: v1 -This is a new configuration option introduced in the CAP v2.4 version. It is used to specify a version of a message to isolate messages of different versions of the service. It is often used in A/B testing or multi-service version scenarios. Following are application scenarios that needs versioning: +It is used to specify a version of a message to isolate messages of different versions of the service. It is often used in A/B testing or multi-service version scenarios. Following are application scenarios that needs versioning: !!! info "Business Iterative and compatible" Due to the rapid iteration of services, the data structure of the message is not fixed during each service integration process. Sometimes we add or modify certain data structures to accommodate the newly introduced requirements. If you have a brand new system, there's no problem, but if your system is already deployed to a production environment and serves customers, this will cause new features to be incompatible with the old data structure when they go online, and then these changes can cause serious problems. To work around this issue, you can only clean up message queues and persistent messages before starting the application, which is obviously not acceptable for production environments. @@ -112,13 +112,11 @@ Number of consumer threads, when this value is greater than 1, the order of mess Maximum number of retries. When this value is reached, retry will stop and the maximum number of retries will be modified by setting this parameter. - #### FallbackWindowLookbackSeconds -> Default: 240 - -Time in seconds to wait before continue retrying. +> Default: 240 sec +Configure the retry processor to pick up the fallback window lookback time for `Scheduled` or `Failed` status messages. #### FailedThresholdCallback diff --git a/docs/content/user-guide/zh/cap/configuration.md b/docs/content/user-guide/zh/cap/configuration.md index c0f424285..b7b50609c 100644 --- a/docs/content/user-guide/zh/cap/configuration.md +++ b/docs/content/user-guide/zh/cap/configuration.md @@ -60,7 +60,7 @@ services.AddCap(config => 默认值:v1 -这是在CAP v2.4 版本中引入的新配置项,用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景: +用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景: !!! info "业务快速迭代,需要向前兼容" 由于业务的快速迭代,在各个服务集成的过程中,消息的数据结构并不是固定不变的,有些时候我们为了适应新引入的需求,会添加或者修改一些数据结构。如果你是一套全新的系统这没有什么问题,但是如果你的系统已经部署到生产环境了并且正在服务客户,这就会导致新的功能在上线的时候和旧的数据结构发生不兼容,那么这些改变可能会导致出现严重的问题,要想解决这个问题,只能把消息队列和持久化的消息全部清空,然后才能启动应用程序,这对于生产环境来说显然是致命的。 @@ -113,6 +113,12 @@ services.AddCap(config => 重试的最大次数。当达到此设置值时,将不会再继续重试,通过改变此参数来设置重试的最大次数。 +#### FallbackWindowLookbackSeconds + +> 默认值:240 秒 + +配置重试处理器拾取 `Scheduled` 或 `Failed` 状态消息的回退时间窗。 + #### FailedThresholdCallback > 默认值:NULL @@ -150,4 +156,4 @@ services.AddCap(config => 如果设置为 true, 消费端会将消息预取到内存队列,然后再放入.NET 线程池并行执行。 !!! note "注意事项" - 设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前的消息,也就是说如果消费端积压了超过4分钟的消息就会被重新拾取到再次执行 \ No newline at end of file + 设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前(FallbackWindowLookbackSeconds 配置项)的消息,也就是说如果消费端积压了超过4分钟(FallbackWindowLookbackSeconds 配置项)的消息就会被重新拾取到再次执行 \ No newline at end of file diff --git a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs index 9346e73d4..93e040d8a 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs @@ -180,11 +180,11 @@ public Task DeleteExpiresAsync(string table, DateTime timeout, int batchCou return Task.FromResult(removed); } - public Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) + public Task> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { IEnumerable result = PublishedMessages.Values .Where(x => x.Retries < _capOptions.Value.FailedRetryCount - && x.Added < DateTime.Now.Subtract(coolDownTime) + && x.Added < DateTime.Now.Subtract(lookbackSeconds) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) .Take(200) .Select(x => (MediumMessage)x).ToList(); @@ -197,11 +197,11 @@ public Task> GetPublishedMessagesOfNeedRetry(TimeSpan return Task.FromResult(result); } - public Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) + public Task> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { IEnumerable result = ReceivedMessages.Values .Where(x => x.Retries < _capOptions.Value.FailedRetryCount - && x.Added < DateTime.Now.Subtract(coolDownTime) + && x.Added < DateTime.Now.Subtract(lookbackSeconds) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) .Take(200) .Select(x => (MediumMessage)x).ToList(); diff --git a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs index d9563fd3b..4180a167a 100644 --- a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs @@ -248,9 +248,9 @@ public async Task DeleteExpiresAsync(string collection, DateTime timeout, i } } - public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) + public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - var fourMinAgo = DateTime.Now.Subtract(coolDownTime); + var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); var collection = _database.GetCollection(_options.Value.PublishedCollection); var queryResult = await collection .Find(x => x.Retries < _capOptions.Value.FailedRetryCount @@ -269,9 +269,9 @@ public async Task> GetPublishedMessagesOfNeedRetry(Ti }).ToList(); } - public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) + public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - var fourMinAgo = DateTime.Now.Subtract(coolDownTime); + var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); var collection = _database.GetCollection(_options.Value.ReceivedCollection); var queryResult = await collection .Find(x => x.Retries < _capOptions.Value.FailedRetryCount diff --git a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs index b1c3254f7..35a7f91ef 100644 --- a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs @@ -216,14 +216,14 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba .ConfigureAwait(false); } - public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) + public Task> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false); + return GetMessagesOfNeedRetryAsync(_pubName, lookbackSeconds); } - public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) + public Task> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false); + return GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds); } public async Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, @@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams) await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } - private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime) + private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds) { - var fourMinAgo = DateTime.Now.Subtract(coolDownTime); + var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); var sql = $"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<@Retries " + $"AND `Version`=@Version AND `Added`<@Added AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index 264d74882..43d0b879c 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -214,14 +214,14 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba .ConfigureAwait(false); } - public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) + public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_pubName, lookbackSeconds).ConfigureAwait(false); } - public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) + public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds).ConfigureAwait(false); } public async Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, @@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams) await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } - private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime) + private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds) { - var fourMinAgo = DateTime.Now.Subtract(coolDownTime); + var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); var sql = $"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<@Retries " + $"AND \"Version\"=@Version AND \"Added\"<@Added AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; diff --git a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs index fd81a6e14..3c888e390 100644 --- a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs @@ -213,14 +213,14 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba new SqlParameter("@timeout", timeout), new SqlParameter("@batchCount", batchCount)).ConfigureAwait(false); } - public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) + public Task> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false); + return GetMessagesOfNeedRetryAsync(_pubName, lookbackSeconds); } - public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) + public Task> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds) { - return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false); + return GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds); } public async Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, @@ -307,9 +307,9 @@ private async Task StoreReceivedMessage(object[] sqlParams) await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } - private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan substract) + private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds) { - var fourMinAgo = DateTime.Now.Subtract(substract); + var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); var sql = $"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<@Retries " + $"AND Version=@Version AND Added<@Added AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 7f637561d..2edc32edd 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -91,7 +91,7 @@ public CapOptions() public int ConsumerThreadCount { get; set; } /// - /// If true, the message will be pre fetch to memory queue for parallel execute by thread pool. + /// If true, the message will be prefetch to memory queue for parallel execute by .net thread pool. /// Default is false /// public bool EnableConsumerPrefetch { get; set; } @@ -103,6 +103,10 @@ public CapOptions() /// public bool UseDispatchingPerGroup { get; set; } + /// + /// Configure the retry processor to pick up the backtrack time window for Scheduled or Failed status messages. + /// Default is 240 seconds. + /// public int FallbackWindowLookbackSeconds { get; set; } /// diff --git a/src/DotNetCore.CAP/Persistence/IDataStorage.cs b/src/DotNetCore.CAP/Persistence/IDataStorage.cs index d6ee056b8..74d8609aa 100644 --- a/src/DotNetCore.CAP/Persistence/IDataStorage.cs +++ b/src/DotNetCore.CAP/Persistence/IDataStorage.cs @@ -33,11 +33,11 @@ public interface IDataStorage Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default); - Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime); + Task> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds); Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, CancellationToken token = default); - Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime); + Task> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds); //dashboard api IMonitoringApi GetMonitoringApi(); diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs index b76729679..d2ebf5520 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs @@ -1,131 +1,131 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using DotNetCore.CAP.Internal; -using DotNetCore.CAP.Persistence; -using DotNetCore.CAP.Transport; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.Processor; - -public class MessageNeedToRetryProcessor : IProcessor -{ - const int minSuggestedValueForFallbackWindowLookbackSeconds = 30; - private readonly ILogger _logger; - private readonly IDispatcher _dispatcher; - private readonly TimeSpan _waitingInterval; - private readonly IOptions _options; - private readonly IDataStorage _dataStorage; - private readonly TimeSpan _ttl; - private readonly TimeSpan _coolDownTime; - private readonly string _instance; - private Task? _failedRetryConsumeTask; - - public MessageNeedToRetryProcessor(IOptions options, ILogger logger, - IDispatcher dispatcher, IDataStorage dataStorage) - { - _options = options; - _logger = logger; - _dispatcher = dispatcher; - _waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval); - _coolDownTime = TimeSpan.FromSeconds(options.Value.FallbackWindowLookbackSeconds); - _dataStorage = dataStorage; - _ttl = _waitingInterval.Add(TimeSpan.FromSeconds(10)); - - _instance = string.Concat(Helper.GetInstanceHostname(), "_", Util.GenerateWorkerId(1023)); - - CheckSafeOptionsSet(); - } - - public virtual async Task ProcessAsync(ProcessingContext context) - { - if (context == null) throw new ArgumentNullException(nameof(context)); - - var storage = context.Provider.GetRequiredService(); - - _ = Task.Run(() => ProcessPublishedAsync(storage, context)); - - if (_options.Value.UseStorageLock && _failedRetryConsumeTask is { IsCompleted: false }) - { - await _dataStorage.RenewLockAsync($"received_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken); - - await context.WaitAsync(_waitingInterval).ConfigureAwait(false); - - return; - } - - _failedRetryConsumeTask = Task.Run(() => ProcessReceivedAsync(storage, context)); - - _ = _failedRetryConsumeTask.ContinueWith(_ => { _failedRetryConsumeTask = null; }); - - await context.WaitAsync(_waitingInterval).ConfigureAwait(false); - } - - private async Task ProcessPublishedAsync(IDataStorage connection, ProcessingContext context) - { - context.ThrowIfStopping(); - - if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"publish_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken)) - return; - - var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry, _coolDownTime).ConfigureAwait(false); - - foreach (var message in messages) - { - context.ThrowIfStopping(); - - await _dispatcher.EnqueueToPublish(message).ConfigureAwait(false); - } - - if (_options.Value.UseStorageLock) - await connection.ReleaseLockAsync($"publish_retry_{_options.Value.Version}", _instance, context.CancellationToken); - } - - private async Task ProcessReceivedAsync(IDataStorage connection, ProcessingContext context) - { - context.ThrowIfStopping(); - - if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"received_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken)) - return; - - var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry, _coolDownTime).ConfigureAwait(false); - - foreach (var message in messages) - { - context.ThrowIfStopping(); - - await _dispatcher.EnqueueToExecute(message).ConfigureAwait(false); - } - - if (_options.Value.UseStorageLock) - await connection.ReleaseLockAsync($"received_retry_{_options.Value.Version}", _instance, context.CancellationToken); - } - - private async Task> GetSafelyAsync(Func>> getMessagesAsync, TimeSpan coolDownTime) - { - try - { - return await getMessagesAsync(coolDownTime).ConfigureAwait(false); - } - catch (Exception ex) - { - _logger.LogWarning(1, ex, "Get messages from storage failed. Retrying..."); - - return Enumerable.Empty(); - } - } - +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Persistence; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Processor; + +public class MessageNeedToRetryProcessor : IProcessor +{ + private const int MinSuggestedValueForFallbackWindowLookbackSeconds = 30; + private readonly ILogger _logger; + private readonly IDispatcher _dispatcher; + private readonly TimeSpan _waitingInterval; + private readonly IOptions _options; + private readonly IDataStorage _dataStorage; + private readonly TimeSpan _ttl; + private readonly TimeSpan _lookbackSeconds; + private readonly string _instance; + private Task? _failedRetryConsumeTask; + + public MessageNeedToRetryProcessor(IOptions options, ILogger logger, + IDispatcher dispatcher, IDataStorage dataStorage) + { + _options = options; + _logger = logger; + _dispatcher = dispatcher; + _waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval); + _lookbackSeconds = TimeSpan.FromSeconds(options.Value.FallbackWindowLookbackSeconds); + _dataStorage = dataStorage; + _ttl = _waitingInterval.Add(TimeSpan.FromSeconds(10)); + + _instance = string.Concat(Helper.GetInstanceHostname(), "_", Util.GenerateWorkerId(1023)); + + CheckSafeOptionsSet(); + } + + public virtual async Task ProcessAsync(ProcessingContext context) + { + if (context == null) throw new ArgumentNullException(nameof(context)); + + var storage = context.Provider.GetRequiredService(); + + _ = Task.Run(() => ProcessPublishedAsync(storage, context)); + + if (_options.Value.UseStorageLock && _failedRetryConsumeTask is { IsCompleted: false }) + { + await _dataStorage.RenewLockAsync($"received_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken); + + await context.WaitAsync(_waitingInterval).ConfigureAwait(false); + + return; + } + + _failedRetryConsumeTask = Task.Run(() => ProcessReceivedAsync(storage, context)); + + _ = _failedRetryConsumeTask.ContinueWith(_ => { _failedRetryConsumeTask = null; }); + + await context.WaitAsync(_waitingInterval).ConfigureAwait(false); + } + + private async Task ProcessPublishedAsync(IDataStorage connection, ProcessingContext context) + { + context.ThrowIfStopping(); + + if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"publish_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken)) + return; + + var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry, _lookbackSeconds).ConfigureAwait(false); + + foreach (var message in messages) + { + context.ThrowIfStopping(); + + await _dispatcher.EnqueueToPublish(message).ConfigureAwait(false); + } + + if (_options.Value.UseStorageLock) + await connection.ReleaseLockAsync($"publish_retry_{_options.Value.Version}", _instance, context.CancellationToken); + } + + private async Task ProcessReceivedAsync(IDataStorage connection, ProcessingContext context) + { + context.ThrowIfStopping(); + + if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"received_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken)) + return; + + var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry, _lookbackSeconds).ConfigureAwait(false); + + foreach (var message in messages) + { + context.ThrowIfStopping(); + + await _dispatcher.EnqueueToExecute(message).ConfigureAwait(false); + } + + if (_options.Value.UseStorageLock) + await connection.ReleaseLockAsync($"received_retry_{_options.Value.Version}", _instance, context.CancellationToken); + } + + private async Task> GetSafelyAsync(Func>> getMessagesAsync, TimeSpan lookbackSeconds) + { + try + { + return await getMessagesAsync(lookbackSeconds).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(1, ex, "Get messages from storage failed. Retrying..."); + + return Enumerable.Empty(); + } + } + private void CheckSafeOptionsSet() { - if (_coolDownTime < TimeSpan.FromSeconds(minSuggestedValueForFallbackWindowLookbackSeconds)) + if (_lookbackSeconds < TimeSpan.FromSeconds(MinSuggestedValueForFallbackWindowLookbackSeconds)) { - _logger.LogWarning("The provided FallbackWindowLookbackSeconds of {currentSetFallbackWindowLookbackSeconds} is set to a value lower than {minSuggestedSeconds} seconds. This might cause unwanted unsafe behavior if the consumer takes more than the provided FallbackWindowLookbackSeconds to execute. ", _options.Value.FallbackWindowLookbackSeconds, minSuggestedValueForFallbackWindowLookbackSeconds); + _logger.LogWarning("The provided FallbackWindowLookbackSeconds of {currentSetFallbackWindowLookbackSeconds} is set to a value lower than {minSuggestedSeconds} seconds. This might cause unwanted unsafe behavior if the consumer takes more than the provided FallbackWindowLookbackSeconds to execute. ", _options.Value.FallbackWindowLookbackSeconds, MinSuggestedValueForFallbackWindowLookbackSeconds); } - } + } } \ No newline at end of file