Skip to content

Commit

Permalink
Fix task event processing loop
Browse files Browse the repository at this point in the history
  • Loading branch information
BMurri committed Nov 1, 2024
1 parent 17a0cd1 commit a6d9420
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 22 deletions.
11 changes: 7 additions & 4 deletions src/TesApi.Web/PoolScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,24 +190,27 @@ await Parallel.ForEachAsync(states, cancellationToken, async (state, token) =>
do
{
requeues.Clear();

await OrchestrateTesTasksOnBatchAsync(
$"NodeState ({poolId})",
_ => ValueTask.FromResult(tasks.ToAsyncEnumerable()),
(tesTasks, token) => TaskScheduler.ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id]).ToArray(), token),
ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken);

tasks.Clear();

// Fetch updated TesTasks from the repository
ConcurrentBag<TesTask> requeuedTasks = [];
await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) =>
{
TesTask tesTask = default;
if (await Repository.TryGetItemAsync(id, token, task => tesTask = task))
{
tasks.Add(tesTask);
requeuedTasks.Add(tesTask);
}
});

// Stage next loop
tasks.Clear();
requeuedTasks.ForEach(tasks.Add);
}
while (!requeues.IsEmpty);
}
Expand Down
59 changes: 41 additions & 18 deletions src/TesApi.Web/TaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
Expand Down Expand Up @@ -262,7 +263,6 @@ private Task ExecuteCancelledTesTasksOnBatchAsync(CancellationToken cancellation
do
{
requeues.Clear();
await OrchestrateTesTasksOnBatchAsync(
"Cancelled",
query,
Expand All @@ -272,17 +272,21 @@ await OrchestrateTesTasksOnBatchAsync(
ct),
ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, token);
tasks.Clear();
// Fetch updated TesTasks from the repository
ConcurrentBag<TesTask> requeuedTasks = [];
await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) =>
{
TesTask tesTask = default;
if (await Repository.TryGetItemAsync(id, token, task => tesTask = task))
{
tasks.Add(tesTask);
requeuedTasks.Add(tesTask);
}
});
// Stage next loop
tasks.Clear();
requeuedTasks.ForEach(tasks.Add);
}
while (!requeues.IsEmpty);
},
Expand Down Expand Up @@ -401,39 +405,58 @@ private async ValueTask UpdateTesTasksFromAvailableEventsAsync(IEnumerable<(TesT
}

ConcurrentBag<string> requeues = [];
Dictionary<string, (AzureBatchTaskState State, Func<CancellationToken, Task> MarkProcessedAsync)> statesByTask = new(StringComparer.Ordinal);
List<TesTask> tasks = [];
ConcurrentDictionary<string, ImmutableArray<(AzureBatchTaskState State, Func<CancellationToken, Task> MarkProcessedAsync)>> statesByTask = new(StringComparer.Ordinal);
HashSet<TesTask> tasks = [];

eventStates.ForEach(t =>
{
tasks.Add(t.Task);
statesByTask.Add(t.Task.Id, (t.State, t.MarkProcessedAsync));
_ = tasks.Add(t.Task);
_ = statesByTask.AddOrUpdate(t.Task.Id, _ => [(t.State, t.MarkProcessedAsync)], (_, array) => array.Add((t.State, t.MarkProcessedAsync)));
});

do
{
// Update TesTasks one event each per loop
requeues.Clear();


// Update TesTasks
await OrchestrateTesTasksOnBatchAsync(
"NodeEvent",
_ => ValueTask.FromResult(tasks.ToAsyncEnumerable()),
(tesTasks, token) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id].State).ToArray(), token),
ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; }, cancellationToken,
"events");
"NodeEvent",
_ => ValueTask.FromResult(tasks.ToAsyncEnumerable()),
(tesTasks, token) => ((ITaskScheduler)this).ProcessTesTaskBatchStatesAsync(tesTasks, tesTasks.Select(task => statesByTask[task.Id][0].State).ToArray(), token),
ex => { requeues.Add(ex.RepositoryItem.Id); return ValueTask.CompletedTask; },
cancellationToken,
"events");

// Get next state for each task (if any) for next loop
_ = Parallel.ForEach(tasks, task =>
{
// Don't remove current state if there was a repository conflict
if (!requeues.Contains(task.Id))
{
var states = statesByTask[task.Id].RemoveAt(0);
tasks.Clear();
if (!states.IsEmpty)
{
statesByTask[task.Id] = states;
requeues.Add(task.Id);
}
}
});

// Fetch updated TesTasks from the repository
ConcurrentBag<TesTask> requeuedTasks = [];
await Parallel.ForEachAsync(requeues, cancellationToken, async (id, token) =>
{
TesTask tesTask = default;
if (await Repository.TryGetItemAsync(id, token, task => tesTask = task))
{
tasks.Add(tesTask);
requeuedTasks.Add(tesTask);
}
});

// Stage next loop
tasks.Clear();
requeuedTasks.ForEach(task => _ = tasks.Add(task));
}
while (!requeues.IsEmpty);

Expand Down

0 comments on commit a6d9420

Please sign in to comment.