Skip to content

Commit

Permalink
Add prometheus metrics to resource cache (#1774)
Browse files Browse the repository at this point in the history
* Add prometheus metrics to resource cache

* Make resource cache tests more consistent when running single threaded
  • Loading branch information
javitonino authored Jan 26, 2024
1 parent 617596f commit f930a08
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 6 deletions.
2 changes: 2 additions & 0 deletions nucliadb_core/src/metrics/meters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ pub trait Meter: Send + Sync {
}
fn record_replicated_bytes(&self, value: u64);
fn record_replication_op(&self, key: replication::ReplicationOpsKey);
fn set_shard_cache_gauge(&self, value: i64);
fn record_shard_cache_eviction(&self);
}
3 changes: 3 additions & 0 deletions nucliadb_core/src/metrics/meters/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ impl Meter for NoOpMeter {
}
fn record_replicated_bytes(&self, _value: u64) {}
fn record_replication_op(&self, _key: replication::ReplicationOpsKey) {}

fn set_shard_cache_gauge(&self, _value: i64) {}
fn record_shard_cache_eviction(&self) {}
}
16 changes: 15 additions & 1 deletion nucliadb_core/src/metrics/meters/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::metrics::metric::grpc_ops::{GrpcOpKey, GrpcOpMetric, GrpcOpValue};
use crate::metrics::metric::request_time::{RequestTimeKey, RequestTimeMetric, RequestTimeValue};
use crate::metrics::metric::tokio_runtime::TokioRuntimeObserver;
use crate::metrics::metric::tokio_tasks::TokioTasksObserver;
use crate::metrics::metric::{grpc_ops, replication, request_time};
use crate::metrics::metric::{grpc_ops, replication, request_time, shard_cache};
use crate::metrics::task_monitor::{Monitor, TaskId};
use crate::tracing::{debug, error};
use crate::NodeResult;
Expand All @@ -38,6 +38,8 @@ pub struct PrometheusMeter {
tokio_runtime_observer: TokioRuntimeObserver,
replicated_bytes_metric: replication::ReplicatedBytesMetric,
replication_ops_metric: replication::ReplicationOpsMetric,
open_shards_metric: shard_cache::OpenShardsMetric,
evicted_shards_metric: shard_cache::EvictedShardsMetric,
}

