diff --git a/source/extensions/filters/network/common/redis/supported_commands.h b/source/extensions/filters/network/common/redis/supported_commands.h index e6b04283e15f..28b9552c8bb7 100644 --- a/source/extensions/filters/network/common/redis/supported_commands.h +++ b/source/extensions/filters/network/common/redis/supported_commands.h @@ -131,6 +131,11 @@ struct SupportedCommands { */ static const std::string& quit() { CONSTRUCT_ON_FIRST_USE(std::string, "quit"); } + /** + * @return quit command + */ + static const std::string& exit() { CONSTRUCT_ON_FIRST_USE(std::string, "exit"); } + /** * @return commands which alters the state of redis */ diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 3121dbdd3844..dff57fbb1496 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -1043,7 +1043,7 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request, return nullptr; } // For transaction type commands and blockingcommands , quit needs to be handled from within the command handler - if (command_name == Common::Redis::SupportedCommands::quit() && !callbacks.transaction().active_) { + if ((command_name == Common::Redis::SupportedCommands::quit() || command_name == Common::Redis::SupportedCommands::exit()) && !callbacks.transaction().active_) { callbacks.onQuit(); return nullptr; } diff --git a/source/extensions/filters/network/redis_proxy/config.cc b/source/extensions/filters/network/redis_proxy/config.cc index 51ed2154bbc8..acadcd44b172 100644 --- a/source/extensions/filters/network/redis_proxy/config.cc +++ b/source/extensions/filters/network/redis_proxy/config.cc @@ -101,6 +101,7 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP filter_config); auto pubsub_cb_ptr = std::make_shared(proxy_filter_shared); proxy_filter_shared->setTransactionPubsubCallback(std::move(pubsub_cb_ptr)); + ENVOY_LOG(debug, "redis: new proxy filter instance and creating pubsub callback"); filter_manager.addReadFilter(proxy_filter_shared); }; } diff --git a/source/extensions/filters/network/redis_proxy/config.h b/source/extensions/filters/network/redis_proxy/config.h index fc32de5163bc..0101a5f31052 100644 --- a/source/extensions/filters/network/redis_proxy/config.h +++ b/source/extensions/filters/network/redis_proxy/config.h @@ -65,7 +65,8 @@ class ProtocolOptionsConfigImpl : public Upstream::ProtocolOptionsConfig { class RedisProxyFilterConfigFactory : public Common::FactoryBase< envoy::extensions::filters::network::redis_proxy::v3::RedisProxy, - envoy::extensions::filters::network::redis_proxy::v3::RedisProtocolOptions> { + envoy::extensions::filters::network::redis_proxy::v3::RedisProtocolOptions>, + public Logger::Loggable{ public: RedisProxyFilterConfigFactory() : FactoryBase(NetworkFilterNames::get().RedisProxy, true) {} diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index 871cabd12fe2..1316aefd5671 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -113,13 +113,21 @@ void ProxyFilter::onEvent(Network::ConnectionEvent event) { } if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { - ENVOY_LOG(trace, "connection to redis proxy filter closed"); + ENVOY_LOG(debug, "connection to redis proxy filter closed"); while (!pending_requests_.empty()) { if (pending_requests_.front().request_handle_ != nullptr) { pending_requests_.front().request_handle_->cancel(); } pending_requests_.pop_front(); } + ENVOY_LOG(debug,"dereferencing pubsub callback and transaction on exit from proxy filter"); + // As pubsubcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter + // decrement the reference to proxy filter + auto pubsub_cb = dynamic_cast(transaction_.getPubsubCallback().get()); + if (pubsub_cb != nullptr){ + pubsub_cb->clearParent(); + } + transaction_.setPubsubCallback(nullptr); transaction_.close(); } } @@ -195,10 +203,12 @@ void ProxyFilter::onAsyncResponse(Common::Redis::RespValuePtr&& value){ if (transaction_.should_close_ || transaction_.is_blocking_command_) { //Close all upsteam clients and ref to pubsub callbacks if any transaction_.close(); - // decrement the reference to proxy filter - if (!transaction_.is_blocking_command_ ) { - transaction_.setPubsubCallback(nullptr); - } + } + //Need fix for drain close when pubsub chanels are active -- to be done + if(config_->drain_decision_.drainClose() && + config_->runtime_.snapshot().featureEnabled(config_->redis_drain_close_runtime_key_, 100)) { + config_->stats_.downstream_cx_drain_close_.inc(); + callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); } } @@ -238,8 +248,8 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt if (encoder_buffer_.length() > 0) { callbacks_->connection().write(encoder_buffer_, false); } - if (pending_requests_.empty() && connection_quit_) { + ENVOY_LOG(debug,"closing downstream connection as no pending requests and connection quit"); callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); connection_quit_ = false; return; @@ -254,17 +264,12 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt // Check if there is an active transaction that needs to be closed. if ((transaction_.should_close_ && pending_requests_.empty()) || (transaction_.is_blocking_command_ && pending_requests_.empty())) { - if (transaction_.isSubscribedMode()){ - // decrement the reference to proxy filter - auto pubsub_cb = dynamic_cast(transaction_.getPubsubCallback().get()); - pubsub_cb->clearParent(); - transaction_.setPubsubCallback(nullptr); - transaction_.subscribed_client_shard_index_ = -1; - } + transaction_.close(); //Not sure if for transaction mode also we need to close the connection in downstream if (transaction_.isSubscribedMode()){ - callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + transaction_.subscribed_client_shard_index_ = -1; + callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); } connection_quit_ = false; return;