Prefetching IAsyncEnumerable items? #1704
Unanswered · TonyValenti asked this question in Q&A · Replies: 2 comments
-
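I think this is the most important missing operator. Since it introduces concurrency, it should probably go to

```csharp
using System;
using System.Collections.Generic;
using System.Linq; // Skip on IAsyncEnumerable<T> comes from the System.Linq.Async package
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PrefetchExtensions
{
    public static IAsyncEnumerable<T> Prefetch<T>(this IAsyncEnumerable<T> source, int count)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (count == 0)
        {
            // Nothing to buffer; Skip(0) only hides the concrete source type.
            return source.Skip(0);
        }
        return Core(source, count);

        static async IAsyncEnumerable<T> Core(IAsyncEnumerable<T> s, int c, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            // Bounded channel: the producer can run at most c items ahead of the consumer.
            var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(c)
            {
                SingleReader = true,
                SingleWriter = true
            });
            var readChannel = channel.Reader;
            var writeChannel = channel.Writer;
            // Linked token so the producer also stops when the consumer stops enumerating early.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var producer = FillChannel(writeChannel, s, cts.Token);
            try
            {
                while (await readChannel.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
                {
                    while (readChannel.TryRead(out var item))
                    {
                        yield return item;
                    }
                }
            }
            finally
            {
                // Unblock a producer waiting on a full channel, then wait for it to finish.
                cts.Cancel();
                await producer.ConfigureAwait(false);
            }

            // Returns Task instead of async void so the consumer can await its completion.
            static async Task FillChannel(ChannelWriter<T> writer, IAsyncEnumerable<T> items, CancellationToken token)
            {
                try
                {
                    await foreach (var item in items.WithCancellation(token).ConfigureAwait(false))
                    {
                        // Pass the token so a full channel cannot block this write forever.
                        await writer.WriteAsync(item, token).ConfigureAwait(false);
                    }
                }
                catch (Exception ex)
                {
                    // Forward source failures (and cancellation) to the reader.
                    writer.Complete(ex);
                    return;
                }
                writer.Complete();
            }
        }
    }
}
```

The version by @akarnokd is certainly better tested.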
-
I have an IAsyncEnumerable that returns batches of items from a web service.
It takes ~5 seconds to return a batch and ~5 seconds to process a batch.
This means that each batch takes ~10 seconds end to end.
However, this could be amortized down to ~5 seconds per batch if the batch enumerator could keep running in the background and store the items in a buffer. Is there an API that does this?
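The estimate above can be checked with a small simulation. This is only a minimal sketch, not an existing API: `FetchBatchAsync` and `ProcessBatchAsync` are hypothetical stand-ins for the web service call and the processing step, and the delays are scaled down from ~5 s to 50 ms. It compares fetching and processing strictly in turn with starting the next fetch before processing the current batch, which is what a one-item buffer would achieve.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class OverlapDemo
{
    // Hypothetical stand-in for the web service: returns batch `index` after `delayMs`.
    public static async Task<int> FetchBatchAsync(int index, int delayMs)
    {
        await Task.Delay(delayMs);
        return index;
    }

    // Hypothetical stand-in for processing one batch.
    public static Task ProcessBatchAsync(int batch, int delayMs) => Task.Delay(delayMs);

    // Fetch, then process, one batch at a time: no overlap.
    public static async Task<TimeSpan> SequentialAsync(int batches, int delayMs)
    {
        var sw = Stopwatch.StartNew();
        for (var i = 0; i < batches; i++)
        {
            var batch = await FetchBatchAsync(i, delayMs);
            await ProcessBatchAsync(batch, delayMs);
        }
        return sw.Elapsed;
    }

    // Start fetching batch i + 1 before processing batch i (one batch of lookahead).
    public static async Task<TimeSpan> OverlappedAsync(int batches, int delayMs)
    {
        var sw = Stopwatch.StartNew();
        if (batches <= 0)
        {
            return sw.Elapsed;
        }
        var next = FetchBatchAsync(0, delayMs);
        for (var i = 0; i < batches; i++)
        {
            var batch = await next;
            if (i + 1 < batches)
            {
                // The next fetch now runs while the current batch is processed.
                next = FetchBatchAsync(i + 1, delayMs);
            }
            await ProcessBatchAsync(batch, delayMs);
        }
        return sw.Elapsed;
    }

    public static async Task Main()
    {
        var sequential = await SequentialAsync(4, 50);
        var overlapped = await OverlappedAsync(4, 50);
        Console.WriteLine($"sequential: {sequential.TotalMilliseconds:F0} ms");
        Console.WriteLine($"overlapped: {overlapped.TotalMilliseconds:F0} ms");
    }
}
```

For n batches with fetch time f and processing time p, the sequential loop takes about n(f + p), while the overlapped loop takes about f + (n - 1)·max(f, p) + p. With f = p this approaches half the sequential time as n grows. Measured numbers will vary with timer resolution and machine load.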
I'm hoping to do something like this: