Skip to content

Commit

Permalink
Make a more generic cancelByteEventCallbacksForStreamInternal
Browse files Browse the repository at this point in the history
Summary:
With reliable resets, we'll want to cancel byte events where the offset is beyond what needs to be reliably delivered. Right now, we only have the functionality to cancel all byte events, or only those up to a certain point.

I'm adding a more generic `cancelByteEventCallbacksForStreamInternal` that takes in a function that can be used to determine which byte events need to be removed.

Reviewed By: afrind

Differential Revision: D67353621

fbshipit-source-id: fa7e758ee6cd40d247392108c28118b604ad6dbc
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Dec 23, 2024
1 parent 8a08b8e commit 2795a0d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 62 deletions.
127 changes: 69 additions & 58 deletions quic/api/QuicTransportBaseLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,69 +417,21 @@ void QuicTransportBaseLite::cancelDeliveryCallbacksForStream(

void QuicTransportBaseLite::cancelByteEventCallbacksForStream(
const StreamId id,
const Optional<uint64_t>& offset) {
invokeForEachByteEventType(([this, id, &offset](const ByteEvent::Type type) {
cancelByteEventCallbacksForStream(type, id, offset);
}));
const Optional<uint64_t>& offsetUpperBound) {
invokeForEachByteEventType(
([this, id, &offsetUpperBound](const ByteEvent::Type type) {
cancelByteEventCallbacksForStream(type, id, offsetUpperBound);
}));
}

void QuicTransportBaseLite::cancelByteEventCallbacksForStream(
const ByteEvent::Type type,
const StreamId id,
const Optional<uint64_t>& offset) {
if (isReceivingStream(conn_->nodeType, id)) {
return;
}

auto& byteEventMap = getByteEventMap(type);
auto byteEventMapIt = byteEventMap.find(id);
if (byteEventMapIt == byteEventMap.end()) {
switch (type) {
case ByteEvent::Type::ACK:
conn_->streamManager->removeDeliverable(id);
break;
case ByteEvent::Type::TX:
conn_->streamManager->removeTx(id);
break;
}
return;
}
auto& streamByteEvents = byteEventMapIt->second;

// Callbacks are kept sorted by offset, so we can just walk the queue and
// invoke those with offset below provided offset.
while (!streamByteEvents.empty()) {
// decomposition not supported for xplat
const auto cbOffset = streamByteEvents.front().offset;
const auto callback = streamByteEvents.front().callback;
if (!offset.has_value() || cbOffset < *offset) {
streamByteEvents.pop_front();
ByteEventCancellation cancellation{id, cbOffset, type};
callback->onByteEventCanceled(cancellation);
if (closeState_ != CloseState::OPEN) {
// socket got closed - we can't use streamByteEvents anymore,
// closeImpl should take care of cleaning up any remaining callbacks
return;
}
} else {
// Only larger or equal offsets left, exit the loop.
break;
}
}

// Clean up state for this stream if no callbacks left to invoke.
if (streamByteEvents.empty()) {
switch (type) {
case ByteEvent::Type::ACK:
conn_->streamManager->removeDeliverable(id);
break;
case ByteEvent::Type::TX:
conn_->streamManager->removeTx(id);
break;
}
// The callback could have changed the map so erase by id.
byteEventMap.erase(id);
}
const Optional<uint64_t>& offsetUpperBound) {
cancelByteEventCallbacksForStreamInternal(
type, id, [&offsetUpperBound](uint64_t cbOffset) {
return !offsetUpperBound || cbOffset < *offsetUpperBound;
});
}

folly::Expected<folly::Unit, LocalErrorCode>
Expand Down Expand Up @@ -1682,6 +1634,65 @@ QuicTransportBaseLite::resetStreamInternal(
return folly::unit;
}

void QuicTransportBaseLite::cancelByteEventCallbacksForStreamInternal(
const ByteEvent::Type type,
const StreamId id,
const std::function<bool(uint64_t)>& offsetFilter) {
if (isReceivingStream(conn_->nodeType, id)) {
return;
}

auto& byteEventMap = getByteEventMap(type);
auto byteEventMapIt = byteEventMap.find(id);
if (byteEventMapIt == byteEventMap.end()) {
switch (type) {
case ByteEvent::Type::ACK:
conn_->streamManager->removeDeliverable(id);
break;
case ByteEvent::Type::TX:
conn_->streamManager->removeTx(id);
break;
}
return;
}
auto& streamByteEvents = byteEventMapIt->second;

// Callbacks are kept sorted by offset, so we can just walk the queue and
// invoke those with offset below provided offset.
while (!streamByteEvents.empty()) {
// decomposition not supported for xplat
const auto cbOffset = streamByteEvents.front().offset;
const auto callback = streamByteEvents.front().callback;
if (offsetFilter(cbOffset)) {
streamByteEvents.pop_front();
ByteEventCancellation cancellation{id, cbOffset, type};
callback->onByteEventCanceled(cancellation);
if (closeState_ != CloseState::OPEN) {
// socket got closed - we can't use streamByteEvents anymore,
// closeImpl should take care of cleaning up any remaining callbacks
return;
}
} else {
// Only larger or equal offsets left, exit the loop.
break;
}
}

// Clean up state for this stream if no callbacks left to invoke.
if (streamByteEvents.empty()) {
switch (type) {
case ByteEvent::Type::ACK:
conn_->streamManager->removeDeliverable(id);
break;
case ByteEvent::Type::TX:
conn_->streamManager->removeTx(id);
break;
}
// The callback could have changed the map so erase by id.
byteEventMap.erase(id);
}
}

void QuicTransportBaseLite::onSocketWritable() noexcept {
// Remove the writable callback.
socket_->pauseWrite();
Expand Down
14 changes: 10 additions & 4 deletions quic/api/QuicTransportBaseLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,22 @@ class QuicTransportBaseLite : virtual public QuicSocketLite,
* Cancel byte event callbacks for given stream.
*
* If an offset is provided, cancels only callbacks with an offset less than
* or equal to the provided offset, otherwise cancels all callbacks.
* the provided offset, otherwise cancels all callbacks.
*/
void cancelByteEventCallbacksForStream(
const StreamId id,
const Optional<uint64_t>& offset = none) override;
const Optional<uint64_t>& offsetUpperBound = none) override;

/**
* Cancel byte event callbacks for given type and stream.
*
* If an offset is provided, cancels only callbacks with an offset less than
* or equal to the provided offset, otherwise cancels all callbacks.
* the provided offset, otherwise cancels all callbacks.
*/
void cancelByteEventCallbacksForStream(
const ByteEvent::Type type,
const StreamId id,
const Optional<uint64_t>& offset = none) override;
const Optional<uint64_t>& offsetUpperBound = none) override;

/**
* Register a byte event to be triggered when specified event type occurs for
Expand Down Expand Up @@ -618,6 +618,12 @@ class QuicTransportBaseLite : virtual public QuicSocketLite,
StreamId id,
ApplicationErrorCode errorCode);

// Only remove byte event callbacks if offsetFilter returns true.
void cancelByteEventCallbacksForStreamInternal(
const ByteEvent::Type type,
const StreamId id,
const std::function<bool(uint64_t)>& offsetFilter);

void onSocketWritable() noexcept override;

void handleNewStreamCallbacks(std::vector<StreamId>& newPeerStreams);
Expand Down

0 comments on commit 2795a0d

Please sign in to comment.