Use monotonic timers for metrics (#1739)
javitonino authored Jan 19, 2024
1 parent 5d8b376 commit 02b3992
Showing 15 changed files with 321 additions and 378 deletions.
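
The change is the same across every file: wall-clock `SystemTime` is swapped for monotonic `Instant` wherever a duration is measured. A minimal sketch of the difference (illustration only, not code from this repository):

```rust
use std::time::{Duration, Instant, SystemTime};

fn main() {
    // Wall-clock time can jump backwards (NTP adjustments, manual clock
    // changes), so SystemTime::elapsed() returns a Result that every
    // call site has to handle.
    let wall_start = SystemTime::now();
    let wall_elapsed: Result<Duration, std::time::SystemTimeError> = wall_start.elapsed();
    println!("wall-clock elapsed: {wall_elapsed:?}");

    // Instant is monotonic: it never goes backwards, so elapsed()
    // returns a Duration directly and the error branches disappear.
    let mono_start = Instant::now();
    let mono_elapsed: Duration = mono_start.elapsed();
    println!("monotonic elapsed: {mono_elapsed:?}");
}
```

Because `Instant` can never observe the clock moving backwards, every `if let Ok(...)`, `.expect(...)`, and `unwrap_or(...)` guarding an elapsed-time read in the diff below simply goes away.
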
22 changes: 9 additions & 13 deletions nucliadb_node/src/grpc/middleware/metrics.rs
@@ -18,13 +18,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::task::{Context, Poll};
use std::time::SystemTime;
use std::time::Instant;

use futures::future::BoxFuture;
use hyper::Body;
use nucliadb_core::metrics;
use nucliadb_core::metrics::grpc_ops::GrpcOpKey;
use nucliadb_core::tracing::warn;
use tonic::body::BoxBody;
use tower::{Layer, Service};

@@ -71,7 +70,7 @@ where
let mut inner = std::mem::replace(&mut self.inner, clone);

Box::pin(async move {
let start = SystemTime::now();
let start = Instant::now();
let meter = metrics::get_metrics();

let grpc_method = req.uri().path().to_string();
@@ -86,16 +85,13 @@
None => call.await,
};

if let Ok(grpc_call_duration) = start.elapsed() {
meter.record_grpc_op(
GrpcOpKey {
method: grpc_method,
},
grpc_call_duration.as_secs_f64(),
);
} else {
warn!("Failed to observe gRPC call duration for: {grpc_method}");
}
let grpc_call_duration = start.elapsed();
meter.record_grpc_op(
GrpcOpKey {
method: grpc_method,
},
grpc_call_duration.as_secs_f64(),
);

response
})
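
The middleware follows the pattern above: take an `Instant` before awaiting the inner service, then record the elapsed seconds with no fallible branch. A minimal sketch, with a simplified recorder standing in for `meter.record_grpc_op()` (which in the real code takes a `GrpcOpKey` plus a seconds value):

```rust
use std::time::Instant;

// Hypothetical stand-in for meter.record_grpc_op(); just here so the
// sketch is self-contained.
fn record_call_seconds(method: &str, seconds: f64) {
    println!("grpc {method} took {seconds:.6}s");
}

// Times any future the same way the middleware times the inner service.
async fn timed_call<F, T>(method: &str, call: F) -> T
where
    F: std::future::Future<Output = T>,
{
    let start = Instant::now(); // monotonic, cannot fail
    let response = call.await;
    record_call_seconds(method, start.elapsed().as_secs_f64()); // no Ok/Err branch
    response
}
```
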
7 changes: 2 additions & 5 deletions nucliadb_node/src/replication/replicator.rs
@@ -265,7 +265,7 @@ pub async fn connect_to_primary_and_replicate(
let no_shards_to_sync = replication_state.shard_states.is_empty();
let no_shards_to_remove = replication_state.shards_to_remove.is_empty();

let start = std::time::SystemTime::now();
let start = std::time::Instant::now();
for shard_state in replication_state.shard_states {
if shutdown_notified.load(std::sync::atomic::Ordering::Relaxed) {
return Ok(());
@@ -349,10 +349,7 @@ pub async fn connect_to_primary_and_replicate(
//
// 1. If we're healthy, we'll sleep for a while and check again.
// 2. If backed up replicating, we'll try replicating again immediately and check again.
let elapsed = start
.elapsed()
.map(|elapsed| elapsed.as_secs_f64())
.expect("Error getting elapsed time");
let elapsed = start.elapsed().as_secs_f64();
if elapsed < settings.replication_healthy_delay() as f64 {
// only update healthy marker if we're up-to-date in the configured healthy time
repl_health_mng.update_healthy();
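
In the replicator, the elapsed time now feeds the health check directly. A sketch of that comparison, with `healthy_delay_secs` standing in for the value returned by `settings.replication_healthy_delay()`:

```rust
use std::time::Instant;

// Sketch only: `healthy_delay_secs` is an assumed parameter mirroring
// settings.replication_healthy_delay() in the real replicator.
fn is_within_healthy_delay(start: Instant, healthy_delay_secs: f64) -> bool {
    // Instant::elapsed() always succeeds, so the old
    // .expect("Error getting elapsed time") is no longer needed.
    start.elapsed().as_secs_f64() < healthy_delay_secs
}
```
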
102 changes: 47 additions & 55 deletions nucliadb_paragraphs/src/reader.rs
@@ -19,7 +19,7 @@
//

use std::fmt::Debug;
use std::time::SystemTime;
use std::time::Instant;

use nucliadb_core::prelude::*;
use nucliadb_core::protos::order_by::{OrderField, OrderType};
@@ -70,46 +70,42 @@ impl ParagraphReader for ParagraphReaderService
#[measure(actor = "paragraphs", metric = "suggest")]
#[tracing::instrument(skip_all)]
fn suggest(&self, request: &SuggestRequest) -> NodeResult<Self::Response> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&request.shard);

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: starts at {v} ms");

let parser = QueryParser::for_index(&self.index, vec![self.schema.text]);
let text = self.adapt_text(&parser, &request.body);
let (original, termc, fuzzied) =
suggest_query(&parser, &text, request, &self.schema, FUZZY_DISTANCE);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: starts at {v} ms");

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: starts at {v} ms");
}
let searcher = self.reader.searcher();
let topdocs = TopDocs::with_limit(NUMBER_OF_RESULTS_SUGGEST);
let mut results = searcher.search(&original, &topdocs)?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: ends at {v} ms");

if results.is_empty() {
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Trying fuzzy: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Trying fuzzy: starts at {v} ms");

let topdocs = TopDocs::with_limit(NUMBER_OF_RESULTS_SUGGEST);
match searcher.search(&fuzzied, &topdocs) {
Ok(mut fuzzied) => results.append(&mut fuzzied),
Err(err) => error!("{err:?} during suggest"),
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Trying fuzzy: ends at {v} ms");
}
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Ending at: {v} ms");
let v = time.elapsed().as_millis();
debug!("{id:?} - Trying fuzzy: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Ending at: {v} ms");

Ok(ParagraphSearchResponse::from(SearchBm25Response {
total: results.len(),
@@ -146,12 +142,12 @@ impl ReaderChild for ParagraphReaderService {
#[measure(actor = "paragraphs", metric = "search")]
#[tracing::instrument(skip_all)]
fn search(&self, request: &Self::Request) -> NodeResult<Self::Response> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&request.id);

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: starts at {v} ms");

let parser = QueryParser::for_index(&self.index, vec![self.schema.text]);
let results = request.result_per_page as usize;
let offset = results * request.page_number as usize;
@@ -164,13 +160,12 @@
.cloned()
.collect();
let text = self.adapt_text(&parser, &request.body);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: starts at {v} ms");

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: starts at {v} ms");
}
let advanced = request
.advanced_query
.as_ref()
@@ -194,37 +189,34 @@
only_faceted: request.only_faceted,
};
let mut response = searcher.do_search(termc.clone(), original, self)?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: ends at {v} ms");

if response.results.is_empty() && request.result_per_page > 0 {
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Applying fuzzy: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Applying fuzzy: starts at {v} ms");

let fuzzied = searcher.do_search(termc, fuzzied, self)?;
response = fuzzied;
response.fuzzy_distance = FUZZY_DISTANCE as i32;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Applying fuzzy: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Applying fuzzy: ends at {v} ms");
}

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Producing results: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Producing results: starts at {v} ms");

let total = response.results.len() as f32;
response.results.iter_mut().enumerate().for_each(|(i, r)| {
if let Some(sc) = &mut r.score {
sc.booster = total - (i as f32);
}
});
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Producing results: starts at {v} ms");
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Ending at: {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Producing results: starts at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Ending at: {v} ms");

Ok(response)
}
@@ -295,7 +287,7 @@ impl BatchProducer {
impl Iterator for BatchProducer {
type Item = Vec<ParagraphItem>;
fn next(&mut self) -> Option<Self::Item> {
let time = SystemTime::now();
let time = Instant::now();
if self.offset >= self.total {
debug!("No more batches available");
return None;
@@ -325,9 +317,9 @@ impl Iterator for BatchProducer {
items.push(ParagraphItem { id, labels });
}
self.offset += Self::BATCH;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("New batch created, took {v} ms");
}
let v = time.elapsed().as_millis();
debug!("New batch created, took {v} ms");

Some(items)
}
}
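
The reader logs cumulative milliseconds at each stage of suggest and search; with `Instant`, each log becomes a plain `elapsed().as_millis()` call instead of an `if let Ok(...)` over a `Result`. A hypothetical helper (not part of this commit) showing how that repeated pattern could be factored:

```rust
use std::time::Instant;

// Hypothetical helper, not part of the commit: logs how far into the
// request we are, in milliseconds since `start`.
fn log_stage(start: Instant, id: &str, stage: &str) {
    let ms = start.elapsed().as_millis();
    println!("{id} - {stage}: at {ms} ms"); // the real code uses debug!()
}

fn handle_request(id: &str) {
    let start = Instant::now();
    log_stage(start, id, "Creating query: starts");
    // ... build the query ...
    log_stage(start, id, "Creating query: ends");
    // ... run the search ...
    log_stage(start, id, "Searching: ends");
}
```
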
76 changes: 36 additions & 40 deletions nucliadb_paragraphs/src/writer.rs
@@ -20,7 +20,7 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs;
use std::time::SystemTime;
use std::time::Instant;

use nucliadb_core::prelude::*;
use nucliadb_core::protos::prost::Message;
@@ -64,81 +64,77 @@ impl WriterChild for ParagraphWriterService {
#[measure(actor = "paragraphs", metric = "count")]
#[tracing::instrument(skip_all)]
fn count(&self) -> NodeResult<usize> {
let time = SystemTime::now();
let time = Instant::now();
let id: Option<String> = None;

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Count starting at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Count starting at {v} ms");

let reader = self.index.reader()?;
let searcher = reader.searcher();
let count = searcher.search(&AllQuery, &Count)?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Ending at: {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Ending at: {v} ms");

Ok(count)
}

#[measure(actor = "paragraphs", metric = "set_resource")]
#[tracing::instrument(skip_all)]
fn set_resource(&mut self, resource: &Resource) -> NodeResult<()> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&resource.shard_id);

if resource.status != ResourceStatus::Delete as i32 {
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Indexing paragraphs: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Indexing paragraphs: starts at {v} ms");

let _ = self.index_paragraph(resource);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Indexing paragraphs: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Indexing paragraphs: ends at {v} ms");
}

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Processing paragraphs to delete: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Processing paragraphs to delete: starts at {v} ms");

for paragraph_id in &resource.paragraphs_to_delete {
let uuid_term = Term::from_field_text(self.schema.paragraph, paragraph_id);
self.writer.delete_term(uuid_term);
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Processing paragraphs to delete: ends at {v} ms");
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Processing paragraphs to delete: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: starts at {v} ms");

self.writer.commit()?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: ends at {v} ms");

Ok(())
}

#[measure(actor = "paragraphs", metric = "delete_resource")]
#[tracing::instrument(skip_all)]
fn delete_resource(&mut self, resource_id: &ResourceId) -> NodeResult<()> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&resource_id.shard_id);

let uuid_field = self.schema.uuid;
let uuid_term = Term::from_field_text(uuid_field, &resource_id.uuid);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Delete term: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Delete term: starts at {v} ms");

self.writer.delete_term(uuid_term);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Delete term: ends at {v} ms");
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Delete term: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: starts at {v} ms");

self.writer.commit()?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: ends at {v} ms");

Ok(())
}
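
As in the reader, these timestamps are all measured from a single `Instant` taken at the top of the function, so each log reports time since the start of the call rather than the duration of a phase. If per-phase durations were ever wanted, a fresh `Instant` per phase would do it; a sketch under that assumption, not something this commit does:

```rust
use std::time::Instant;

// Sketch only: per-phase durations instead of the cumulative
// "milliseconds since function start" that the writer logs today.
fn delete_and_commit_with_phase_timing() {
    let delete_phase = Instant::now();
    // ... writer.delete_term(uuid_term) would go here ...
    println!("delete phase took {} ms", delete_phase.elapsed().as_millis());

    let commit_phase = Instant::now();
    // ... writer.commit() would go here ...
    println!("commit phase took {} ms", commit_phase.elapsed().as_millis());
}
```
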
4 changes: 2 additions & 2 deletions nucliadb_procs/src/lib.rs
@@ -80,12 +80,12 @@ pub fn measure(
let expanded = quote! {
#(#attrs)*
#vis #sig {
let time = std::time::SystemTime::now();
let time = std::time::Instant::now();

// execute function body
let return_value = #block;

let took = time.elapsed().map(|elapsed| elapsed.as_secs_f64()).unwrap_or(f64::NAN);
let took = time.elapsed().as_secs_f64();
let metrics = nucliadb_core::metrics::get_metrics();
let metric = nucliadb_core::metrics::request_time::RequestTimeKey::#actor(#metric.to_string());
metrics.record_request_time(metric, took);
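
This is the proc macro behind the `#[measure(...)]` attributes used throughout the readers and writers, so the generated timing code now uses `Instant` as well. Roughly what an annotated function expands to after this change (simplified, with a local stand-in for the `nucliadb_core::metrics` sink so the sketch compiles on its own):

```rust
use std::time::Instant;

// Stand-in so the sketch is self-contained; the generated code calls
// nucliadb_core::metrics::get_metrics().record_request_time(...) instead.
fn record_request_time(actor: &str, metric: &str, seconds: f64) {
    println!("{actor}/{metric}: {seconds:.6}s");
}

// Roughly what a function annotated with
// #[measure(actor = "paragraphs", metric = "count")] expands to now.
fn count() -> usize {
    let time = Instant::now();

    // original function body
    let return_value = { 42 };

    // Instant::elapsed() cannot fail, so the old
    // unwrap_or(f64::NAN) fallback is gone.
    let took = time.elapsed().as_secs_f64();
    record_request_time("paragraphs", "count", took);

    return_value
}
```
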

3 comments on commit 02b3992

@github-actions

Benchmark

Benchmark suite: nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error
Current (02b3992): 13094.90786909572 iter/sec (stddev: 8.801189073641379e-8)
Previous (5a633b0): 12745.686329086004 iter/sec (stddev: 1.7317806991721728e-7)
Ratio: 0.97

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions

Benchmark

Benchmark suite: nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error
Current (02b3992): 13276.149793622735 iter/sec (stddev: 2.765566036490526e-7)
Previous (5a633b0): 12745.686329086004 iter/sec (stddev: 1.7317806991721728e-7)
Ratio: 0.96

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions

Benchmark

Benchmark suite: nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error
Current (02b3992): 12971.452389048705 iter/sec (stddev: 3.7917047187730425e-7)
Previous (5a633b0): 12745.686329086004 iter/sec (stddev: 1.7317806991721728e-7)
Ratio: 0.98

This comment was automatically generated by workflow using github-action-benchmark.
