diff --git a/source/extensions/filters/http/istio_stats/istio_stats.cc b/source/extensions/filters/http/istio_stats/istio_stats.cc index fd9e74ed477..e3bb3d55b14 100644 --- a/source/extensions/filters/http/istio_stats/istio_stats.cc +++ b/source/extensions/filters/http/istio_stats/istio_stats.cc @@ -20,6 +20,7 @@ #include "envoy/registry/registry.h" #include "envoy/server/factory_context.h" #include "envoy/singleton/manager.h" +#include "envoy/thread_local/thread_local.h" #include "extensions/common/metadata_object.h" #include "parser/parser.h" #include "source/common/grpc/common.h" @@ -313,7 +314,7 @@ struct Context : public Singleton::Instance { using ContextSharedPtr = std::shared_ptr; -SINGLETON_MANAGER_REGISTRATION(Context) +SINGLETON_MANAGER_REGISTRATION(istio_stats_filter_context) using google::api::expr::runtime::CelValue; @@ -418,21 +419,21 @@ struct MetricOverrides : public Logger::Loggable { // periodically to replace the current scope. // // The replaced stats scope is deleted gracefully after a minimum of 1s delay -// for two reasons: -// -// 1. Stats flushing is asynchronous and the data may be lost if not flushed -// before the deletion (see stats_flush_interval). -// -// 2. The implementation avoids locking by releasing a raw pointer to workers. -// When the rotation happens on the main, the raw pointer may still be in-use -// by workers for a short duration. +// because stats flushing is asynchronous and the data may be lost if not +// flushed before the deletion (see stats_flush_interval). 
class RotatingScope : public Logger::Loggable { public: RotatingScope(Server::Configuration::FactoryContext& factory_context, uint64_t rotate_interval_ms, uint64_t delete_interval_ms) : parent_scope_(factory_context.scope()), active_scope_(parent_scope_.createScope("")), - raw_scope_(active_scope_.get()), rotate_interval_ms_(rotate_interval_ms), + tls_scope_(factory_context.serverFactoryContext().threadLocal()), + rotate_interval_ms_(rotate_interval_ms), delete_interval_ms_(delete_interval_ms) { + + tls_scope_.set([&scope = *active_scope_](Event::Dispatcher&){ + return std::make_shared(scope); + }); + if (rotate_interval_ms_ > 0) { ASSERT(delete_interval_ms_ < rotate_interval_ms_); ASSERT(delete_interval_ms_ >= 1000); @@ -452,36 +453,58 @@ class RotatingScope : public Logger::Loggable { delete_timer_.reset(); } } - Stats::Scope* scope() { return raw_scope_.load(); } + Stats::Scope& scope() { return tls_scope_->_scope; } private: + struct TlsCachedScope:ThreadLocal::ThreadLocalObject{ + TlsCachedScope(Stats::Scope& scope):_scope(scope) {}; + std::reference_wrapper _scope; + }; + void onRotate() { ENVOY_LOG(info, "Rotating active Istio stats scope after {}ms.", rotate_interval_ms_); draining_scope_ = active_scope_; - delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_)); active_scope_ = parent_scope_.createScope(""); - raw_scope_.store(active_scope_.get()); - rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_)); + tls_scope_.runOnAllThreads( + [&scope = *active_scope_](OptRef tls_cache) { + tls_cache->_scope = scope; + }, + // Start the delete and rotate timers after the new scope has been propagated to all worker threads. + // The RotatingScope instance can go away before the dispatcher has a chance to execute the callback + // and the still_alive_ shared_ptr will be deallocated when the current instance is deallocated. + // We rely on a weak_ptr to the still_alive_ flag to determine if the instance is still valid. 
+ [this, maybe_still_alive = std::weak_ptr(still_alive_)]() -> void { + if(!maybe_still_alive.expired()){ + delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_)); + rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_)); + } + }); } void onDelete() { ENVOY_LOG(info, "Deleting draining Istio stats scope after {}ms.", delete_interval_ms_); draining_scope_.reset(); } + Stats::Scope& parent_scope_; Stats::ScopeSharedPtr active_scope_; - std::atomic raw_scope_; Stats::ScopeSharedPtr draining_scope_{nullptr}; + ThreadLocal::TypedSlot tls_scope_; const uint64_t rotate_interval_ms_; const uint64_t delete_interval_ms_; Event::TimerPtr rotate_timer_{nullptr}; Event::TimerPtr delete_timer_{nullptr}; + + // A sentinel shared_ptr used for keeping track of whether the RotatingScope is still alive. + // It is only held by a weak reference in the callback that will be invoked after the new active + // scope has been propagated to all worker threads. + std::shared_ptr still_alive_{std::make_shared(true)}; }; struct Config : public Logger::Loggable { Config(const stats::PluginConfig& proto_config, Server::Configuration::FactoryContext& factory_context) : context_(factory_context.serverFactoryContext().singletonManager().getTyped( - SINGLETON_MANAGER_REGISTERED_NAME(Context), + SINGLETON_MANAGER_REGISTERED_NAME(istio_stats_filter_context), [&factory_context] { return std::make_shared( factory_context.serverFactoryContext().scope().symbolTable(), @@ -514,7 +537,7 @@ struct Config : public Logger::Loggable { break; } if (proto_config.metrics_size() > 0 || proto_config.definitions_size() > 0) { - metric_overrides_ = std::make_unique(context_, scope()->symbolTable()); + metric_overrides_ = std::make_unique(context_, scope().symbolTable()); for (const auto& definition : proto_config.definitions()) { const auto& it = context_->all_metrics_.find(definition.name()); if (it != context_->all_metrics_.end()) { @@ -698,12 +721,12 @@ struct Config : 
public Logger::Loggable { return; } auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_); - Stats::Utility::counterFromStatNames(*parent_.scope(), + Stats::Utility::counterFromStatNames(parent_.scope(), {parent_.context_->stat_namespace_, metric}, new_tags) .add(amount); return; } - Stats::Utility::counterFromStatNames(*parent_.scope(), + Stats::Utility::counterFromStatNames(parent_.scope(), {parent_.context_->stat_namespace_, metric}, tags) .add(amount); } @@ -717,12 +740,12 @@ struct Config : public Logger::Loggable { } auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_); Stats::Utility::histogramFromStatNames( - *parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags) + parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags) .recordValue(value); return; } Stats::Utility::histogramFromStatNames( - *parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags) + parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags) .recordValue(value); } @@ -735,17 +758,17 @@ struct Config : public Logger::Loggable { switch (metric.type_) { case MetricOverrides::MetricType::Counter: Stats::Utility::counterFromStatNames( - *parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags) + parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags) .add(amount); break; case MetricOverrides::MetricType::Histogram: Stats::Utility::histogramFromStatNames( - *parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, + parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, Stats::Histogram::Unit::Bytes, tags) .recordValue(amount); break; case MetricOverrides::MetricType::Gauge: - Stats::Utility::gaugeFromStatNames(*parent_.scope(), + Stats::Utility::gaugeFromStatNames(parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, Stats::Gauge::ImportMode::Accumulate, tags) .set(amount); @@ -769,14 
+792,14 @@ struct Config : public Logger::Loggable { tags.push_back({context_->tag_, context_->istio_version_.empty() ? context_->unknown_ : context_->istio_version_}); - Stats::Utility::gaugeFromStatNames(*scope(), + Stats::Utility::gaugeFromStatNames(scope(), {context_->stat_namespace_, context_->istio_build_}, Stats::Gauge::ImportMode::Accumulate, tags) .set(1); } Reporter reporter() const { return reporter_; } - Stats::Scope* scope() { return scope_.scope(); } + Stats::Scope& scope() { return scope_.scope(); } ContextSharedPtr context_; RotatingScope scope_; @@ -795,7 +818,7 @@ class IstioStatsFilter : public Http::PassThroughFilter, public Network::ConnectionCallbacks { public: IstioStatsFilter(ConfigSharedPtr config) - : config_(config), context_(*config->context_), pool_(config->scope()->symbolTable()), + : config_(config), context_(*config->context_), pool_(config->scope().symbolTable()), stream_(*config_, pool_) { tags_.reserve(25); switch (config_->reporter()) {