Skip to content

Commit

Permalink
Update docs & code cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Dec 14, 2023
1 parent f0c18eb commit 3ddea59
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 162 deletions.
8 changes: 3 additions & 5 deletions docs/content/user-guide/en/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Add unified prefixes for topic/queue name. https://github.com/dotnetcore/CAP/pu

> Default: v1
This is a new configuration option introduced in the CAP v2.4 version. It is used to specify a version of a message to isolate messages of different versions of the service. It is often used in A/B testing or multi-service version scenarios. Following are application scenarios that needs versioning:
It is used to specify a version of a message to isolate messages of different versions of the service. It is often used in A/B testing or multi-service version scenarios. Following are application scenarios that needs versioning:

!!! info "Business Iterative and compatible"
Due to the rapid iteration of services, the data structure of the message is not fixed during each service integration process. Sometimes we add or modify certain data structures to accommodate the newly introduced requirements. If you have a brand new system, there's no problem, but if your system is already deployed to a production environment and serves customers, this will cause new features to be incompatible with the old data structure when they go online, and then these changes can cause serious problems. To work around this issue, you can only clean up message queues and persistent messages before starting the application, which is obviously not acceptable for production environments.
Expand Down Expand Up @@ -112,13 +112,11 @@ Number of consumer threads, when this value is greater than 1, the order of mess
Maximum number of retries. When this value is reached, retry will stop and the maximum number of retries will be modified by setting this parameter.


#### FallbackWindowLookbackSeconds

> Default: 240
Time in seconds to wait before continue retrying.
> Default: 240 sec
Configure the retry processor to pick up the fallback window lookback time for `Scheduled` or `Failed` status messages.

#### FailedThresholdCallback

Expand Down
10 changes: 8 additions & 2 deletions docs/content/user-guide/zh/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ services.AddCap(config =>

默认值:v1

这是在CAP v2.4 版本中引入的新配置项,用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景:
用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景:

!!! info "业务快速迭代,需要向前兼容"
由于业务的快速迭代,在各个服务集成的过程中,消息的数据结构并不是固定不变的,有些时候我们为了适应新引入的需求,会添加或者修改一些数据结构。如果你是一套全新的系统这没有什么问题,但是如果你的系统已经部署到生产环境了并且正在服务客户,这就会导致新的功能在上线的时候和旧的数据结构发生不兼容,那么这些改变可能会导致出现严重的问题,要想解决这个问题,只能把消息队列和持久化的消息全部清空,然后才能启动应用程序,这对于生产环境来说显然是致命的。
Expand Down Expand Up @@ -113,6 +113,12 @@ services.AddCap(config =>
重试的最大次数。当达到此设置值时,将不会再继续重试,通过改变此参数来设置重试的最大次数。

#### FallbackWindowLookbackSeconds

> 默认值:240 秒
配置重试处理器拾取 `Scheduled``Failed` 状态消息的回退时间窗。

#### FailedThresholdCallback

> 默认值:NULL
Expand Down Expand Up @@ -150,4 +156,4 @@ services.AddCap(config =>
如果设置为 true, 消费端会将消息预取到内存队列,然后再放入.NET 线程池并行执行。

!!! note "注意事项"
设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前的消息,也就是说如果消费端积压了超过4分钟的消息就会被重新拾取到再次执行
设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前(FallbackWindowLookbackSeconds 配置项)的消息,也就是说如果消费端积压了超过4分钟(FallbackWindowLookbackSeconds 配置项)的消息就会被重新拾取到再次执行
8 changes: 4 additions & 4 deletions src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCou
return Task.FromResult(removed);
}

public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
IEnumerable<MediumMessage> result = PublishedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.Subtract(coolDownTime)
&& x.Added < DateTime.Now.Subtract(lookbackSeconds)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x).ToList();
Expand All @@ -197,11 +197,11 @@ public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan
return Task.FromResult(result);
}

public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
IEnumerable<MediumMessage> result = ReceivedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < DateTime.Now.Subtract(coolDownTime)
&& x.Added < DateTime.Now.Subtract(lookbackSeconds)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x).ToList();
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ public async Task<int> DeleteExpiresAsync(string collection, DateTime timeout, i
}
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);
var collection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var queryResult = await collection
.Find(x => x.Retries < _capOptions.Value.FailedRetryCount
Expand All @@ -269,9 +269,9 @@ public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(Ti
}).ToList();
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var queryResult = await collection
.Find(x => x.Retries < _capOptions.Value.FailedRetryCount
Expand Down
12 changes: 6 additions & 6 deletions src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,14 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
.ConfigureAwait(false);
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false);
return GetMessagesOfNeedRetryAsync(_pubName, lookbackSeconds);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false);
return GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds);
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
Expand Down Expand Up @@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams)
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime)
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds)
{
var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);
var sql =
$"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<@Retries " +
$"AND `Version`=@Version AND `Added`<@Added AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
Expand Down
12 changes: 6 additions & 6 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
.ConfigureAwait(false);
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false);
return await GetMessagesOfNeedRetryAsync(_pubName, lookbackSeconds).ConfigureAwait(false);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false);
return await GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds).ConfigureAwait(false);
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
Expand Down Expand Up @@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams)
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime)
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds)
{
var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);
var sql =
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<@Retries " +
$"AND \"Version\"=@Version AND \"Added\"<@Added AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
Expand Down
12 changes: 6 additions & 6 deletions src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
new SqlParameter("@timeout", timeout), new SqlParameter("@batchCount", batchCount)).ConfigureAwait(false);
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false);
return GetMessagesOfNeedRetryAsync(_pubName, lookbackSeconds);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds)
{
return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false);
return GetMessagesOfNeedRetryAsync(_recName, lookbackSeconds);
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
Expand Down Expand Up @@ -307,9 +307,9 @@ private async Task StoreReceivedMessage(object[] sqlParams)
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan substract)
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds)
{
var fourMinAgo = DateTime.Now.Subtract(substract);
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);
var sql =
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<@Retries " +
$"AND Version=@Version AND Added<@Added AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
Expand Down
6 changes: 5 additions & 1 deletion src/DotNetCore.CAP/CAP.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public CapOptions()
public int ConsumerThreadCount { get; set; }

/// <summary>
/// If true, the message will be pre fetch to memory queue for parallel execute by thread pool.
/// If true, the message will be prefetch to memory queue for parallel execute by .net thread pool.
/// Default is false
/// </summary>
public bool EnableConsumerPrefetch { get; set; }
Expand All @@ -103,6 +103,10 @@ public CapOptions()
/// </summary>
public bool UseDispatchingPerGroup { get; set; }

/// <summary>
/// Configure the retry processor to pick up the backtrack time window for Scheduled or Failed status messages.
/// Default is 240 seconds.
/// </summary>
public int FallbackWindowLookbackSeconds { get; set; }

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP/Persistence/IDataStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public interface IDataStorage

Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default);

Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime);
Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan lookbackSeconds);

Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask, CancellationToken token = default);

Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime);
Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan lookbackSeconds);

//dashboard api
IMonitoringApi GetMonitoringApi();
Expand Down
Loading

0 comments on commit 3ddea59

Please sign in to comment.