Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use monotonic timers for metrics #1739

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions nucliadb_node/src/grpc/middleware/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::task::{Context, Poll};
use std::time::SystemTime;
use std::time::Instant;

use futures::future::BoxFuture;
use hyper::Body;
use nucliadb_core::metrics;
use nucliadb_core::metrics::grpc_ops::GrpcOpKey;
use nucliadb_core::tracing::warn;
use tonic::body::BoxBody;
use tower::{Layer, Service};

Expand Down Expand Up @@ -71,7 +70,7 @@ where
let mut inner = std::mem::replace(&mut self.inner, clone);

Box::pin(async move {
let start = SystemTime::now();
let start = Instant::now();
let meter = metrics::get_metrics();

let grpc_method = req.uri().path().to_string();
Expand All @@ -86,16 +85,13 @@ where
None => call.await,
};

if let Ok(grpc_call_duration) = start.elapsed() {
meter.record_grpc_op(
GrpcOpKey {
method: grpc_method,
},
grpc_call_duration.as_secs_f64(),
);
} else {
warn!("Failed to observe gRPC call duration for: {grpc_method}");
}
let grpc_call_duration = start.elapsed();
meter.record_grpc_op(
GrpcOpKey {
method: grpc_method,
},
grpc_call_duration.as_secs_f64(),
);

response
})
Expand Down
7 changes: 2 additions & 5 deletions nucliadb_node/src/replication/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ pub async fn connect_to_primary_and_replicate(
let no_shards_to_sync = replication_state.shard_states.is_empty();
let no_shards_to_remove = replication_state.shards_to_remove.is_empty();

let start = std::time::SystemTime::now();
let start = std::time::Instant::now();
for shard_state in replication_state.shard_states {
if shutdown_notified.load(std::sync::atomic::Ordering::Relaxed) {
return Ok(());
Expand Down Expand Up @@ -349,10 +349,7 @@ pub async fn connect_to_primary_and_replicate(
//
// 1. If we're healthy, we'll sleep for a while and check again.
// 2. If backed up replicating, we'll try replicating again immediately and check again.
let elapsed = start
.elapsed()
.map(|elapsed| elapsed.as_secs_f64())
.expect("Error getting elapsed time");
let elapsed = start.elapsed().as_secs_f64();
if elapsed < settings.replication_healthy_delay() as f64 {
// only update healthy marker if we're up-to-date in the configured healthy time
repl_health_mng.update_healthy();
Expand Down
102 changes: 47 additions & 55 deletions nucliadb_paragraphs/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//

use std::fmt::Debug;
use std::time::SystemTime;
use std::time::Instant;

use nucliadb_core::prelude::*;
use nucliadb_core::protos::order_by::{OrderField, OrderType};
Expand Down Expand Up @@ -70,46 +70,42 @@ impl ParagraphReader for ParagraphReaderService {
#[measure(actor = "paragraphs", metric = "suggest")]
#[tracing::instrument(skip_all)]
fn suggest(&self, request: &SuggestRequest) -> NodeResult<Self::Response> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&request.shard);

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: starts at {v} ms");

let parser = QueryParser::for_index(&self.index, vec![self.schema.text]);
let text = self.adapt_text(&parser, &request.body);
let (original, termc, fuzzied) =
suggest_query(&parser, &text, request, &self.schema, FUZZY_DISTANCE);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: starts at {v} ms");

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: starts at {v} ms");
}
let searcher = self.reader.searcher();
let topdocs = TopDocs::with_limit(NUMBER_OF_RESULTS_SUGGEST);
let mut results = searcher.search(&original, &topdocs)?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: ends at {v} ms");

if results.is_empty() {
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Trying fuzzy: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Trying fuzzy: starts at {v} ms");

let topdocs = TopDocs::with_limit(NUMBER_OF_RESULTS_SUGGEST);
match searcher.search(&fuzzied, &topdocs) {
Ok(mut fuzzied) => results.append(&mut fuzzied),
Err(err) => error!("{err:?} during suggest"),
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Trying fuzzy: ends at {v} ms");
}
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Ending at: {v} ms");
let v = time.elapsed().as_millis();
debug!("{id:?} - Trying fuzzy: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Ending at: {v} ms");

Ok(ParagraphSearchResponse::from(SearchBm25Response {
total: results.len(),
Expand Down Expand Up @@ -146,12 +142,12 @@ impl ReaderChild for ParagraphReaderService {
#[measure(actor = "paragraphs", metric = "search")]
#[tracing::instrument(skip_all)]
fn search(&self, request: &Self::Request) -> NodeResult<Self::Response> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&request.id);

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: starts at {v} ms");

let parser = QueryParser::for_index(&self.index, vec![self.schema.text]);
let results = request.result_per_page as usize;
let offset = results * request.page_number as usize;
Expand All @@ -164,13 +160,12 @@ impl ReaderChild for ParagraphReaderService {
.cloned()
.collect();
let text = self.adapt_text(&parser, &request.body);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Creating query: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Creating query: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: starts at {v} ms");

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: starts at {v} ms");
}
let advanced = request
.advanced_query
.as_ref()
Expand All @@ -194,37 +189,34 @@ impl ReaderChild for ParagraphReaderService {
only_faceted: request.only_faceted,
};
let mut response = searcher.do_search(termc.clone(), original, self)?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Searching: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Searching: ends at {v} ms");

if response.results.is_empty() && request.result_per_page > 0 {
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Applying fuzzy: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Applying fuzzy: starts at {v} ms");

let fuzzied = searcher.do_search(termc, fuzzied, self)?;
response = fuzzied;
response.fuzzy_distance = FUZZY_DISTANCE as i32;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Applying fuzzy: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Applying fuzzy: ends at {v} ms");
}

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Producing results: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Producing results: starts at {v} ms");

let total = response.results.len() as f32;
response.results.iter_mut().enumerate().for_each(|(i, r)| {
if let Some(sc) = &mut r.score {
sc.booster = total - (i as f32);
}
});
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Producing results: starts at {v} ms");
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Ending at: {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Producing results: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Ending at: {v} ms");

Ok(response)
}
Expand Down Expand Up @@ -295,7 +287,7 @@ impl BatchProducer {
impl Iterator for BatchProducer {
type Item = Vec<ParagraphItem>;
fn next(&mut self) -> Option<Self::Item> {
let time = SystemTime::now();
let time = Instant::now();
if self.offset >= self.total {
debug!("No more batches available");
return None;
Expand Down Expand Up @@ -325,9 +317,9 @@ impl Iterator for BatchProducer {
items.push(ParagraphItem { id, labels });
}
self.offset += Self::BATCH;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("New batch created, took {v} ms");
}
let v = time.elapsed().as_millis();
debug!("New batch created, took {v} ms");

Some(items)
}
}
Expand Down
76 changes: 36 additions & 40 deletions nucliadb_paragraphs/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs;
use std::time::SystemTime;
use std::time::Instant;

use nucliadb_core::prelude::*;
use nucliadb_core::protos::prost::Message;
Expand Down Expand Up @@ -64,81 +64,77 @@ impl WriterChild for ParagraphWriterService {
#[measure(actor = "paragraphs", metric = "count")]
#[tracing::instrument(skip_all)]
fn count(&self) -> NodeResult<usize> {
let time = SystemTime::now();
let time = Instant::now();
let id: Option<String> = None;

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Count starting at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Count starting at {v} ms");

let reader = self.index.reader()?;
let searcher = reader.searcher();
let count = searcher.search(&AllQuery, &Count)?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Ending at: {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Ending at: {v} ms");

Ok(count)
}

#[measure(actor = "paragraphs", metric = "set_resource")]
#[tracing::instrument(skip_all)]
fn set_resource(&mut self, resource: &Resource) -> NodeResult<()> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&resource.shard_id);

if resource.status != ResourceStatus::Delete as i32 {
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Indexing paragraphs: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Indexing paragraphs: starts at {v} ms");

let _ = self.index_paragraph(resource);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Indexing paragraphs: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Indexing paragraphs: ends at {v} ms");
}

if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Processing paragraphs to delete: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Processing paragraphs to delete: starts at {v} ms");

for paragraph_id in &resource.paragraphs_to_delete {
let uuid_term = Term::from_field_text(self.schema.paragraph, paragraph_id);
self.writer.delete_term(uuid_term);
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Processing paragraphs to delete: ends at {v} ms");
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Processing paragraphs to delete: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: starts at {v} ms");

self.writer.commit()?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: ends at {v} ms");

Ok(())
}

#[measure(actor = "paragraphs", metric = "delete_resource")]
#[tracing::instrument(skip_all)]
fn delete_resource(&mut self, resource_id: &ResourceId) -> NodeResult<()> {
let time = SystemTime::now();
let time = Instant::now();
let id = Some(&resource_id.shard_id);

let uuid_field = self.schema.uuid;
let uuid_term = Term::from_field_text(uuid_field, &resource_id.uuid);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Delete term: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Delete term: starts at {v} ms");

self.writer.delete_term(uuid_term);
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Delete term: ends at {v} ms");
}
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: starts at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Delete term: ends at {v} ms");

let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: starts at {v} ms");

self.writer.commit()?;
if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
debug!("{id:?} - Commit: ends at {v} ms");
}
let v = time.elapsed().as_millis();
debug!("{id:?} - Commit: ends at {v} ms");

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions nucliadb_procs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ pub fn measure(
let expanded = quote! {
#(#attrs)*
#vis #sig {
let time = std::time::SystemTime::now();
let time = std::time::Instant::now();

// execute function body
let return_value = #block;

let took = time.elapsed().map(|elapsed| elapsed.as_secs_f64()).unwrap_or(f64::NAN);
let took = time.elapsed().as_secs_f64();
let metrics = nucliadb_core::metrics::get_metrics();
let metric = nucliadb_core::metrics::request_time::RequestTimeKey::#actor(#metric.to_string());
metrics.record_request_time(metric, took);
Expand Down
Loading
Loading