impl Default for PrometheusMeter {
Expand Down Expand Up @@ -82,6 +84,14 @@ impl Meter for PrometheusMeter {
fn record_replication_op(&self, key: replication::ReplicationOpsKey) {
self.replication_ops_metric.get_or_create(&key).inc();
}

fn set_shard_cache_gauge(&self, value: i64) {
self.open_shards_metric.get_or_create(&()).set(value);
}

fn record_shard_cache_eviction(&self) {
self.evicted_shards_metric.get_or_create(&()).inc();
}
}

impl PrometheusMeter {
Expand All @@ -92,6 +102,8 @@ impl PrometheusMeter {
let grpc_op_metric = grpc_ops::register_grpc_ops(&mut registry);
let replicated_bytes_metric = replication::register_replicated_bytes_ops(&mut registry);
let replication_ops_metric = replication::register_replication_operations(&mut registry);
let open_shards_metric = shard_cache::register_open_shards_metric(&mut registry);
let evicted_shards_metric = shard_cache::register_evicted_shards_metric(&mut registry);

let prefixed_subregistry = registry.sub_registry_with_prefix("nucliadb_node");
let tokio_tasks_observer = TokioTasksObserver::new(prefixed_subregistry);
Expand All @@ -105,6 +117,8 @@ impl PrometheusMeter {
tokio_runtime_observer,
replicated_bytes_metric,
replication_ops_metric,
open_shards_metric,
evicted_shards_metric,
}
}
}
1 change: 1 addition & 0 deletions nucliadb_core/src/metrics/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@
pub mod grpc_ops;
pub mod replication;
pub mod request_time;
pub mod shard_cache;
pub mod tokio_runtime;
pub mod tokio_tasks;
47 changes: 47 additions & 0 deletions nucliadb_core/src/metrics/metric/shard_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (C) 2021 Bosutech XXI S.L.
//
// nucliadb is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

pub type OpenShardsMetric = Family<(), Gauge>;

pub fn register_open_shards_metric(registry: &mut Registry) -> OpenShardsMetric {
let open_shards = OpenShardsMetric::default();
registry.register(
"nucliadb_shard_cache_open",
"Open shards",
open_shards.clone(),
);
open_shards
}

pub type EvictedShardsMetric = Family<(), Counter>;

pub fn register_evicted_shards_metric(registry: &mut Registry) -> EvictedShardsMetric {
let evicted_shards = EvictedShardsMetric::default();
registry.register(
"nucliadb_shard_cache_evicted",
"Evicted shards",
evicted_shards.clone(),
);
evicted_shards
}
1 change: 0 additions & 1 deletion nucliadb_node/src/grpc/grpc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use nucliadb_core::prelude::{DocumentIterator, ParagraphIterator};
use nucliadb_core::protos::node_reader_server::NodeReader;
use nucliadb_core::protos::*;
use nucliadb_core::tracing::{info_span, Span};
use nucliadb_core::NodeResult;
use Shard as ShardPB;

use crate::settings::Settings;
Expand Down
2 changes: 1 addition & 1 deletion nucliadb_node/src/grpc/grpc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use nucliadb_core::protos::{
ShardId, ShardIds, VectorSetId, VectorSetList,
};
use nucliadb_core::tracing::{self, Span, *};
use nucliadb_core::{Channel, NodeResult};
use nucliadb_core::Channel;
use tokio::sync::mpsc::UnboundedSender;
use tonic::{Request, Response, Status};

Expand Down
10 changes: 7 additions & 3 deletions nucliadb_node/src/shards/providers/shard_cache/resource_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::num::NonZeroUsize;
use std::sync::{Arc, Condvar, Mutex, Weak};

use lru::LruCache;
use nucliadb_core::metrics;

// Classic implementation of a binary semaphore, used to be able to
// block while an entry is being loaded by another thread, and get a
Expand Down Expand Up @@ -162,11 +163,13 @@ where K: Eq + Hash + Clone + std::fmt::Debug
self.evict();
}
self.live.push(k.clone(), Arc::clone(v));
metrics::get_metrics().set_shard_cache_gauge(self.live.len() as i64);
}

fn evict(&mut self) {
if let Some((evicted_k, evicted_v)) = self.live.pop_lru() {
self.eviction.insert(evicted_k, Arc::downgrade(&evicted_v));
metrics::get_metrics().record_shard_cache_eviction();
}
}
}
Expand Down Expand Up @@ -358,15 +361,16 @@ mod tests {
let load_thread = scope.spawn(move |_| {
// Sleep a little bit to ensure the waiter actually waits
sleep(Duration::from_millis(5));
cache.lock().unwrap().loaded(load_guard, &Arc::new(0));
let mut unlocked_cache = cache.lock().unwrap();
tx.send(0).unwrap();
unlocked_cache.loaded(load_guard, &Arc::new(0));
});

// Both threads finished without panic/failing assert
wait_thread.join().unwrap();
load_thread.join().unwrap();

// Load thread finished earlier
// Load thread stores before wait thread waken up
assert_eq!(rx.recv().unwrap(), 0);
assert_eq!(rx.recv().unwrap(), 1);
})
Expand Down Expand Up @@ -409,8 +413,8 @@ mod tests {
sleep(Duration::from_millis(5));
// Fail to call `loaded`. This should drop the load_guard
// which will mark the load as failed.
drop(load_guard);
tx.send(0).unwrap();
drop(load_guard);
});

// Both threads finished without panic/failing assert
Expand Down

3 comments on commit f930a08

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: f930a08 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 12967.225074848006 iter/sec (stddev: 2.2817610124921905e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 1.00

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: f930a08 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 12888.267496857517 iter/sec (stddev: 5.867173056171454e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 1.01

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: f930a08 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13149.090243804827 iter/sec (stddev: 8.766350580772966e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 0.99

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.