Merge branch 'main' into shard-cache-improvements
javitonino authored Jan 19, 2024
2 parents 8fc7043 + c3f6c03 commit 70e0f7b
Showing 21 changed files with 401 additions and 380 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
@@ -51,10 +51,11 @@ ENV DRIVER=LOCAL
ENV HTTP_PORT=8080
ENV INGEST_GRPC_PORT=8060
ENV TRAIN_GRPC_PORT=8040
+ENV LOG_OUTPUT_TYPE=stdout

# HTTP
EXPOSE 8080/tcp
-# GRPC
+# GRPC - INGEST
EXPOSE 8060/tcp
# GRPC - TRAIN
EXPOSE 8040/tcp
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-2.39.0
+2.40.0
36 changes: 36 additions & 0 deletions nucliadb/migrations/0008_cleanup_leftover_rollover_metadata.py
@@ -0,0 +1,36 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""Upgrade relations and texts indices to v2.
This migration is leaning up leftover junk metadata from previous rollover migration from all KB shards objects.
"""

from nucliadb.common.cluster.rollover import clean_rollover_status
from nucliadb.migrator.context import ExecutionContext


async def migrate(context: ExecutionContext) -> None:
    ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
    await clean_rollover_status(context, kbid)
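This migration relies on the two hooks the nucliadb migrator looks for on a migration module: a global `migrate` (left as a no-op here) and a per-KB `migrate_kb`. A minimal sketch of how a driver loop might invoke them — `run_migration` and `all_kbids` are hypothetical names, not the migrator's actual API:

```python
# Hedged sketch of the hook contract; only migrate/migrate_kb come from the
# module above, the driver shown here is assumed.
async def run_migration(context, module, all_kbids: list[str]) -> None:
    await module.migrate(context)  # global step; `...` makes it a no-op here
    for kbid in all_kbids:
        await module.migrate_kb(context, kbid)  # per-KB cleanup
```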
21 changes: 21 additions & 0 deletions nucliadb/nucliadb/common/cluster/rollover.py
@@ -58,6 +58,11 @@ def _set_rollover_status(rollover_shards: writer_pb2.Shards, status: RolloverSta
    rollover_shards.extra[status.value] = "true"


def _clear_rollover_status(rollover_shards: writer_pb2.Shards):
    for status in RolloverStatus:
        rollover_shards.extra.pop(status.value, None)


class UnexpectedRolloverError(Exception):
    pass

@@ -312,6 +317,7 @@ async def cutover_shards(app_context: ApplicationContext, kbid: str) -> None:
    if previously_active_shards is None or rollover_shards is None:
        raise UnexpectedRolloverError("Shards for kb not found")

    _clear_rollover_status(rollover_shards)
    await cluster_datamanager.update_kb_shards(kbid, rollover_shards)
    await rollover_datamanager.delete_kb_rollover_shards(kbid)

@@ -436,6 +442,20 @@ async def clean_indexed_data(app_context: ApplicationContext, kbid: str) -> None:
        await rollover_datamanager.remove_indexed(kbid, batch)


async def clean_rollover_status(app_context: ApplicationContext, kbid: str) -> None:
    cluster_datamanager = ClusterDataManager(app_context.kv_driver)
    kb_shards = await cluster_datamanager.get_kb_shards(kbid)
    if kb_shards is None:
        logger.warning(
            "No shards found for KB, skipping clean rollover status",
            extra={"kbid": kbid},
        )
        return

    _clear_rollover_status(kb_shards)
    await cluster_datamanager.update_kb_shards(kbid, kb_shards)


async def rollover_kb_shards(app_context: ApplicationContext, kbid: str) -> None:
"""
Rollover a shard is the process of creating new shard replicas for every
@@ -471,6 +491,7 @@ async def rollover_kb_shards(app_context: ApplicationContext, kbid: str) -> None:
    # we need to cut over BEFORE we validate the data
    await validate_indexed_data(app_context, kbid)
    await clean_indexed_data(app_context, kbid)
    await clean_rollover_status(app_context, kbid)

    logger.warning("Finished rolling over shards", extra={"kbid": kbid})

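The new `_clear_rollover_status` / `clean_rollover_status` pair removes the per-status markers that `_set_rollover_status` writes into a shards object's `extra` map. A minimal sketch of the clearing semantics, using a plain dict as a stand-in for the protobuf `Shards.extra` map (the enum members shown are hypothetical; the real ones live in rollover.py):

```python
from enum import Enum


class RolloverStatus(Enum):  # assumed members, for illustration only
    RESOURCES_SCHEDULED = "resources_scheduled"
    RESOURCES_INDEXED = "resources_indexed"


def clear_rollover_status(extra: dict) -> None:
    # pop() with a default is a no-op for markers that were never set
    for status in RolloverStatus:
        extra.pop(status.value, None)


extra = {"resources_scheduled": "true", "unrelated": "kept"}
clear_rollover_status(extra)
assert extra == {"unrelated": "kept"}  # only rollover markers are removed
```

This is what lets the new test below assert `shards.extra == {}` both before and after a full rollover.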
@@ -22,6 +22,7 @@

from nucliadb.common.cluster import rollover
from nucliadb.common.context import ApplicationContext
from nucliadb.common.datamanagers.cluster import ClusterDataManager

pytestmark = pytest.mark.asyncio

@@ -79,3 +80,22 @@ async def test_rollover_kb_shards(
    assert resp.status_code == 200
    body = resp.json()
    assert len(body["resources"]) == count


@pytest.mark.parametrize("knowledgebox", ("EXPERIMENTAL", "STABLE"), indirect=True)
async def test_rollover_kb_shards_does_a_clean_cutover(
    app_context,
    knowledgebox,
):
    async def get_kb_shards(kbid: str):
        driver = app_context.kv_driver
        cluster_data_manager = ClusterDataManager(driver)
        return await cluster_data_manager.get_kb_shards(kbid)

    shards1 = await get_kb_shards(knowledgebox)
    assert shards1.extra == {}

    await rollover.rollover_kb_shards(app_context, knowledgebox)

    shards2 = await get_kb_shards(knowledgebox)
    assert shards2.extra == {}
22 changes: 9 additions & 13 deletions nucliadb_node/src/grpc/middleware/metrics.rs
@@ -18,13 +18,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::task::{Context, Poll};
-use std::time::SystemTime;
+use std::time::Instant;

use futures::future::BoxFuture;
use hyper::Body;
use nucliadb_core::metrics;
use nucliadb_core::metrics::grpc_ops::GrpcOpKey;
-use nucliadb_core::tracing::warn;
use tonic::body::BoxBody;
use tower::{Layer, Service};

@@ -71,7 +70,7 @@ where
        let mut inner = std::mem::replace(&mut self.inner, clone);

        Box::pin(async move {
-            let start = SystemTime::now();
+            let start = Instant::now();
            let meter = metrics::get_metrics();

            let grpc_method = req.uri().path().to_string();
@@ -86,16 +85,13 @@ where
                None => call.await,
            };

-            if let Ok(grpc_call_duration) = start.elapsed() {
-                meter.record_grpc_op(
-                    GrpcOpKey {
-                        method: grpc_method,
-                    },
-                    grpc_call_duration.as_secs_f64(),
-                );
-            } else {
-                warn!("Failed to observe gRPC call duration for: {grpc_method}");
-            }
+            let grpc_call_duration = start.elapsed();
+            meter.record_grpc_op(
+                GrpcOpKey {
+                    method: grpc_method,
+                },
+                grpc_call_duration.as_secs_f64(),
+            );

            response
        })
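The `SystemTime` → `Instant` switch here (and in the files below) trades the wall clock, whose `elapsed()` returns a `Result` because the clock can be adjusted backwards, for a monotonic clock whose `elapsed()` cannot fail — which is why the `warn!` fallback branch and its import could be deleted. A rough Python analogy of the same distinction (not from this codebase):

```python
import time

# Wall-clock time can jump backwards (NTP corrections, manual changes), so a
# naive delta may be negative; Rust's SystemTime::elapsed() returns a Result
# to surface exactly this hazard.
wall_start = time.time()

# A monotonic clock never goes backwards, so the delta is always valid; this
# is the guarantee behind Instant, whose elapsed() is infallible.
mono_start = time.monotonic()

time.sleep(0.01)  # stand-in for the measured gRPC call
duration_s = time.monotonic() - mono_start  # always >= 0, no error branch
```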
7 changes: 2 additions & 5 deletions nucliadb_node/src/replication/replicator.rs
@@ -265,7 +265,7 @@ pub async fn connect_to_primary_and_replicate(
    let no_shards_to_sync = replication_state.shard_states.is_empty();
    let no_shards_to_remove = replication_state.shards_to_remove.is_empty();

-    let start = std::time::SystemTime::now();
+    let start = std::time::Instant::now();
    for shard_state in replication_state.shard_states {
        if shutdown_notified.load(std::sync::atomic::Ordering::Relaxed) {
            return Ok(());
@@ -349,10 +349,7 @@ pub async fn connect_to_primary_and_replicate(
    //
    // 1. If we're healthy, we'll sleep for a while and check again.
    // 2. If backed up replicating, we'll try replicating again immediately and check again.
-    let elapsed = start
-        .elapsed()
-        .map(|elapsed| elapsed.as_secs_f64())
-        .expect("Error getting elapsed time");
+    let elapsed = start.elapsed().as_secs_f64();
    if elapsed < settings.replication_healthy_delay() as f64 {
        // only update healthy marker if we're up-to-date in the configured healthy time
        repl_health_mng.update_healthy();
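With `Instant`, the elapsed check above no longer needs `.expect(...)`. The decision it feeds is small; a hedged Python paraphrase (names here are illustrative, not the real API):

```python
# Paraphrase of the health-marker update guarded by the elapsed-time check.
def maybe_mark_healthy(elapsed_s: float, healthy_delay_s: float, health) -> None:
    # Only refresh the healthy marker when the sync pass finished within the
    # configured window (mirrors repl_health_mng.update_healthy() above).
    if elapsed_s < healthy_delay_s:
        health.update_healthy()
```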
102 changes: 47 additions & 55 deletions nucliadb_paragraphs/src/reader.rs
@@ -19,7 +19,7 @@
//

use std::fmt::Debug;
-use std::time::SystemTime;
+use std::time::Instant;

use nucliadb_core::prelude::*;
use nucliadb_core::protos::order_by::{OrderField, OrderType};
@@ -70,46 +70,42 @@ impl ParagraphReader for ParagraphReaderService {
    #[measure(actor = "paragraphs", metric = "suggest")]
    #[tracing::instrument(skip_all)]
    fn suggest(&self, request: &SuggestRequest) -> NodeResult<Self::Response> {
-        let time = SystemTime::now();
+        let time = Instant::now();
        let id = Some(&request.shard);

-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Creating query: starts at {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Creating query: starts at {v} ms");

        let parser = QueryParser::for_index(&self.index, vec![self.schema.text]);
        let text = self.adapt_text(&parser, &request.body);
        let (original, termc, fuzzied) =
            suggest_query(&parser, &text, request, &self.schema, FUZZY_DISTANCE);
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Creating query: ends at {v} ms");
-        }
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Searching: starts at {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Creating query: ends at {v} ms");
+
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Searching: starts at {v} ms");
+
        let searcher = self.reader.searcher();
        let topdocs = TopDocs::with_limit(NUMBER_OF_RESULTS_SUGGEST);
        let mut results = searcher.search(&original, &topdocs)?;
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Searching: ends at {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Searching: ends at {v} ms");

        if results.is_empty() {
-            if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-                debug!("{id:?} - Trying fuzzy: starts at {v} ms");
-            }
+            let v = time.elapsed().as_millis();
+            debug!("{id:?} - Trying fuzzy: starts at {v} ms");
+
            let topdocs = TopDocs::with_limit(NUMBER_OF_RESULTS_SUGGEST);
            match searcher.search(&fuzzied, &topdocs) {
                Ok(mut fuzzied) => results.append(&mut fuzzied),
                Err(err) => error!("{err:?} during suggest"),
            }
-            if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-                debug!("{id:?} - Trying fuzzy: ends at {v} ms");
-            }
+            let v = time.elapsed().as_millis();
+            debug!("{id:?} - Trying fuzzy: ends at {v} ms");
        }
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Ending at: {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Ending at: {v} ms");

        Ok(ParagraphSearchResponse::from(SearchBm25Response {
            total: results.len(),
@@ -146,12 +142,12 @@ impl ReaderChild for ParagraphReaderService {
    #[measure(actor = "paragraphs", metric = "search")]
    #[tracing::instrument(skip_all)]
    fn search(&self, request: &Self::Request) -> NodeResult<Self::Response> {
-        let time = SystemTime::now();
+        let time = Instant::now();
        let id = Some(&request.id);

-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Creating query: starts at {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Creating query: starts at {v} ms");

        let parser = QueryParser::for_index(&self.index, vec![self.schema.text]);
        let results = request.result_per_page as usize;
        let offset = results * request.page_number as usize;
@@ -164,13 +160,12 @@ impl ReaderChild for ParagraphReaderService {
            .cloned()
            .collect();
        let text = self.adapt_text(&parser, &request.body);
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Creating query: ends at {v} ms");
-        }
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Searching: starts at {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Creating query: ends at {v} ms");
+
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Searching: starts at {v} ms");
+
        let advanced = request
            .advanced_query
            .as_ref()
@@ -194,37 +189,34 @@ impl ReaderChild for ParagraphReaderService {
            only_faceted: request.only_faceted,
        };
        let mut response = searcher.do_search(termc.clone(), original, self)?;
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Searching: ends at {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Searching: ends at {v} ms");

        if response.results.is_empty() && request.result_per_page > 0 {
-            if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-                debug!("{id:?} - Applying fuzzy: starts at {v} ms");
-            }
+            let v = time.elapsed().as_millis();
+            debug!("{id:?} - Applying fuzzy: starts at {v} ms");
+
            let fuzzied = searcher.do_search(termc, fuzzied, self)?;
            response = fuzzied;
            response.fuzzy_distance = FUZZY_DISTANCE as i32;
-            if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-                debug!("{id:?} - Applying fuzzy: ends at {v} ms");
-            }
+            let v = time.elapsed().as_millis();
+            debug!("{id:?} - Applying fuzzy: ends at {v} ms");
        }

-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Producing results: starts at {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Producing results: starts at {v} ms");

        let total = response.results.len() as f32;
        response.results.iter_mut().enumerate().for_each(|(i, r)| {
            if let Some(sc) = &mut r.score {
                sc.booster = total - (i as f32);
            }
        });
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Producing results: starts at {v} ms");
-        }
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("{id:?} - Ending at: {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Producing results: starts at {v} ms");
+
+        let v = time.elapsed().as_millis();
+        debug!("{id:?} - Ending at: {v} ms");

        Ok(response)
    }
@@ -295,7 +287,7 @@ impl BatchProducer {
impl Iterator for BatchProducer {
    type Item = Vec<ParagraphItem>;
    fn next(&mut self) -> Option<Self::Item> {
-        let time = SystemTime::now();
+        let time = Instant::now();
        if self.offset >= self.total {
            debug!("No more batches available");
            return None;
@@ -325,9 +317,9 @@ impl Iterator for BatchProducer {
            items.push(ParagraphItem { id, labels });
        }
        self.offset += Self::BATCH;
-        if let Ok(v) = time.elapsed().map(|s| s.as_millis()) {
-            debug!("New batch created, took {v} ms");
-        }
+        let v = time.elapsed().as_millis();
+        debug!("New batch created, took {v} ms");

        Some(items)
    }
}
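Across reader.rs, the same three-line `if let Ok(v) … debug!` pattern collapses into an infallible two-liner at every stage marker. A hedged sketch (not from the codebase) of the stage-timing idea behind all those `debug!` calls, as a small Python helper:

```python
import logging
import time

logger = logging.getLogger(__name__)


class StageTimer:
    """Log how far into a request each stage begins/ends."""

    def __init__(self, request_id: str) -> None:
        self.request_id = request_id
        self.start = time.monotonic()  # monotonic, mirroring Instant::now()

    def mark(self, stage: str) -> None:
        elapsed_ms = int((time.monotonic() - self.start) * 1000)
        logger.debug("%s - %s at %d ms", self.request_id, stage, elapsed_ms)


timer = StageTimer("shard-1")
timer.mark("Creating query: starts")
# ... build the query ...
timer.mark("Creating query: ends")
```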

1 comment on commit 70e0f7b

@github-actions (Contributor)

Benchmark

| Benchmark suite | Current: 70e0f7b | Previous: 5a633b0 | Ratio |
| --- | --- | --- | --- |
| `nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error` | 13255.453478963405 iter/sec (stddev: 2.5929523629123066e-7) | 12745.686329086004 iter/sec (stddev: 1.7317806991721728e-7) | 0.96 |

Iterations per second is a bigger-is-better metric here, so a previous/current ratio below 1 indicates this commit benchmarked slightly faster than its predecessor.

This comment was automatically generated by a workflow using github-action-benchmark.
