Skip to content

Commit

Permalink
Reuse memory
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed Dec 16, 2024
1 parent b946434 commit cf4be7e
Showing 1 changed file with 61 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Buffers;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
Expand All @@ -11,7 +12,7 @@ namespace HotChocolate.PersistedOperations.AzureBlobStorage;
/// </summary>
public class AzureBlobOperationDocumentStorage : IOperationDocumentStorage
{
private static readonly BlobOpenWriteOptions _defaultBlobOpenWriteOptions = new()
private static readonly BlobOpenWriteOptions _writeOptions = new()
{
HttpHeaders = new BlobHttpHeaders
{
Expand All @@ -21,28 +22,20 @@ public class AzureBlobOperationDocumentStorage : IOperationDocumentStorage
}
};

private readonly BlobContainerClient _blobContainerClient;
private readonly string _blobNamePrefix;
private readonly string _blobNameSuffix;
private readonly BlobContainerClient _client;

/// <summary>
/// Initializes a new instance of the class.
/// </summary>
/// <param name="containerClient">The blob container client instance.</param>
/// <param name="blobNamePrefix">This prefix string is prepended before the hash of the document.</param>
/// <param name="blobNameSuffix">This suffix is appended after the hash of the document.</param>
public AzureBlobOperationDocumentStorage(
BlobContainerClient containerClient,
string blobNamePrefix,
string blobNameSuffix)
/// <param name="client">The blob container client instance.</param>
public AzureBlobOperationDocumentStorage(BlobContainerClient client)
{
ArgumentNullException.ThrowIfNull(containerClient);
ArgumentNullException.ThrowIfNull(blobNamePrefix);
ArgumentNullException.ThrowIfNull(blobNameSuffix);
if (client == null)
{
throw new ArgumentNullException(nameof(client));
}

_blobContainerClient = containerClient;
_blobNamePrefix = blobNamePrefix;
_blobNameSuffix = blobNameSuffix;
_client = client;
}

/// <inheritdoc />
Expand All @@ -60,20 +53,41 @@ public AzureBlobOperationDocumentStorage(

private async ValueTask<IOperationDocument?> TryReadInternalAsync(
OperationDocumentId documentId,
CancellationToken cancellationToken)
CancellationToken ct)
{
var blobClient = _blobContainerClient.GetBlobClient(BlobName(documentId));
var blobClient = _client.GetBlobClient(documentId.ToString());
var buffer = ArrayPool<byte>.Shared.Rent(1024);
var position = 0;

try
{
await using var blobStream = await blobClient
.OpenReadAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

await using var memoryStream = new MemoryStream();
await blobStream.CopyToAsync(memoryStream, cancellationToken).ConfigureAwait(false);
return memoryStream.Length == 0
? null
: new OperationDocument(Utf8GraphQLParser.Parse(memoryStream.ToArray()));
await using var blobStream = await blobClient.OpenReadAsync(cancellationToken: ct).ConfigureAwait(false);
while (true)
{
if (buffer.Length < position + 256)
{
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Array.Copy(buffer, newBuffer, buffer.Length);
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
}

var read = await blobStream.ReadAsync(buffer, position, 256, ct);
position += read;

if (read < 256)
{
break;
}
}

if (position == 0)
{
return null;
}

var span = new ReadOnlySpan<byte>(buffer, 0, position);
return new OperationDocument(Utf8GraphQLParser.Parse(span));
}
catch (RequestFailedException e)
{
Expand All @@ -84,6 +98,15 @@ public AzureBlobOperationDocumentStorage(

throw;
}
finally
{
if(position > 0)
{
buffer.AsSpan().Slice(0, position).Clear();
}

ArrayPool<byte>.Shared.Return(buffer);
}
}

/// <inheritdoc />
Expand All @@ -92,10 +115,14 @@ public ValueTask SaveAsync(
IOperationDocument document,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(document);
if(document == null)
{
throw new ArgumentNullException(nameof(document));
}

if (OperationDocumentId.IsNullOrEmpty(documentId))
{
throw new ArgumentNullException(nameof(documentId));
throw new ArgumentException(nameof(documentId));
}

return SaveInternalAsync(documentId, document, cancellationToken);
Expand All @@ -104,15 +131,11 @@ public ValueTask SaveAsync(
private async ValueTask SaveInternalAsync(
OperationDocumentId documentId,
IOperationDocument document,
CancellationToken cancellationToken)
CancellationToken ct)
{
var blobClient = _blobContainerClient.GetBlobClient(BlobName(documentId));
await using var outStream = await blobClient
.OpenWriteAsync(true, _defaultBlobOpenWriteOptions, cancellationToken).ConfigureAwait(false);

await document.WriteToAsync(outStream, cancellationToken).ConfigureAwait(false);
await outStream.FlushAsync(cancellationToken).ConfigureAwait(false);
var blobClient = _client.GetBlobClient(documentId.ToString());
await using var outStream = await blobClient.OpenWriteAsync(true, _writeOptions, ct).ConfigureAwait(false);
await document.WriteToAsync(outStream, ct).ConfigureAwait(false);
await outStream.FlushAsync(ct).ConfigureAwait(false);
}

private string BlobName(OperationDocumentId documentId) => $"{_blobNamePrefix}{documentId.Value}{_blobNameSuffix}";
}

0 comments on commit cf4be7e

Please sign in to comment.