diff --git a/src/Tes.Repository/PostgreSqlCachingRepository.cs b/src/Tes.Repository/PostgreSqlCachingRepository.cs index ef688d1cc..6b36420b6 100644 --- a/src/Tes.Repository/PostgreSqlCachingRepository.cs +++ b/src/Tes.Repository/PostgreSqlCachingRepository.cs @@ -150,6 +150,8 @@ protected async Task> GetItemsAsync(DbSet dbSet, C /// protected Task AddUpdateOrRemoveItemInDbAsync(TDbItem item, Func getItem, WriteAction action, CancellationToken cancellationToken) { + ArgumentNullException.ThrowIfNull(getItem); + var source = new TaskCompletionSource(); var result = source.Task; @@ -161,7 +163,7 @@ protected Task AddUpdateOrRemoveItemInDbAsync(TDbItem item, Func( "Respository concurrency failure: attempt to update item with previously queued update pending.", - result.ContinueWith(task => getItem(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion)); + getItem(item)); } if (!itemsToWrite.Writer.TryWrite(new(item, action, source))) diff --git a/src/Tes.Repository/RepositoryCollisionException.cs b/src/Tes.Repository/RepositoryCollisionException.cs index 4e2f367ab..3b6b8b9b6 100644 --- a/src/Tes.Repository/RepositoryCollisionException.cs +++ b/src/Tes.Repository/RepositoryCollisionException.cs @@ -7,21 +7,21 @@ namespace Tes.Repository { public class RepositoryCollisionException : Exception where T : RepositoryItem { - public System.Threading.Tasks.Task Task { get; } + public T RepositoryItem { get; } - public RepositoryCollisionException(System.Threading.Tasks.Task task) + public RepositoryCollisionException(T repositoryItem) { - Task = task; + RepositoryItem = repositoryItem; } - public RepositoryCollisionException(string message, System.Threading.Tasks.Task task) : base(message) + public RepositoryCollisionException(string message, T repositoryItem) : base(message) { - Task = task; + RepositoryItem = repositoryItem; } - public RepositoryCollisionException(string message, Exception innerException, System.Threading.Tasks.Task task) : base(message, innerException) + public RepositoryCollisionException(string message, Exception innerException, T repositoryItem) : base(message, innerException) { - Task = task; + RepositoryItem = repositoryItem; } } } diff --git a/src/Tes.SDK/TesClient.cs b/src/Tes.SDK/TesClient.cs index a3ce46e63..75c14906b 100644 --- a/src/Tes.SDK/TesClient.cs +++ b/src/Tes.SDK/TesClient.cs @@ -4,8 +4,6 @@ using System.Net.Http.Headers; using System.Runtime.CompilerServices; using System.Text; -using Azure.Identity; -using Azure.Storage.Blobs; using Newtonsoft.Json; using Polly; using Polly.Retry; diff --git a/src/TesApi.Tests/TaskServiceApiControllerTests.cs b/src/TesApi.Tests/TaskServiceApiControllerTests.cs index 239aca283..209b2d289 100644 --- a/src/TesApi.Tests/TaskServiceApiControllerTests.cs +++ b/src/TesApi.Tests/TaskServiceApiControllerTests.cs @@ -316,7 +316,7 @@ public async Task CancelTaskAsync_ReturnsConflict_ForRepositoryCollision() // Mock UpdateItemAsync to throw a RepositoryCollisionException r.Setup(repo => repo.UpdateItemAsync(It.IsAny(), It.IsAny())) - .ThrowsAsync(new RepositoryCollisionException(Task.FromResult(default))); + .ThrowsAsync(new RepositoryCollisionException(default)); }); var controller = services.GetT(); diff --git a/src/TesApi.Web/Controllers/TaskServiceApi.cs b/src/TesApi.Web/Controllers/TaskServiceApi.cs index b283c2469..da82a6c69 100644 --- a/src/TesApi.Web/Controllers/TaskServiceApi.cs +++ b/src/TesApi.Web/Controllers/TaskServiceApi.cs @@ -266,7 +266,7 @@ public virtual async Task CreateTaskAsync([FromBody] TesTask tesT logger.LogDebug("Creating task with id {TesTask} state {TesTaskState}", tesTask.Id, tesTask.State); tesTask = await repository.CreateItemAsync(tesTask, cancellationToken); - await taskScheduler.ProcessQueuedTesTaskAsync(tesTask, cancellationToken); + taskScheduler.QueueTesTask(tesTask); return StatusCode(200, new TesCreateTaskResponse { Id = tesTask.Id }); } diff --git a/src/TesApi.Web/OrchestrateOnBatchSchedulerServiceBase.cs b/src/TesApi.Web/OrchestrateOnBatchSchedulerServiceBase.cs index 71cf7e222..01e4f373c 100644 --- a/src/TesApi.Web/OrchestrateOnBatchSchedulerServiceBase.cs +++ b/src/TesApi.Web/OrchestrateOnBatchSchedulerServiceBase.cs @@ -134,10 +134,13 @@ protected async Task ExecuteActionOnIntervalAsync(TimeSpan runInterval, Func /// Tag to disambiguate the state and/or action workflow performed in log messages. /// . - /// A for controlling the lifetime of the asynchronous operation. + /// /// - protected async ValueTask ProcessOrchestratedTesTaskAsync(string pollName, RelatedTask task, CancellationToken cancellationToken) + /// A for controlling the lifetime of the asynchronous operation. + protected async ValueTask ProcessOrchestratedTesTaskAsync(string pollName, RelatedTask task, Func, ValueTask> requeue, CancellationToken cancellationToken) { + ArgumentNullException.ThrowIfNull(requeue); + var tesTask = task.Related; try @@ -251,27 +254,19 @@ protected async ValueTask ProcessOrchestratedTesTaskAsync(string pollName, Relat } catch (RepositoryCollisionException rce) { - Logger.LogError(rce, "RepositoryCollisionException in OrchestrateTesTasksOnBatch({Poll})", pollName); + Logger.LogInformation(rce, "RepositoryCollisionException in OrchestrateTesTasksOnBatch({Poll})", pollName); try { - var currentTesTask = await rce.Task; + var currentTesTask = rce.RepositoryItem; - if (currentTesTask is not null && currentTesTask.IsActiveState()) + if (currentTesTask is not null) { - currentTesTask.SetWarning(rce.Message); - - if (currentTesTask.IsActiveState()) - { - // TODO: merge tesTask and currentTesTask - } - - await Repository.UpdateItemAsync(currentTesTask, cancellationToken); + await requeue(rce); } } catch (Exception exc) { - // Consider retrying repository.UpdateItemAsync() if this exception was thrown from 'await rce.Task' Logger.LogError(exc, "Updating TES Task '{TesTask}' threw {ExceptionType}: '{ExceptionMessage}'. Stack trace: {ExceptionStackTrace}", tesTask.Id, exc.GetType().FullName, exc.Message, exc.StackTrace); } } @@ -302,18 +297,21 @@ static bool IsExceptionHttpConflictWhereTaskIsComplete(Exception exc) /// Tag to disambiguate the state and/or action workflow performed in log messages. /// Provides array of s on which to perform actions through . /// Method operating on returning indicating if each needs updating into the repository. + /// /// A for controlling the lifetime of the asynchronous operation. /// Tag to indicate the underlying unit quantity of items processed in log messages. - /// True to process even if there are no tasks processed. /// A that represents this method's operations. + /// True to process even if there are no tasks processed. protected async ValueTask OrchestrateTesTasksOnBatchAsync( string pollName, Func>> tesTaskGetter, Func>> tesTaskProcessor, + Func, ValueTask> requeue, CancellationToken cancellationToken, - string unitsLabel = "tasks", - bool needPoolFlush = false) + string unitsLabel = "tasks", bool needPoolFlush = false) { + ArgumentNullException.ThrowIfNull(requeue); + var tesTasks = await (await tesTaskGetter(cancellationToken)).ToArrayAsync(cancellationToken); var noTasks = tesTasks.All(task => task is null); @@ -327,7 +325,7 @@ protected async ValueTask OrchestrateTesTasksOnBatchAsync( if (!noTasks) { - await Parallel.ForEachAsync(tesTaskProcessor(tesTasks, cancellationToken), cancellationToken, (task, token) => ProcessOrchestratedTesTaskAsync(pollName, task, token)); + await Parallel.ForEachAsync(tesTaskProcessor(tesTasks, cancellationToken), cancellationToken, (task, token) => ProcessOrchestratedTesTaskAsync(pollName, task, requeue, token)); } if (BatchScheduler.NeedPoolFlush) diff --git a/src/TesApi.Web/PoolScheduler.cs b/src/TesApi.Web/PoolScheduler.cs index 5efc0842d..82ea0cb81 100644 --- a/src/TesApi.Web/PoolScheduler.cs +++ b/src/TesApi.Web/PoolScheduler.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using CommonUtilities; using Microsoft.Azure.Batch; using Microsoft.Azure.Batch.Common; using Microsoft.Extensions.Logging; @@ -159,14 +160,14 @@ private async ValueTask ProcessTasksAsync(IBatchPool pool, DateTime now, IEnumer /// private async ValueTask ProcessCloudTaskStatesAsync(string poolId, IAsyncEnumerable states, CancellationToken cancellationToken) { - var list = new ConcurrentBag<(TesTask TesTask, AzureBatchTaskState State)>(); + ConcurrentBag<(TesTask TesTask, AzureBatchTaskState State)> tasksAndStates = []; await Parallel.ForEachAsync(states, cancellationToken, async (state, token) => { TesTask tesTask = default; if (await Repository.TryGetItemAsync(BatchScheduler.GetTesTaskIdFromCloudTaskId(state.CloudTaskId), token, task => tesTask = task) && tesTask is not null) { - list.Add((tesTask, state.TaskState)); + tasksAndStates.Add((tesTask, state.TaskState)); } else { @@ -174,13 +175,41 @@ await Parallel.ForEachAsync(states, cancellationToken, async (state, token) => } }); - if (!list.IsEmpty) + if (!tasksAndStates.IsEmpty) { - await OrchestrateTesTasksOnBatchAsync( - $"NodeState ({poolId})", - _ => ValueTask.FromResult(list.Select(t => t.TesTask).ToAsyncEnumerable()), - (tesTasks, token) => TaskScheduler.ProcessTesTaskBatchStatesAsync(tesTasks, list.Select(t => t.State).ToArray(), token), - cancellationToken); + ConcurrentBag requeues = []; + Dictionary statesByTask = new(StringComparer.Ordinal); + List tasks = []; + + tasksAndStates.ForEach(t => + { + tasks.Add(t.TesTask); + statesByTask.Add(t.TesTask.Id, t.State); + }); + + do + { + requeues.Clear(); + + await OrchestrateTesTasksOnBatchAsync( + $"NodeState ({poolId})", + _ => ValueTask.FromResult(tasks.ToAsyncEnumerable()), + (tesTasks, token) => TaskScheduler.ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id]).ToArray(), token), + ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken); + + tasks.Clear(); + + await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) => + { + TesTask tesTask = default; + + if (await Repository.TryGetItemAsync(id, token, task => tesTask = task)) + { + tasks.Add(tesTask); + } + }); + } + while (!requeues.IsEmpty); } else { diff --git a/src/TesApi.Web/RepositoryRetryHandler.cs b/src/TesApi.Web/RepositoryRetryHandler.cs index c9dc9303b..c4963f9e2 100644 --- a/src/TesApi.Web/RepositoryRetryHandler.cs +++ b/src/TesApi.Web/RepositoryRetryHandler.cs @@ -34,7 +34,9 @@ public RepositoryRetryHandler(IRepository repository, IOptions(ex => ex is not RepositoryCollisionException)) + .WithRetryPolicyOptionsWait() .SetOnRetryBehavior(logger) .AsyncBuild(); _repository = repository; diff --git a/src/TesApi.Web/TaskScheduler.cs b/src/TesApi.Web/TaskScheduler.cs index 20d8005f4..15a96a2d6 100644 --- a/src/TesApi.Web/TaskScheduler.cs +++ b/src/TesApi.Web/TaskScheduler.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; +using CommonUtilities; using Microsoft.Extensions.Logging; using Tes.Models; using Tes.Repository; @@ -26,8 +27,7 @@ public interface ITaskScheduler /// Schedules a /// /// A to schedule on the batch system. - /// A for controlling the lifetime of the asynchronous operation. - Task ProcessQueuedTesTaskAsync(TesTask tesTask, CancellationToken cancellationToken); + void QueueTesTask(TesTask tesTask); /// /// Updates s with task-related state @@ -124,7 +124,7 @@ protected override async ValueTask ExecuteSetupAsync(CancellationToken cancellat } catch (Exception ex) { - await ProcessOrchestratedTesTaskAsync("Initialization", new(Task.FromException(ex), tesTask), cancellationToken); + await ProcessOrchestratedTesTaskAsync("Initialization", new(Task.FromException(ex), tesTask), ex => { Logger.LogCritical(ex, "Unexpected repository failure in initialization with {TesTask}", ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken); } } @@ -198,7 +198,17 @@ private async ValueTask ProcessQueuedTesTasksAsync(CancellationToken cancellatio { while (!cancellationToken.IsCancellationRequested && queuedTesTasks.TryDequeue(out var tesTask)) { - await ProcessOrchestratedTesTaskAsync("Queued", new(BatchScheduler.ProcessQueuedTesTaskAsync(tesTask, cancellationToken), tesTask), cancellationToken); + await ProcessOrchestratedTesTaskAsync("Queued", new(BatchScheduler.ProcessQueuedTesTaskAsync(tesTask, cancellationToken), tesTask), Requeue, cancellationToken); + } + + async ValueTask Requeue(RepositoryCollisionException exception) + { + TesTask tesTask = default; + + if (await Repository.TryGetItemAsync(exception.RepositoryItem.Id, cancellationToken, task => tesTask = task) && (tesTask?.IsActiveState() ?? false) && tesTask?.State != TesState.CANCELING) + { + queuedTesTasks.Enqueue(tesTask); + } } } @@ -239,14 +249,43 @@ private Task ExecuteCancelledTesTasksOnBatchAsync(CancellationToken cancellation .ToAsyncEnumerable()); return ExecuteActionOnIntervalAsync(BatchRunInterval, - token => OrchestrateTesTasksOnBatchAsync( - "Cancelled", - query, - (tasks, ct) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync( - tasks, - Enumerable.Repeat(new(AzureBatchTaskState.TaskState.CancellationRequested), tasks.Length).ToArray(), - ct), - token), + async token => + { + ConcurrentBag requeues = []; + List tasks = []; + + await foreach (var task in await query(cancellationToken)) + { + tasks.Add(task); + } + + do + { + requeues.Clear(); + + await OrchestrateTesTasksOnBatchAsync( + "Cancelled", + query, + (tasks, ct) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync( + tasks, + Enumerable.Repeat(new(AzureBatchTaskState.TaskState.CancellationRequested), tasks.Length).ToArray(), + ct), + ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, token); + + tasks.Clear(); + + await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) => + { + TesTask tesTask = default; + + if (await Repository.TryGetItemAsync(id, token, task => tesTask = task)) + { + tasks.Add(tesTask); + } + }); + } + while (!requeues.IsEmpty); + }, cancellationToken); } @@ -361,14 +400,43 @@ private async ValueTask UpdateTesTasksFromAvailableEventsAsync(IEnumerable<(TesT return; } - // Update TesTasks - await OrchestrateTesTasksOnBatchAsync( + ConcurrentBag requeues = []; + Dictionary MarkProcessedAsync)> statesByTask = new(StringComparer.Ordinal); + List tasks = []; + + eventStates.ForEach(t => + { + tasks.Add(t.Task); + statesByTask.Add(t.Task.Id, (t.State, t.MarkProcessedAsync)); + }); + + do + { + requeues.Clear(); + + + // Update TesTasks + await OrchestrateTesTasksOnBatchAsync( "NodeEvent", - _ => ValueTask.FromResult(eventStates.Select(@event => @event.Task).ToAsyncEnumerable()), - (tesTasks, token) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync(tesTasks, eventStates.Select(@event => @event.State).ToArray(), token), - cancellationToken, + _ => ValueTask.FromResult(tasks.ToAsyncEnumerable()), + (tesTasks, token) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id].State).ToArray(), token), + ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken, "events"); + tasks.Clear(); + + await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) => + { + TesTask tesTask = default; + + if (await Repository.TryGetItemAsync(id, token, task => tesTask = task)) + { + tasks.Add(tesTask); + } + }); + } + while (!requeues.IsEmpty); + await Parallel.ForEachAsync(eventStates.Select(@event => @event.MarkProcessedAsync).Where(func => func is not null), cancellationToken, async (markEventProcessed, token) => { try @@ -387,10 +455,9 @@ await Parallel.ForEachAsync(eventStates.Select(@event => @event.MarkProcessedAsy } /// - Task ITaskScheduler.ProcessQueuedTesTaskAsync(TesTask tesTask, CancellationToken cancellationToken) + void ITaskScheduler.QueueTesTask(TesTask tesTask) { queuedTesTasks.Enqueue(tesTask); - return Task.CompletedTask; } ///