Skip to content

Commit

Permalink
Reduced memory allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jul 11, 2023
1 parent 97ea897 commit da1d799
Showing 1 changed file with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,6 @@ public override TimeSpan ReceiveTimeout
private protected override MemoryOwner<byte> AllocateBuffer(int bufferSize)
=> defaultAllocator(bufferSize);

private (ConnectionContext, int, EndPoint?, MemoryAllocator<byte>) PrepareConnectionHandlerArgs(ConnectionContext connection)
{
var transmissionSize = connection.Transport.Output.GetSpan().Length;
var allocator = connection.Features.Get<MemoryAllocator<byte>>()
?? connection.Features.Get<IMemoryPoolFeature>()?.MemoryPool?.ToAllocator()
?? defaultAllocator;

return (connection, transmissionSize, connection.RemoteEndPoint, allocator);
}

private void HandleConnection((ConnectionContext, int, EndPoint?, MemoryAllocator<byte>) args)
=> HandleConnection(args.Item1, args.Item2, args.Item3, args.Item4);

private async void HandleConnection(ConnectionContext connection, int transmissionSize, EndPoint? clientAddress, MemoryAllocator<byte> allocator)
{
var protocol = new ProtocolPipeStream(connection.Transport, allocator, transmissionSize)
Expand Down Expand Up @@ -127,7 +114,7 @@ private async Task Listen(IConnectionListener listener)
if (connection is null)
break;

ThreadPool.UnsafeQueueUserWorkItem(HandleConnection, PrepareConnectionHandlerArgs(connection), preferLocal: false);
ThreadPool.UnsafeQueueUserWorkItem(new ConnectionHandler(this, connection), preferLocal: false);
}
catch (Exception e) when (e is ObjectDisposedException || (e is OperationCanceledException canceledEx && canceledEx.CancellationToken == lifecycleToken))
{
Expand Down Expand Up @@ -172,4 +159,29 @@ protected override ValueTask DisposeAsyncCore()
Cleanup();
return new(listenerTask ?? Task.CompletedTask);
}

private sealed class ConnectionHandler : Tuple<ConnectionContext, int, EndPoint?, MemoryAllocator<byte>>, IThreadPoolWorkItem
{
private readonly WeakReference<GenericServer> server;

internal ConnectionHandler(GenericServer server, ConnectionContext connection)
: base(connection, GetTransmissionSize(connection), connection.RemoteEndPoint, GetMemoryAllocator(server, connection))
=> this.server = new(server, trackResurrection: false);

private static int GetTransmissionSize(ConnectionContext connection)
=> connection.Transport.Output.GetSpan().Length;

private static MemoryAllocator<byte> GetMemoryAllocator(GenericServer server, BaseConnectionContext connection)
{
return connection.Features.Get<MemoryAllocator<byte>>()
?? connection.Features.Get<IMemoryPoolFeature>()?.MemoryPool?.ToAllocator()
?? server.defaultAllocator;
}

void IThreadPoolWorkItem.Execute()
{
if (this.server.TryGetTarget(out var server))
server.HandleConnection(Item1, Item2, Item3, Item4);
}
}
}

0 comments on commit da1d799

Please sign in to comment.