Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/dotnetcore/CAP
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Sep 11, 2023
2 parents c362bfa + 92548ba commit 778d8f1
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build/version.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>7</VersionMajor>
<VersionMinor>2</VersionMinor>
<VersionPatch>1</VersionPatch>
<VersionPatch>2</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
Expand Down
4 changes: 1 addition & 3 deletions docs/content/user-guide/en/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,12 @@ The expiration time (in seconds) of the failed message. When the message is sent
If `true` then all consumers within the same group pushes received messages to own dispatching pipeline channel. Each channel has set thread count to `ConsumerThreadCount` value.

!!! WARNING "If option set true, the EnableConsumerPrefetch option is disabled"

#### EnableConsumerPrefetch

> Default: false, Before version 7.0 the default behavior is true
By default, CAP will only read one message from the message queue, then execute the subscription method. After the execution is done, it will read the next message for execution.
If set to true, the consumer will prefetch some messages to the memory queue, and then distribute them to the scheduler for execution.
If set to true, the consumer will prefetch some messages to the memory queue, and then distribute them to the .NET thread pool for execution.

!!! note "Precautions"
Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes ago by default, that is to say, if the message backlog of more than 4 minutes on the consumer side will be picked up again and executed again
4 changes: 3 additions & 1 deletion docs/content/user-guide/en/cap/messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ Retrying plays an important role in the overall CAP architecture design, CAP ret

During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retry. When the total number of retries reaches 50, CAP will stop retrying.

You can adjust the total number of retries by setting [FailedRetryCount](../configuration#failedretrycount) in CapOptions.
You can adjust the total number of retries by setting [FailedRetryCount](../configuration#failedretrycount) in CapOptions Or use [FailedThresholdCallback](../configuration#failedthresholdcallback) to receive notifications when the maximum retry count is reached.

It will stop when the maximum number of times is reached. You can see the reason for the failure in Dashboard and choose whether to manually retry.

Expand All @@ -124,6 +124,8 @@ The consumer method is executed when the Consumer receives the message and will

We introduced database-based distributed locks in version 7.1.0 to deal with the problem of concurrent data acquisition of database retries under multiple instances, you need to explicitly configure `UseStorageLock` option to true.

Whether sending fails or consumption fails, we will store the exception message in the cap-exception field within the message header. You can find it in the Content field's JSON in the database table.

## Data Cleanup

There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 day** later.
Expand Down
4 changes: 2 additions & 2 deletions docs/content/user-guide/zh/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ services.AddCap(config =>
默认情况下,CAP会将所有消费者组的消息都先放置到内存同一个Channel中,然后线性处理。
如果设置为 true,则每个消费者组都会根据 `ConsumerThreadCount` 设置的值创建单独的线程进行处理。

!!! WARNING "如果此设置项目为 true,则 EnableConsumerPrefetch 设置项无效"
在同时配合使用 `EnableConsumerPrefetch` 时,请参考 issue [#1399](https://github.com/dotnetcore/CAP/issues/1399) 以清晰其预期行为。

#### EnableConsumerPrefetch

> 默认值: false, 在 7.0 版本之前默认行为 true
默认情况下,CAP只会从消息队列读取一条,然后执行订阅方法,执行完成后才会读取下一条来执行.
如果设置为 true, 消费端会预取一部分消息到内存队列,然后再分发给调度器执行
如果设置为 true, 消费端会将消息预取到内存队列,然后再放入.NET 线程池并行执行

!!! note "注意事项"
设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前的消息,也就是说如果消费端积压了超过4分钟的消息就会被重新拾取到再次执行
4 changes: 3 additions & 1 deletion docs/content/user-guide/zh/cap/messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,16 @@ CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关

在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。

你可以在 CapOptions 中设置 [FailedRetryCount](../configuration#failedretrycount) 来调整默认重试的总次数。
你可以在 CapOptions 中设置 [FailedRetryCount](../configuration#failedretrycount) 来调整默认重试的总次数,或使用 [FailedThresholdCallback](../configuration#FailedThresholdCallback) 在达到最大重试次数时收到通知

当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。

2、 消费重试

当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和上面的 发送重试 是相同的。

无论发送失败或者消费失败,我们会将异常消息同时存储到消息 header 中的 cap-exception 字段中,你可以在数据库表的 Content 字段的json中找到。

## 消息数据清理

数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1天 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期(可通过 [FailedMessageExpiredAfter](../configuration#failedmessageexpiredafter) 配置)。
Expand Down
3 changes: 0 additions & 3 deletions src/DotNetCore.CAP/CAP.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,15 @@ public CapOptions()

/// <summary>
/// If true, the message will be pre fetch to memory queue for parallel execute by thread pool.
/// <para>Not available when <see cref="UseDispatchingPerGroup"/> true.</para>
/// Default is false
/// </summary>
public bool EnableConsumerPrefetch { get; set; }

/// <summary>
/// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as
/// <see cref="ConsumerThreadCount" /> value is.
/// <para>If true, the <see cref="EnableConsumerPrefetch"/> is not available.</para>
/// Default is false.
/// </summary>
[Obsolete("Use EnableConsumerPrefetch instead. Setting it to true means that each consumer is now executed concurrently by thread pool, regardless of whether they are in different groups.")]
public bool UseDispatchingPerGroup { get; set; }

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public void Execute()
ICollection<string> topics;
try
{
// ReSharper disable once ConvertToUsingDeclaration
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
client.OnLogCallback = WriteLog;
Expand All @@ -139,6 +140,7 @@ public void Execute()
{
try
{
// ReSharper disable once ConvertToUsingDeclaration
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
_serverAddress = client.BrokerAddress;
Expand Down
12 changes: 11 additions & 1 deletion src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ internal class DispatcherPerGroup : IDispatcher
private readonly IMessageSender _sender;
private readonly IDataStorage _storage;
private readonly PriorityQueue<MediumMessage, DateTime> _schedulerQueue;
private readonly bool _enablePrefetch;

private Channel<MediumMessage> _publishedChannel = default!;
private ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor?)>> _receivedChannels = default!;
Expand All @@ -45,6 +46,7 @@ public DispatcherPerGroup(ILogger<Dispatcher> logger,
_executor = executor;
_schedulerQueue = new PriorityQueue<MediumMessage, DateTime>();
_storage = storage;
_enablePrefetch = options.Value.EnableConsumerPrefetch;
}

public async Task Start(CancellationToken stoppingToken)
Expand Down Expand Up @@ -242,7 +244,15 @@ private async ValueTask Processing(string group, Channel<(MediumMessage, Consume

var item1 = message.Item1;
var item2 = message.Item2;
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));

if (_enablePrefetch)
{
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
}
else
{
await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
Expand Down

0 comments on commit 778d8f1

Please sign in to comment.