Skip to content

Commit

Permalink
Before thinking of a good solution to fixes issue #1429.
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Nov 19, 2023
1 parent 2e4542f commit aa4c540
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\" FROM {_pubName} WHERE \"Version\"=@Version " +
$"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{StatusName.Delayed}') OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{StatusName.Queued}')) FOR UPDATE SKIP LOCKED;";

object[] sqlParams =
var sqlParams = new object[]
{
new NpgsqlParameter("@Version", _capOptions.Value.Version),
new NpgsqlParameter("@TwoMinutesLater", DateTime.Now.AddMinutes(2)),
new NpgsqlParameter("@OneMinutesAgo", DateTime.Now.AddMinutes(-1))
new NpgsqlParameter("@OneMinutesAgo", QueuedMessageFetchTime())
};

await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
Expand Down Expand Up @@ -269,6 +269,11 @@ public IMonitoringApi GetMonitoringApi()
return new PostgreSqlMonitoringApi(_options, _initializer, _serializer);
}

protected virtual DateTime QueuedMessageFetchTime()
{
return DateTime.Now.AddMinutes(-1);
}

private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state,
object? transaction = null)
{
Expand Down

0 comments on commit aa4c540

Please sign in to comment.