Skip to content

Commit

Permalink
Adds support for async service scope in request executor (#7826)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Staib <[email protected]>
  • Loading branch information
PascalSenn and michaelstaib committed Dec 16, 2024
1 parent 2f8d695 commit 5ccdc28
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,25 @@ public VariableBatchRequest(
/// GraphQL request flags allow to limit the GraphQL executor capabilities.
/// </summary>
public GraphQLRequestFlags Flags { get; }

/// <summary>
/// Creates a new request with the specified services.
/// </summary>
/// <param name="services">
/// The services that shall be used while executing the operation.
/// </param>
/// <returns>
/// Returns a new request with the specified services.
/// </returns>
public VariableBatchRequest WithServices(IServiceProvider services) =>
new(
Document,
DocumentId,
DocumentHash,
OperationName,
VariableValues,
Extensions,
ContextData,
services,
Flags);
}
57 changes: 52 additions & 5 deletions src/HotChocolate/Core/src/Execution/RequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ internal async Task<IExecutionResult> ExecuteAsync(

if (scopeDataLoader)
{
// we ensure that at the begin of each execution there is a fresh batching scope.
// we ensure that at the beginning of each execution there is a fresh batching scope.
services.InitializeDataLoaderScope();
}

Expand Down Expand Up @@ -153,7 +153,14 @@ internal async Task<IExecutionResult> ExecuteAsync(
_contextPool.Return(context);
}

scope?.Dispose();
if(scope is IAsyncDisposable asyncScope)
{
await asyncScope.DisposeAsync();
}
else
{
scope?.Dispose();
}
}
}

Expand All @@ -174,7 +181,7 @@ public Task<IResponseStream> ExecuteBatchAsync(

private async IAsyncEnumerable<IOperationResult> CreateResponseStream(
OperationRequestBatch requestBatch,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
[EnumeratorCancellation] CancellationToken ct = default)
{
IServiceScope? scope = null;

Expand All @@ -197,6 +204,31 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(
// we ensure that at the start of each execution there is a fresh batching scope.
services.InitializeDataLoaderScope();

try
{
await foreach (var result in ExecuteBatchStream(requestBatch, services, ct).ConfigureAwait(false))
{
yield return result;
}
}
finally
{
if(scope is IAsyncDisposable asyncScope)
{
await asyncScope.DisposeAsync();
}
else
{
scope?.Dispose();
}
}
}

private async IAsyncEnumerable<IOperationResult> ExecuteBatchStream(
OperationRequestBatch requestBatch,
IServiceProvider services,
[EnumeratorCancellation] CancellationToken ct = default)
{
var requests = requestBatch.Requests;
var requestCount = requests.Count;
var tasks = new List<Task>(requestCount);
Expand All @@ -205,7 +237,7 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(

for (var i = 0; i < requestCount; i++)
{
tasks.Add(ExecuteBatchItemAsync(requests[i], i, completed, cancellationToken));
tasks.Add(ExecuteBatchItemAsync(WithServices(requests[i], services), i, completed, ct));
}

var buffer = new IOperationResult[8];
Expand All @@ -228,7 +260,7 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(

if (task.Status is not TaskStatus.RanToCompletion)
{
// we await to throw if its not successful.
// we await to throw if it's not successful.
await task;
}

Expand All @@ -252,6 +284,21 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(
while (tasks.Count > 0 || bufferCount > 0);
}

private static IOperationRequest WithServices(IOperationRequest request, IServiceProvider services)
{
switch (request)
{
case OperationRequest operationRequest:
return operationRequest.WithServices(services);

case VariableBatchRequest variableBatchRequest:
return variableBatchRequest.WithServices(services);

default:
throw new InvalidOperationException("Unexpected request type.");
}
}

private async Task ExecuteBatchItemAsync(
IOperationRequest request,
int requestIndex,
Expand Down

0 comments on commit 5ccdc28

Please sign in to comment.