Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for memory leaks and proper downstream connection close #11

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ struct SupportedCommands {
*/
static const std::string& quit() { CONSTRUCT_ON_FIRST_USE(std::string, "quit"); }

/**
* @return quit command
*/
static const std::string& exit() { CONSTRUCT_ON_FIRST_USE(std::string, "exit"); }

/**
* @return commands which alters the state of redis
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,
return nullptr;
}
// For transaction type commands and blockingcommands , quit needs to be handled from within the command handler
if (command_name == Common::Redis::SupportedCommands::quit() && !callbacks.transaction().active_) {
if ((command_name == Common::Redis::SupportedCommands::quit() || command_name == Common::Redis::SupportedCommands::exit()) && !callbacks.transaction().active_) {
callbacks.onQuit();
return nullptr;
}
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/redis_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP
filter_config);
auto pubsub_cb_ptr = std::make_shared<PubsubCallbacks>(proxy_filter_shared);
proxy_filter_shared->setTransactionPubsubCallback(std::move(pubsub_cb_ptr));
ENVOY_LOG(debug, "redis: new proxy filter instance and creating pubsub callback");
filter_manager.addReadFilter(proxy_filter_shared);
};
}
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/network/redis_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class ProtocolOptionsConfigImpl : public Upstream::ProtocolOptionsConfig {
class RedisProxyFilterConfigFactory
: public Common::FactoryBase<
envoy::extensions::filters::network::redis_proxy::v3::RedisProxy,
envoy::extensions::filters::network::redis_proxy::v3::RedisProtocolOptions> {
envoy::extensions::filters::network::redis_proxy::v3::RedisProtocolOptions>,
public Logger::Loggable<Logger::Id::redis>{
public:
RedisProxyFilterConfigFactory() : FactoryBase(NetworkFilterNames::get().RedisProxy, true) {}

Expand Down
33 changes: 19 additions & 14 deletions source/extensions/filters/network/redis_proxy/proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,21 @@ void ProxyFilter::onEvent(Network::ConnectionEvent event) {
}
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
ENVOY_LOG(trace, "connection to redis proxy filter closed");
ENVOY_LOG(debug, "connection to redis proxy filter closed");
while (!pending_requests_.empty()) {
if (pending_requests_.front().request_handle_ != nullptr) {
pending_requests_.front().request_handle_->cancel();
}
pending_requests_.pop_front();
}
ENVOY_LOG(debug,"dereferencing pubsub callback and transaction on exit from proxy filter");
// As pubsubcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter
// decrement the reference to proxy filter
auto pubsub_cb = dynamic_cast<PubsubCallbacks*>(transaction_.getPubsubCallback().get());
if (pubsub_cb != nullptr){
pubsub_cb->clearParent();
}
transaction_.setPubsubCallback(nullptr);
transaction_.close();
}
}
Expand Down Expand Up @@ -195,10 +203,12 @@ void ProxyFilter::onAsyncResponse(Common::Redis::RespValuePtr&& value){
if (transaction_.should_close_ || transaction_.is_blocking_command_) {
//Close all upsteam clients and ref to pubsub callbacks if any
transaction_.close();
// decrement the reference to proxy filter
if (!transaction_.is_blocking_command_ ) {
transaction_.setPubsubCallback(nullptr);
}
}
//Need fix for drain close when pubsub chanels are active -- to be done
if(config_->drain_decision_.drainClose() &&
config_->runtime_.snapshot().featureEnabled(config_->redis_drain_close_runtime_key_, 100)) {
config_->stats_.downstream_cx_drain_close_.inc();
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
}

}
Expand Down Expand Up @@ -238,8 +248,8 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt
if (encoder_buffer_.length() > 0) {
callbacks_->connection().write(encoder_buffer_, false);
}

if (pending_requests_.empty() && connection_quit_) {
ENVOY_LOG(debug,"closing downstream connection as no pending requests and connection quit");
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
connection_quit_ = false;
return;
Expand All @@ -254,17 +264,12 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt

// Check if there is an active transaction that needs to be closed.
if ((transaction_.should_close_ && pending_requests_.empty()) || (transaction_.is_blocking_command_ && pending_requests_.empty())) {
if (transaction_.isSubscribedMode()){
// decrement the reference to proxy filter
auto pubsub_cb = dynamic_cast<PubsubCallbacks*>(transaction_.getPubsubCallback().get());
pubsub_cb->clearParent();
transaction_.setPubsubCallback(nullptr);
transaction_.subscribed_client_shard_index_ = -1;
}

transaction_.close();
//Not sure if for transaction mode also we need to close the connection in downstream
if (transaction_.isSubscribedMode()){
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
transaction_.subscribed_client_shard_index_ = -1;
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
}
connection_quit_ = false;
return;
Expand Down