diff --git a/nucliadb_core/src/metrics/meters/mod.rs b/nucliadb_core/src/metrics/meters/mod.rs
index 5aa7dfefc8..f70ed652c4 100644
--- a/nucliadb_core/src/metrics/meters/mod.rs
+++ b/nucliadb_core/src/metrics/meters/mod.rs
@@ -40,4 +40,6 @@ pub trait Meter: Send + Sync {
     }
     fn record_replicated_bytes(&self, value: u64);
     fn record_replication_op(&self, key: replication::ReplicationOpsKey);
+    fn set_shard_cache_gauge(&self, value: i64);
+    fn record_shard_cache_eviction(&self);
 }
diff --git a/nucliadb_core/src/metrics/meters/noop.rs b/nucliadb_core/src/metrics/meters/noop.rs
index 0e40b5d622..074e0001b8 100644
--- a/nucliadb_core/src/metrics/meters/noop.rs
+++ b/nucliadb_core/src/metrics/meters/noop.rs
@@ -34,4 +34,7 @@ impl Meter for NoOpMeter {
     }
     fn record_replicated_bytes(&self, _value: u64) {}
     fn record_replication_op(&self, _key: replication::ReplicationOpsKey) {}
+
+    fn set_shard_cache_gauge(&self, _value: i64) {}
+    fn record_shard_cache_eviction(&self) {}
 }
diff --git a/nucliadb_core/src/metrics/meters/prometheus.rs b/nucliadb_core/src/metrics/meters/prometheus.rs
index 2c31628593..e33a036ee4 100644
--- a/nucliadb_core/src/metrics/meters/prometheus.rs
+++ b/nucliadb_core/src/metrics/meters/prometheus.rs
@@ -25,7 +25,7 @@ use crate::metrics::metric::grpc_ops::{GrpcOpKey, GrpcOpMetric, GrpcOpValue};
 use crate::metrics::metric::request_time::{RequestTimeKey, RequestTimeMetric, RequestTimeValue};
 use crate::metrics::metric::tokio_runtime::TokioRuntimeObserver;
 use crate::metrics::metric::tokio_tasks::TokioTasksObserver;
-use crate::metrics::metric::{grpc_ops, replication, request_time};
+use crate::metrics::metric::{grpc_ops, replication, request_time, shard_cache};
 use crate::metrics::task_monitor::{Monitor, TaskId};
 use crate::tracing::{debug, error};
 use crate::NodeResult;
@@ -38,6 +38,8 @@ pub struct PrometheusMeter {
     tokio_runtime_observer: TokioRuntimeObserver,
     replicated_bytes_metric: replication::ReplicatedBytesMetric,
     replication_ops_metric: replication::ReplicationOpsMetric,
+    open_shards_metric: shard_cache::OpenShardsMetric,
+    evicted_shards_metric: shard_cache::EvictedShardsMetric,
 }
 
 impl Default for PrometheusMeter {
@@ -82,6 +84,14 @@ impl Meter for PrometheusMeter {
     fn record_replication_op(&self, key: replication::ReplicationOpsKey) {
         self.replication_ops_metric.get_or_create(&key).inc();
     }
+
+    fn set_shard_cache_gauge(&self, value: i64) {
+        self.open_shards_metric.get_or_create(&()).set(value);
+    }
+
+    fn record_shard_cache_eviction(&self) {
+        self.evicted_shards_metric.get_or_create(&()).inc();
+    }
 }
 
 impl PrometheusMeter {
@@ -92,6 +102,8 @@
         let grpc_op_metric = grpc_ops::register_grpc_ops(&mut registry);
         let replicated_bytes_metric = replication::register_replicated_bytes_ops(&mut registry);
         let replication_ops_metric = replication::register_replication_operations(&mut registry);
+        let open_shards_metric = shard_cache::register_open_shards_metric(&mut registry);
+        let evicted_shards_metric = shard_cache::register_evicted_shards_metric(&mut registry);
 
         let prefixed_subregistry = registry.sub_registry_with_prefix("nucliadb_node");
         let tokio_tasks_observer = TokioTasksObserver::new(prefixed_subregistry);
@@ -105,6 +117,8 @@
             tokio_runtime_observer,
             replicated_bytes_metric,
             replication_ops_metric,
+            open_shards_metric,
+            evicted_shards_metric,
         }
     }
 }
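The two new `Meter` methods are reached through the crate's global meter rather than through `PrometheusMeter` directly. A minimal sketch of the call sites (the helper names `on_shard_cache_resized` and `on_shard_evicted` are hypothetical; `metrics::get_metrics()` is the existing accessor used by the resource cache change further down):

```rust
use nucliadb_core::metrics;

// Hypothetical helpers showing where the new Meter methods get called from.
fn on_shard_cache_resized(open_shards: usize) {
    // Gauge: how many shards the cache currently holds open.
    metrics::get_metrics().set_shard_cache_gauge(open_shards as i64);
}

fn on_shard_evicted() {
    // Counter: bumped once per shard pushed out of the cache.
    metrics::get_metrics().record_shard_cache_eviction();
}
```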
diff --git a/nucliadb_core/src/metrics/metric/mod.rs b/nucliadb_core/src/metrics/metric/mod.rs
index 9eb2c90f8e..d88fd871af 100644
--- a/nucliadb_core/src/metrics/metric/mod.rs
+++ b/nucliadb_core/src/metrics/metric/mod.rs
@@ -27,5 +27,6 @@
 pub mod grpc_ops;
 pub mod replication;
 pub mod request_time;
+pub mod shard_cache;
 pub mod tokio_runtime;
 pub mod tokio_tasks;
diff --git a/nucliadb_core/src/metrics/metric/shard_cache.rs b/nucliadb_core/src/metrics/metric/shard_cache.rs
new file mode 100644
index 0000000000..8e4d4cb72b
--- /dev/null
+++ b/nucliadb_core/src/metrics/metric/shard_cache.rs
@@ -0,0 +1,47 @@
+// Copyright (C) 2021 Bosutech XXI S.L.
+//
+// nucliadb is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at info@nuclia.com.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::metrics::gauge::Gauge;
+use prometheus_client::registry::Registry;
+
+pub type OpenShardsMetric = Family<(), Gauge>;
+
+pub fn register_open_shards_metric(registry: &mut Registry) -> OpenShardsMetric {
+    let open_shards = OpenShardsMetric::default();
+    registry.register(
+        "nucliadb_shard_cache_open",
+        "Open shards",
+        open_shards.clone(),
+    );
+    open_shards
+}
+
+pub type EvictedShardsMetric = Family<(), Counter>;
+
+pub fn register_evicted_shards_metric(registry: &mut Registry) -> EvictedShardsMetric {
+    let evicted_shards = EvictedShardsMetric::default();
+    registry.register(
+        "nucliadb_shard_cache_evicted",
+        "Evicted shards",
+        evicted_shards.clone(),
+    );
+    evicted_shards
+}
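Both metrics are label-less: a `Family<(), Gauge>` keyed by the unit type holds a single child, so `get_or_create(&())` always returns the same series. A standalone sketch of registering and scraping them, assuming a `prometheus-client` release (0.19 or later) whose text encoder writes into a `String`; the exposition lines shown in the comment are approximate:

```rust
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

fn main() -> Result<(), std::fmt::Error> {
    let mut registry = Registry::default();

    // Same registrations as register_open_shards_metric / register_evicted_shards_metric.
    let open_shards: Family<(), Gauge> = Family::default();
    registry.register("nucliadb_shard_cache_open", "Open shards", open_shards.clone());
    let evicted_shards: Family<(), Counter> = Family::default();
    registry.register("nucliadb_shard_cache_evicted", "Evicted shards", evicted_shards.clone());

    open_shards.get_or_create(&()).set(3);
    evicted_shards.get_or_create(&()).inc();

    // A scrape would expose roughly:
    //   nucliadb_shard_cache_open 3
    //   nucliadb_shard_cache_evicted_total 1   (counters get a _total suffix)
    let mut body = String::new();
    encode(&mut body, &registry)?;
    println!("{body}");
    Ok(())
}
```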
diff --git a/nucliadb_node/src/grpc/grpc_reader.rs b/nucliadb_node/src/grpc/grpc_reader.rs
index e7bde881e5..d8b86f9066 100644
--- a/nucliadb_node/src/grpc/grpc_reader.rs
+++ b/nucliadb_node/src/grpc/grpc_reader.rs
@@ -24,7 +24,6 @@ use nucliadb_core::prelude::{DocumentIterator, ParagraphIterator};
 use nucliadb_core::protos::node_reader_server::NodeReader;
 use nucliadb_core::protos::*;
 use nucliadb_core::tracing::{info_span, Span};
-use nucliadb_core::NodeResult;
 use Shard as ShardPB;
 
 use crate::settings::Settings;
diff --git a/nucliadb_node/src/grpc/grpc_writer.rs b/nucliadb_node/src/grpc/grpc_writer.rs
index 9ed1abebd5..0533a82848 100644
--- a/nucliadb_node/src/grpc/grpc_writer.rs
+++ b/nucliadb_node/src/grpc/grpc_writer.rs
@@ -27,7 +27,7 @@ use nucliadb_core::protos::{
     ShardId, ShardIds, VectorSetId, VectorSetList,
 };
 use nucliadb_core::tracing::{self, Span, *};
-use nucliadb_core::{Channel, NodeResult};
+use nucliadb_core::Channel;
 use tokio::sync::mpsc::UnboundedSender;
 use tonic::{Request, Response, Status};
 
diff --git a/nucliadb_node/src/shards/providers/shard_cache/resource_cache.rs b/nucliadb_node/src/shards/providers/shard_cache/resource_cache.rs
index a17858441c..88166a3ccb 100644
--- a/nucliadb_node/src/shards/providers/shard_cache/resource_cache.rs
+++ b/nucliadb_node/src/shards/providers/shard_cache/resource_cache.rs
@@ -23,6 +23,7 @@ use std::num::NonZeroUsize;
 use std::sync::{Arc, Condvar, Mutex, Weak};
 
 use lru::LruCache;
+use nucliadb_core::metrics;
 
 // Classic implementation of a binary semaphore, used to be able to
 // block while an entry is being loaded by another thread, and get a
@@ -162,11 +163,13 @@ where K: Eq + Hash + Clone + std::fmt::Debug
             self.evict();
         }
         self.live.push(k.clone(), Arc::clone(v));
+        metrics::get_metrics().set_shard_cache_gauge(self.live.len() as i64);
     }
 
     fn evict(&mut self) {
         if let Some((evicted_k, evicted_v)) = self.live.pop_lru() {
             self.eviction.insert(evicted_k, Arc::downgrade(&evicted_v));
+            metrics::get_metrics().record_shard_cache_eviction();
         }
     }
 }
@@ -358,15 +361,16 @@ mod tests {
             let load_thread = scope.spawn(move |_| {
                 // Sleep a little bit to ensure the waiter actually waits
                 sleep(Duration::from_millis(5));
-                cache.lock().unwrap().loaded(load_guard, &Arc::new(0));
+                let mut unlocked_cache = cache.lock().unwrap();
                 tx.send(0).unwrap();
+                unlocked_cache.loaded(load_guard, &Arc::new(0));
             });
 
             // Both threads finished without panic/failing assert
             wait_thread.join().unwrap();
             load_thread.join().unwrap();
 
-            // Load thread finished earlier
+            // Load thread stores before the wait thread wakes up
             assert_eq!(rx.recv().unwrap(), 0);
             assert_eq!(rx.recv().unwrap(), 1);
         })
@@ -409,8 +413,8 @@ mod tests {
                 sleep(Duration::from_millis(5));
                 // Fail to call `loaded`. This should drop the load_guard
                 // which will mark the load as failed.
-                drop(load_guard);
                 tx.send(0).unwrap();
+                drop(load_guard);
             });
 
             // Both threads finished without panic/failing assert
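To make the hook placement in `resource_cache.rs` concrete, here is a self-contained sketch (the `InstrumentedCache` type is hypothetical, only the `lru` and `prometheus-client` crates are assumed, and a plain `Gauge`/`Counter` stands in for the `metrics::get_metrics()` calls) that updates the gauge after every insert and counts each LRU eviction at the same points as the hunks above:

```rust
use std::num::NonZeroUsize;

use lru::LruCache;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;

// Hypothetical stand-in for ResourceCache; evicted values are simply dropped
// instead of being parked in a weak-reference map.
struct InstrumentedCache {
    capacity: usize,
    live: LruCache<String, u64>,
    open_gauge: Gauge,
    eviction_counter: Counter,
}

impl InstrumentedCache {
    fn new(capacity: NonZeroUsize) -> Self {
        InstrumentedCache {
            capacity: capacity.get(),
            live: LruCache::new(capacity),
            open_gauge: Gauge::default(),
            eviction_counter: Counter::default(),
        }
    }

    fn insert(&mut self, key: String, value: u64) {
        if self.live.len() >= self.capacity {
            self.evict();
        }
        self.live.push(key, value);
        // Same spot as set_shard_cache_gauge(): the gauge tracks live entries.
        self.open_gauge.set(self.live.len() as i64);
    }

    fn evict(&mut self) {
        if let Some((_key, _value)) = self.live.pop_lru() {
            // Same spot as record_shard_cache_eviction(): one tick per eviction.
            self.eviction_counter.inc();
        }
    }
}

fn main() {
    let mut cache = InstrumentedCache::new(NonZeroUsize::new(2).unwrap());
    cache.insert("shard-a".into(), 1);
    cache.insert("shard-b".into(), 2);
    cache.insert("shard-c".into(), 3); // evicts "shard-a"
    assert_eq!(cache.open_gauge.get(), 2);
    assert_eq!(cache.eviction_counter.get(), 1);
}
```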