Skip to content

Commit

Permalink
Handle RepositoryCollisionException
Browse files Browse the repository at this point in the history
  • Loading branch information
BMurri committed Oct 31, 2024
1 parent ad64b22 commit 17a0cd1
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 58 deletions.
4 changes: 3 additions & 1 deletion src/Tes.Repository/PostgreSqlCachingRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ protected async Task<IEnumerable<TDbItem>> GetItemsAsync(DbSet<TDbItem> dbSet, C
/// <returns></returns>
protected Task<TDbItem> AddUpdateOrRemoveItemInDbAsync(TDbItem item, Func<TDbItem, TItem> getItem, WriteAction action, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(getItem);

var source = new TaskCompletionSource<TDbItem>();
var result = source.Task;

Expand All @@ -161,7 +163,7 @@ protected Task<TDbItem> AddUpdateOrRemoveItemInDbAsync(TDbItem item, Func<TDbIte
{
throw new RepositoryCollisionException<TItem>(
"Respository concurrency failure: attempt to update item with previously queued update pending.",
result.ContinueWith(task => getItem(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion));
getItem(item));
}

if (!itemsToWrite.Writer.TryWrite(new(item, action, source)))
Expand Down
14 changes: 7 additions & 7 deletions src/Tes.Repository/RepositoryCollisionException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ namespace Tes.Repository
{
public class RepositoryCollisionException<T> : Exception where T : RepositoryItem<T>
{
public System.Threading.Tasks.Task<T> Task { get; }
public T RepositoryItem { get; }

public RepositoryCollisionException(System.Threading.Tasks.Task<T> task)
public RepositoryCollisionException(T repositoryItem)
{
Task = task;
RepositoryItem = repositoryItem;
}

public RepositoryCollisionException(string message, System.Threading.Tasks.Task<T> task) : base(message)
public RepositoryCollisionException(string message, T repositoryItem) : base(message)
{
Task = task;
RepositoryItem = repositoryItem;
}

public RepositoryCollisionException(string message, Exception innerException, System.Threading.Tasks.Task<T> task) : base(message, innerException)
public RepositoryCollisionException(string message, Exception innerException, T repositoryItem) : base(message, innerException)
{
Task = task;
RepositoryItem = repositoryItem;
}
}
}
2 changes: 0 additions & 2 deletions src/Tes.SDK/TesClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Text;
using Azure.Identity;
using Azure.Storage.Blobs;
using Newtonsoft.Json;
using Polly;
using Polly.Retry;
Expand Down
2 changes: 1 addition & 1 deletion src/TesApi.Tests/TaskServiceApiControllerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public async Task CancelTaskAsync_ReturnsConflict_ForRepositoryCollision()
// Mock UpdateItemAsync to throw a RepositoryCollisionException
r.Setup(repo => repo.UpdateItemAsync(It.IsAny<TesTask>(), It.IsAny<CancellationToken>()))
.ThrowsAsync(new RepositoryCollisionException<TesTask>(Task.FromResult<TesTask>(default)));
.ThrowsAsync(new RepositoryCollisionException<TesTask>(default));
});

var controller = services.GetT();
Expand Down
2 changes: 1 addition & 1 deletion src/TesApi.Web/Controllers/TaskServiceApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public virtual async Task<IActionResult> CreateTaskAsync([FromBody] TesTask tesT

logger.LogDebug("Creating task with id {TesTask} state {TesTaskState}", tesTask.Id, tesTask.State);
tesTask = await repository.CreateItemAsync(tesTask, cancellationToken);
await taskScheduler.ProcessQueuedTesTaskAsync(tesTask, cancellationToken);
taskScheduler.QueueTesTask(tesTask);
return StatusCode(200, new TesCreateTaskResponse { Id = tesTask.Id });
}

Expand Down
34 changes: 16 additions & 18 deletions src/TesApi.Web/OrchestrateOnBatchSchedulerServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,13 @@ protected async Task ExecuteActionOnIntervalAsync(TimeSpan runInterval, Func<Can
/// </summary>
/// <param name="pollName">Tag to disambiguate the state and/or action workflow performed in log messages.</param>
/// <param name="task"><see cref="TesTask"/>.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> for controlling the lifetime of the asynchronous operation.</param>
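/// <param name="requeue">Callback awaited with the caught <see cref="RepositoryCollisionException{T}"/> when it carries a non-null repository item, letting the caller requeue the affected <see cref="TesTask"/>.</param>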
/// <returns></returns>
protected async ValueTask ProcessOrchestratedTesTaskAsync(string pollName, RelatedTask<TesTask, bool> task, CancellationToken cancellationToken)
/// <param name="cancellationToken">A <see cref="CancellationToken"/> for controlling the lifetime of the asynchronous operation.</param>
protected async ValueTask ProcessOrchestratedTesTaskAsync(string pollName, RelatedTask<TesTask, bool> task, Func<RepositoryCollisionException<TesTask>, ValueTask> requeue, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(requeue);

var tesTask = task.Related;

try
Expand Down Expand Up @@ -251,27 +254,19 @@ protected async ValueTask ProcessOrchestratedTesTaskAsync(string pollName, Relat
}
catch (RepositoryCollisionException<TesTask> rce)
{
Logger.LogError(rce, "RepositoryCollisionException in OrchestrateTesTasksOnBatch({Poll})", pollName);
Logger.LogInformation(rce, "RepositoryCollisionException in OrchestrateTesTasksOnBatch({Poll})", pollName);

try
{
var currentTesTask = await rce.Task;
var currentTesTask = rce.RepositoryItem;

if (currentTesTask is not null && currentTesTask.IsActiveState())
if (currentTesTask is not null)
{
currentTesTask.SetWarning(rce.Message);

if (currentTesTask.IsActiveState())
{
// TODO: merge tesTask and currentTesTask
}

await Repository.UpdateItemAsync(currentTesTask, cancellationToken);
await requeue(rce);
}
}
catch (Exception exc)
{
// Consider retrying repository.UpdateItemAsync() if this exception was thrown from 'await rce.Task'
Logger.LogError(exc, "Updating TES Task '{TesTask}' threw {ExceptionType}: '{ExceptionMessage}'. Stack trace: {ExceptionStackTrace}", tesTask.Id, exc.GetType().FullName, exc.Message, exc.StackTrace);
}
}
Expand Down Expand Up @@ -302,18 +297,21 @@ static bool IsExceptionHttpConflictWhereTaskIsComplete(Exception exc)
/// <param name="pollName">Tag to disambiguate the state and/or action workflow performed in log messages.</param>
/// <param name="tesTaskGetter">Provides array of <see cref="TesTask"/>s on which to perform actions through <paramref name="tesTaskProcessor"/>.</param>
/// <param name="tesTaskProcessor">Method operating on <paramref name="tesTaskGetter"/> returning <see cref="RelatedTask{TRelated, TResult}"/> indicating if each <see cref="TesTask"/> needs updating into the repository.</param>
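/// <param name="requeue">Callback forwarded to <c>ProcessOrchestratedTesTaskAsync</c> for each processed task; it receives any <see cref="RepositoryCollisionException{T}"/> raised while handling that task. Must not be null.</param>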
/// <param name="cancellationToken">A <see cref="CancellationToken"/> for controlling the lifetime of the asynchronous operation.</param>
/// <param name="unitsLabel">Tag to indicate the underlying unit quantity of items processed in log messages.</param>
/// <param name="needPoolFlush">True to process <see cref="IBatchScheduler.NeedPoolFlush"/> even if there are no tasks processed.</param>
/// <returns>A <see cref="ValueTask"/> that represents this method's operations.</returns>
/// <param name="needPoolFlush">True to process <see cref="IBatchScheduler.NeedPoolFlush"/> even if there are no tasks processed.</param>
protected async ValueTask OrchestrateTesTasksOnBatchAsync(
string pollName,
Func<CancellationToken, ValueTask<IAsyncEnumerable<TesTask>>> tesTaskGetter,
Func<TesTask[], CancellationToken, IAsyncEnumerable<RelatedTask<TesTask, bool>>> tesTaskProcessor,
Func<RepositoryCollisionException<TesTask>, ValueTask> requeue,
CancellationToken cancellationToken,
string unitsLabel = "tasks",
bool needPoolFlush = false)
string unitsLabel = "tasks", bool needPoolFlush = false)
{
ArgumentNullException.ThrowIfNull(requeue);

var tesTasks = await (await tesTaskGetter(cancellationToken)).ToArrayAsync(cancellationToken);
var noTasks = tesTasks.All(task => task is null);

Expand All @@ -327,7 +325,7 @@ protected async ValueTask OrchestrateTesTasksOnBatchAsync(

if (!noTasks)
{
await Parallel.ForEachAsync(tesTaskProcessor(tesTasks, cancellationToken), cancellationToken, (task, token) => ProcessOrchestratedTesTaskAsync(pollName, task, token));
await Parallel.ForEachAsync(tesTaskProcessor(tesTasks, cancellationToken), cancellationToken, (task, token) => ProcessOrchestratedTesTaskAsync(pollName, task, requeue, token));
}

if (BatchScheduler.NeedPoolFlush)
Expand Down
45 changes: 37 additions & 8 deletions src/TesApi.Web/PoolScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CommonUtilities;
using Microsoft.Azure.Batch;
using Microsoft.Azure.Batch.Common;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -159,28 +160,56 @@ private async ValueTask ProcessTasksAsync(IBatchPool pool, DateTime now, IEnumer
/// <returns></returns>
private async ValueTask ProcessCloudTaskStatesAsync(string poolId, IAsyncEnumerable<CloudTaskBatchTaskState> states, CancellationToken cancellationToken)
{
var list = new ConcurrentBag<(TesTask TesTask, AzureBatchTaskState State)>();
ConcurrentBag<(TesTask TesTask, AzureBatchTaskState State)> tasksAndStates = [];

await Parallel.ForEachAsync(states, cancellationToken, async (state, token) =>
{
TesTask tesTask = default;
if (await Repository.TryGetItemAsync(BatchScheduler.GetTesTaskIdFromCloudTaskId(state.CloudTaskId), token, task => tesTask = task) && tesTask is not null)
{
list.Add((tesTask, state.TaskState));
tasksAndStates.Add((tesTask, state.TaskState));
}
else
{
Logger.LogError(@"Unable to locate TesTask for CloudTask '{CloudTask}' with action state {ActionState}.", state.CloudTaskId, state.TaskState.State);
}
});

if (!list.IsEmpty)
if (!tasksAndStates.IsEmpty)
{
await OrchestrateTesTasksOnBatchAsync(
$"NodeState ({poolId})",
_ => ValueTask.FromResult(list.Select(t => t.TesTask).ToAsyncEnumerable()),
(tesTasks, token) => TaskScheduler.ProcessTesTaskBatchStatesAsync(tesTasks, list.Select(t => t.State).ToArray(), token),
cancellationToken);
ConcurrentBag<string> requeues = [];
Dictionary<string, AzureBatchTaskState> statesByTask = new(StringComparer.Ordinal);
List<TesTask> tasks = [];

tasksAndStates.ForEach(t =>
{
tasks.Add(t.TesTask);
statesByTask.Add(t.TesTask.Id, t.State);
});

do
{
requeues.Clear();

await OrchestrateTesTasksOnBatchAsync(
$"NodeState ({poolId})",
_ => ValueTask.FromResult(tasks.ToAsyncEnumerable()),
(tesTasks, token) => TaskScheduler.ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id]).ToArray(), token),
ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken);

tasks.Clear();

await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) =>
{
TesTask tesTask = default;
if (await Repository.TryGetItemAsync(id, token, task => tesTask = task))
{
tasks.Add(tesTask);
}
});
}
while (!requeues.IsEmpty);
}
else
{
Expand Down
4 changes: 3 additions & 1 deletion src/TesApi.Web/RepositoryRetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public RepositoryRetryHandler(IRepository<T> repository, IOptions<RetryPolicyOpt
ArgumentNullException.ThrowIfNull(retryPolicyOptions);

_asyncRetryPolicy = new RetryPolicyBuilder(retryPolicyOptions)
.DefaultRetryPolicyBuilder()
.PolicyBuilder
.OpinionatedRetryPolicy(Polly.Policy.Handle<Exception>(ex => ex is not RepositoryCollisionException<T>))
.WithRetryPolicyOptionsWait()
.SetOnRetryBehavior(logger)
.AsyncBuild();
_repository = repository;
Expand Down
Loading

0 comments on commit 17a0cd1

Please sign in to comment.