diff --git a/src/TesApi.Web/PoolScheduler.cs b/src/TesApi.Web/PoolScheduler.cs index 82ea0cb81..0ebc7b155 100644 --- a/src/TesApi.Web/PoolScheduler.cs +++ b/src/TesApi.Web/PoolScheduler.cs @@ -190,24 +190,27 @@ await Parallel.ForEachAsync(states, cancellationToken, async (state, token) => do { requeues.Clear(); - await OrchestrateTesTasksOnBatchAsync( $"NodeState ({poolId})", _ => ValueTask.FromResult(tasks.ToAsyncEnumerable()), (tesTasks, token) => TaskScheduler.ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id]).ToArray(), token), ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken); - tasks.Clear(); - + // Fetch updated TesTasks from the repository + ConcurrentBag requeuedTasks = []; await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) => { TesTask tesTask = default; if (await Repository.TryGetItemAsync(id, token, task => tesTask = task)) { - tasks.Add(tesTask); + requeuedTasks.Add(tesTask); } }); + + // Stage next loop + tasks.Clear(); + requeuedTasks.ForEach(tasks.Add); } while (!requeues.IsEmpty); } diff --git a/src/TesApi.Web/TaskScheduler.cs b/src/TesApi.Web/TaskScheduler.cs index 15a96a2d6..d2bc8658f 100644 --- a/src/TesApi.Web/TaskScheduler.cs +++ b/src/TesApi.Web/TaskScheduler.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Threading; using System.Threading.Channels; @@ -262,7 +263,6 @@ private Task ExecuteCancelledTesTasksOnBatchAsync(CancellationToken cancellation do { requeues.Clear(); - await OrchestrateTesTasksOnBatchAsync( "Cancelled", query, @@ -272,17 +272,21 @@ await OrchestrateTesTasksOnBatchAsync( ct), ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, token); - tasks.Clear(); - + // Fetch updated TesTasks from the repository + ConcurrentBag requeuedTasks = []; await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) => { TesTask tesTask = default; if (await Repository.TryGetItemAsync(id, token, task => tesTask = task)) { - tasks.Add(tesTask); + requeuedTasks.Add(tesTask); } }); + + // Stage next loop + tasks.Clear(); + requeuedTasks.ForEach(tasks.Add); } while (!requeues.IsEmpty); }, @@ -401,39 +405,58 @@ private async ValueTask UpdateTesTasksFromAvailableEventsAsync(IEnumerable<(TesT } ConcurrentBag requeues = []; - Dictionary MarkProcessedAsync)> statesByTask = new(StringComparer.Ordinal); - List tasks = []; + ConcurrentDictionary MarkProcessedAsync)>> statesByTask = new(StringComparer.Ordinal); + HashSet tasks = []; eventStates.ForEach(t => { - tasks.Add(t.Task); - statesByTask.Add(t.Task.Id, (t.State, t.MarkProcessedAsync)); + _ = tasks.Add(t.Task); + _ = statesByTask.AddOrUpdate(t.Task.Id, _ => [(t.State, t.MarkProcessedAsync)], (_, array) => array.Add((t.State, t.MarkProcessedAsync))); }); do { + // Update TesTasks one event each per loop requeues.Clear(); - - - // Update TesTasks await OrchestrateTesTasksOnBatchAsync( - "NodeEvent", - _ => ValueTask.FromResult(tasks.ToAsyncEnumerable()), - (tesTasks, token) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id].State).ToArray(), token), - ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken, - "events"); + "NodeEvent", + _ => ValueTask.FromResult(tasks.ToAsyncEnumerable()), + (tesTasks, token) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id][0].State).ToArray(), token), + ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, + cancellationToken, + "events"); + + // Get next state for each task (if any) for next loop + _ = Parallel.ForEach(tasks, task => + { + // Don't remove current state if there was a repository conflict + if (!requeues.Contains(task.Id)) + { + var states = statesByTask[task.Id].RemoveAt(0); - tasks.Clear(); + if (!states.IsEmpty) + { + statesByTask[task.Id] = states; + requeues.Add(task.Id); + } + } + }); + // Fetch updated TesTasks from the repository + ConcurrentBag requeuedTasks = []; await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) => { TesTask tesTask = default; if (await Repository.TryGetItemAsync(id, token, task => tesTask = task)) { - tasks.Add(tesTask); + requeuedTasks.Add(tesTask); } }); + + // Stage next loop + tasks.Clear(); + requeuedTasks.ForEach(task => _ = tasks.Add(task)); } while (!requeues.IsEmpty);