From 068a6b7547baf784a31ac656ed5ef099a6ee74b0 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Mon, 11 Nov 2024 15:31:32 +0100 Subject: [PATCH 1/9] Skeleton for binding and pyprotos --- nidx/Cargo.lock | 110 +++++++++++++++++++++++++++++-- nidx/Cargo.toml | 2 +- nidx/nidx_binding/Cargo.toml | 16 +++++ nidx/nidx_binding/pyproject.toml | 15 +++++ nidx/nidx_binding/src/lib.rs | 104 +++++++++++++++++++++++++++++ nidx/nidx_protos/build.py | 26 ++++++++ nidx/nidx_protos/pyproject.toml | 10 +++ nidx/src/settings.rs | 5 +- 8 files changed, 282 insertions(+), 6 deletions(-) create mode 100644 nidx/nidx_binding/Cargo.toml create mode 100644 nidx/nidx_binding/pyproject.toml create mode 100644 nidx/nidx_binding/src/lib.rs create mode 100644 nidx/nidx_protos/build.py create mode 100644 nidx/nidx_protos/pyproject.toml diff --git a/nidx/Cargo.lock b/nidx/Cargo.lock index fa8937a7b6..8d0cc27b22 100644 --- a/nidx/Cargo.lock +++ b/nidx/Cargo.lock @@ -61,9 +61,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "arc-swap" @@ -1117,6 +1117,12 @@ dependencies = [ "hashbrown 0.15.0", ] +[[package]] +name = "indoc" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" + [[package]] name = "instant" version = "0.1.13" @@ -1322,6 +1328,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1403,6 +1418,18 @@ dependencies = [ "uuid", ] +[[package]] +name = "nidx_binding" +version = "0.1.0" +dependencies = [ + "anyhow", + "nidx", + "nidx_protos", + "pyo3", + "tokio", + "tonic", +] + [[package]] name = "nidx_paragraph" version = "0.1.0" @@ -1907,6 +1934,69 @@ dependencies = [ "prost", ] +[[package]] +name = "pyo3" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f402062616ab18202ae8319da13fa4279883a2b8a9d9f83f20dbade813ce1884" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b14b5775b5ff446dd1056212d778012cbe8a0fbffd368029fd9e25b514479c38" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ab5bcf04a2cdcbb50c7d6105de943f543f9ed92af55818fd17b660390fc8636" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fd24d897903a9e6d80b968368a34e1525aeb719d568dba8b3d4bfa5dc67d453" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36c011a03ba1e50152b4b394b479826cad97e7a21eb52df179cd91ac411cbfbe" +dependencies = [ + "heck", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn", +] + [[package]] name = "quick-xml" version = "0.36.2" @@ -3014,6 +3104,12 @@ dependencies = [ "xattr", ] +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempfile" version = "3.13.0" @@ -3125,9 +3221,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -3395,6 +3491,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unindent" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/nidx/Cargo.toml b/nidx/Cargo.toml index 7dd793e9ec..4b081ebace 100644 --- a/nidx/Cargo.toml +++ b/nidx/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [workspace] -members = ["nidx_vector", "nidx_types", "nidx_protos", "nidx_tests", "nidx_text", "nidx_tantivy", "nidx_paragraph", "nidx_relation"] +members = ["nidx_vector", "nidx_types", "nidx_protos", "nidx_tests", "nidx_text", "nidx_tantivy", "nidx_paragraph", "nidx_relation", "nidx_binding"] [dependencies] anyhow = "1.0.89" diff --git a/nidx/nidx_binding/Cargo.toml b/nidx/nidx_binding/Cargo.toml new file mode 100644 index 0000000000..5c8cccac66 --- /dev/null +++ b/nidx/nidx_binding/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "nidx_binding" +version = "0.1.0" +edition = "2021" + +[lib] +name = "nidx_binding" +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0.93" +nidx = { version = "0.1.0", path = ".." } +nidx_protos = { version = "0.1.0", path = "../nidx_protos" } +pyo3 = "0.22.0" +tokio = "1.41.1" +tonic = "0.12.3" diff --git a/nidx/nidx_binding/pyproject.toml b/nidx/nidx_binding/pyproject.toml new file mode 100644 index 0000000000..93a884cbd3 --- /dev/null +++ b/nidx/nidx_binding/pyproject.toml @@ -0,0 +1,15 @@ +[build-system] +requires = ["maturin>=1.7,<2.0"] +build-backend = "maturin" + +[project] +name = "nidx_binding" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Rust", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dynamic = ["version"] +[tool.maturin] +features = ["pyo3/extension-module"] diff --git a/nidx/nidx_binding/src/lib.rs b/nidx/nidx_binding/src/lib.rs new file mode 100644 index 0000000000..2b0dce8ccc --- /dev/null +++ b/nidx/nidx_binding/src/lib.rs @@ -0,0 +1,104 @@ +use pyo3::prelude::*; + +use nidx::api::grpc::ApiServer; +use nidx::grpc_server::GrpcServer; +use nidx::indexer::index_resource; +use nidx::searcher::grpc::SearchServer; +use nidx::searcher::SyncedSearcher; +use nidx::settings::{EnvSettings, MetadataSettings, ObjectStoreConfig, StorageSettings}; +use nidx::Settings; +use nidx_protos::Resource; +use std::path::Path; +use tokio::runtime::Runtime; + +#[pyclass] +pub struct NidxBinding { + #[pyo3(get)] + searcher_port: u16, + #[pyo3(get)] + api_port: u16, + settings: Settings, + seq: i64, + runtime: Option, +} + +#[pymethods] +impl NidxBinding { + #[new] + pub fn start() -> NidxBinding { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut binding = rt.block_on(NidxBinding::new()).unwrap(); + binding.runtime = Some(rt); + binding + } +} + +impl NidxBinding { + pub async fn new() -> anyhow::Result { + let settings = Settings::from_env_settings(EnvSettings { + indexer: Some(nidx::settings::IndexerSettings { + object_store: ObjectStoreConfig::File { + file_path: "../../data/blob/fake/".to_string(), + } + .client(), + nats_server: String::new(), + }), + storage: Some(StorageSettings { + object_store: ObjectStoreConfig::File { + file_path: "../../data/blob/fake/".to_string(), + } + .client(), + }), + merge: Default::default(), + metadata: MetadataSettings { + database_url: "postgresql://postgres@localhost/test".to_string(), + }, + }) + .await?; + + // API server + let api_service = ApiServer::new(settings.metadata.clone()).into_service(); + let api_server = GrpcServer::new("localhost:0").await?; + let api_port = api_server.port()?; + tokio::task::spawn(api_server.serve(api_service)); + + // Searcher API + let searcher = SyncedSearcher::new(settings.metadata.clone(), Path::new("/tmp/searcher")); + let searcher_api = SearchServer::new(settings.metadata.clone(), searcher.index_cache()); + let searcher_server = GrpcServer::new("localhost:0").await?; + let searcher_port = searcher_server.port()?; + tokio::task::spawn(searcher_server.serve(searcher_api.into_service())); + let settings_copy = settings.clone(); + tokio::task::spawn( + async move { searcher.run(settings_copy.storage.as_ref().unwrap().object_store.clone()).await }, + ); + + Ok(NidxBinding { + searcher_port, + api_port, + settings, + seq: 1, + runtime: None, + }) + } + + pub async fn index_resource(&mut self, shard_id: &str, resource: Resource) -> anyhow::Result<()> { + index_resource( + &self.settings.metadata, + self.settings.storage.as_ref().unwrap().object_store.clone(), + shard_id, + resource, + self.seq.into(), + ) + .await?; + self.seq += 1; + Ok(()) + } +} + +/// A Python module implemented in Rust. +#[pymodule] +fn nidx_binding(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + Ok(()) +} diff --git a/nidx/nidx_protos/build.py b/nidx/nidx_protos/build.py new file mode 100644 index 0000000000..f343314bc2 --- /dev/null +++ b/nidx/nidx_protos/build.py @@ -0,0 +1,26 @@ +import os + +from grpc_tools import protoc + + +def pdm_build_initialize(context): + build_dir = context.ensure_build_dir() + python_dir = f"{build_dir}/nidx_protos" + try: + os.mkdir(python_dir) + except FileExistsError: + pass + + for proto in [ + "src/nidx.proto", + ]: + command = [ + "grpc_tools.protoc", + "--proto_path={}".format("src"), + "--proto_path={}".format("../../"), + "--python_out={}".format(python_dir), + "--pyi_out={}".format(python_dir), + "--grpc_python_out={}".format(python_dir), + ] + [proto] + if protoc.main(command) != 0: + raise Exception("error: {} failed".format(command)) diff --git a/nidx/nidx_protos/pyproject.toml b/nidx/nidx_protos/pyproject.toml new file mode 100644 index 0000000000..4b9ae75174 --- /dev/null +++ b/nidx/nidx_protos/pyproject.toml @@ -0,0 +1,10 @@ +[build-system] +requires = ["pdm-backend", "grpcio-tools @ file:///tmp/grpcio_tools-1.67.1"] +build-backend = "pdm.backend" + +[project] +name = "nidx_protos" +version = "0.0.1" + +[tool.pdm.build] +custom-hook = "build.py" diff --git a/nidx/src/settings.rs b/nidx/src/settings.rs index d67e83954c..b5a84dabc4 100644 --- a/nidx/src/settings.rs +++ b/nidx/src/settings.rs @@ -194,7 +194,10 @@ impl Deref for Settings { impl Settings { pub async fn from_env() -> anyhow::Result { - let settings = EnvSettings::from_env(); + Self::from_env_settings(EnvSettings::from_env()).await + } + + pub async fn from_env_settings(settings: EnvSettings) -> anyhow::Result { let metadata = NidxMetadata::new(&settings.metadata.database_url).await?; Ok(Self { metadata, From 77deacddaaf86b01a2c3670cfb2e657512131010 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Mon, 11 Nov 2024 16:16:00 +0100 Subject: [PATCH 2/9] nidx_binding example usage --- nidx/nidx_binding/src/lib.rs | 4 +-- nidx/nidx_protos/build.py | 4 +++ nidx/nidx_protos/pyproject.toml | 2 +- .../src/nucliadb/common/cluster/manager.py | 11 +++++-- nucliadb/src/nucliadb/common/nidx.py | 12 +++++++ pdm.lock | 31 ++++++++++++++----- pyproject.toml | 4 +++ 7 files changed, 55 insertions(+), 13 deletions(-) diff --git a/nidx/nidx_binding/src/lib.rs b/nidx/nidx_binding/src/lib.rs index 2b0dce8ccc..fc52616f33 100644 --- a/nidx/nidx_binding/src/lib.rs +++ b/nidx/nidx_binding/src/lib.rs @@ -38,14 +38,14 @@ impl NidxBinding { let settings = Settings::from_env_settings(EnvSettings { indexer: Some(nidx::settings::IndexerSettings { object_store: ObjectStoreConfig::File { - file_path: "../../data/blob/fake/".to_string(), + file_path: "data/blob/fake/".to_string(), } .client(), nats_server: String::new(), }), storage: Some(StorageSettings { object_store: ObjectStoreConfig::File { - file_path: "../../data/blob/fake/".to_string(), + file_path: "data/blob/fake/".to_string(), } .client(), }), diff --git a/nidx/nidx_protos/build.py b/nidx/nidx_protos/build.py index f343314bc2..60f25ac7f2 100644 --- a/nidx/nidx_protos/build.py +++ b/nidx/nidx_protos/build.py @@ -11,6 +11,7 @@ def pdm_build_initialize(context): except FileExistsError: pass + # Compile protos for proto in [ "src/nidx.proto", ]: @@ -24,3 +25,6 @@ def pdm_build_initialize(context): ] + [proto] if protoc.main(command) != 0: raise Exception("error: {} failed".format(command)) + + # Create py.typed to enable type checking + open(f"{python_dir}/py.typed", "w") diff --git a/nidx/nidx_protos/pyproject.toml b/nidx/nidx_protos/pyproject.toml index 4b9ae75174..9721423107 100644 --- a/nidx/nidx_protos/pyproject.toml +++ b/nidx/nidx_protos/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["pdm-backend", "grpcio-tools @ file:///tmp/grpcio_tools-1.67.1"] +requires = ["pdm-backend", "grpcio-tools>=1.44.0,<1.63.0"] build-backend = "pdm.backend" [project] diff --git a/nucliadb/src/nucliadb/common/cluster/manager.py b/nucliadb/src/nucliadb/common/cluster/manager.py index 33b30eaea6..5f0a578038 100644 --- a/nucliadb/src/nucliadb/common/cluster/manager.py +++ b/nucliadb/src/nucliadb/common/cluster/manager.py @@ -44,7 +44,7 @@ nodewriter_pb2, writer_pb2, ) -from nucliadb_protos.nodewriter_pb2 import IndexMessage, IndexMessageSource, TypeMessage +from nucliadb_protos.nodewriter_pb2 import IndexMessage, IndexMessageSource, NewShardRequest, TypeMessage from nucliadb_telemetry import errors from nucliadb_utils.utilities import get_indexing, get_storage @@ -214,8 +214,13 @@ async def create_shard_by_kbid( nidx_api = get_nidx_api_client() if nidx_api: - shard_created = await nidx_api.new_shard_with_vectorsets(kbid, vectorsets) - shard_uuid = uuid.UUID(shard_created.id).hex + req = NewShardRequest( + kbid=kbid, + vectorsets_configs=vectorsets, + ) + + resp = await nidx_api.NewShard(req) # type: ignore + shard_uuid = uuid.UUID(resp.id).hex else: shard_uuid = uuid.uuid4().hex diff --git a/nucliadb/src/nucliadb/common/nidx.py b/nucliadb/src/nucliadb/common/nidx.py index 690323b61c..f853230175 100644 --- a/nucliadb/src/nucliadb/common/nidx.py +++ b/nucliadb/src/nucliadb/common/nidx.py @@ -25,6 +25,7 @@ IndexMessage, ) from nucliadb_utils import logger +from nucliadb_utils.grpc import get_traced_grpc_channel from nucliadb_utils.nats import NatsConnectionManager from nucliadb_utils.settings import indexing_settings from nucliadb_utils.utilities import Utility, clean_utility, get_utility, set_utility @@ -32,6 +33,14 @@ if TYPE_CHECKING: from nucliadb.common.cluster.manager import AbstractIndexNode +try: + import nidx_binding # type: ignore + from nidx_protos import nidx_pb2_grpc + + NIDX_BINDING = nidx_binding.NidxBinding() +except ImportError as e: + logger.warning("Import error while loading nidx_binding", e) + class NidxIndexer: def __init__( @@ -96,5 +105,8 @@ def get_nidx_api_client() -> Optional["AbstractIndexNode"]: available_disk=0, ) return nidx_api + elif settings.standalone_mode and NIDX_BINDING is not None: + channel = get_traced_grpc_channel(f"localhost:{NIDX_BINDING.api_port}", "nidx_api") + return nidx_pb2_grpc.NidxApiStub(channel) else: return None diff --git a/pdm.lock b/pdm.lock index 6091d41da8..7f337417e1 100644 --- a/pdm.lock +++ b/pdm.lock @@ -2,10 +2,10 @@ # It is not intended for manual editing. [metadata] -groups = ["default", "dev", "sdk", "sidecar"] +groups = ["default", "dev", "nidx", "sdk", "sidecar"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:58191ec87d413066b2e4293ab9d07a56e0b9b08d2015a841f7eb0e216f95fd71" +content_hash = "sha256:6978f28085ba2ae6fa7ff1e22eb326952cf82d99889685a307293f258a59afc9" [[metadata.targets]] requires_python = ">=3.9" @@ -2026,6 +2026,23 @@ files = [ {file = "nats_py-2.9.0.tar.gz", hash = "sha256:01886eb9e0a87f0ec630652cf1fae65d2a8556378a609bc6cc07d2ea60c8d0dd"}, ] +[[package]] +name = "nidx-binding" +version = "0.1.0" +requires_python = ">=3.8" +editable = true +path = "./nidx/nidx_binding" +summary = "" +groups = ["nidx"] + +[[package]] +name = "nidx-protos" +version = "0.0.1" +editable = true +path = "./nidx/nidx_protos" +summary = "" +groups = ["nidx"] + [[package]] name = "nkeys" version = "0.2.1" @@ -2260,13 +2277,13 @@ dependencies = [ "grpcio-tools<1.63.0,>=1.44.0", "grpcio<1.63.0,>=1.44.0", "nats-py[nkeys]>=2.5.0", - "opentelemetry-api==1.21.0", - "opentelemetry-exporter-jaeger==1.21.0", + "opentelemetry-api>=1.21.0", + "opentelemetry-exporter-jaeger>=1.21.0", "opentelemetry-instrumentation-aiohttp-client>=0.42b0", "opentelemetry-instrumentation-fastapi>=0.42b0", - "opentelemetry-propagator-b3==1.21.0", - "opentelemetry-proto==1.21.0", - "opentelemetry-sdk==1.21.0", + "opentelemetry-propagator-b3>=1.21.0", + "opentelemetry-proto>=1.21.0", + "opentelemetry-sdk>=1.21.0", "opentelemetry-semantic-conventions>=0.42b0", ] diff --git a/pyproject.toml b/pyproject.toml index df67c7beaa..cf7c06c6ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,10 @@ sdk = [ "-e file:///${PROJECT_ROOT}/nucliadb_dataset#egg=nucliadb-dataset", "requests-mock>=1.12.1", ] +nidx = [ + "-e file:///${PROJECT_ROOT}/nidx/nidx_binding", + "-e file:///${PROJECT_ROOT}/nidx/nidx_protos", +] [tool.setuptools] py-modules = [] From 1f5081e7e2b49caa727b642310acba40a74d0095 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 12 Nov 2024 10:57:05 +0100 Subject: [PATCH 3/9] Separate nidx utility --- nidx/nidx_binding/src/lib.rs | 32 +--- nidx/src/settings.rs | 9 +- .../src/nucliadb/common/cluster/settings.py | 3 +- nucliadb/src/nucliadb/common/nidx.py | 149 +++++++++++++----- nucliadb/src/nucliadb/ingest/app.py | 3 +- .../src/nucliadb/ingest/orm/knowledgebox.py | 4 +- nucliadb/src/nucliadb/standalone/config.py | 7 +- 7 files changed, 132 insertions(+), 75 deletions(-) diff --git a/nidx/nidx_binding/src/lib.rs b/nidx/nidx_binding/src/lib.rs index fc52616f33..73e95049bb 100644 --- a/nidx/nidx_binding/src/lib.rs +++ b/nidx/nidx_binding/src/lib.rs @@ -5,9 +5,10 @@ use nidx::grpc_server::GrpcServer; use nidx::indexer::index_resource; use nidx::searcher::grpc::SearchServer; use nidx::searcher::SyncedSearcher; -use nidx::settings::{EnvSettings, MetadataSettings, ObjectStoreConfig, StorageSettings}; +use nidx::settings::EnvSettings; use nidx::Settings; use nidx_protos::Resource; +use std::collections::HashMap; use std::path::Path; use tokio::runtime::Runtime; @@ -25,36 +26,19 @@ pub struct NidxBinding { #[pymethods] impl NidxBinding { #[new] - pub fn start() -> NidxBinding { + pub fn start(mut settings: HashMap) -> NidxBinding { + settings.insert("INDEXER__NATS_SERVER".to_string(), "".to_string()); + let rt = tokio::runtime::Runtime::new().unwrap(); - let mut binding = rt.block_on(NidxBinding::new()).unwrap(); + let mut binding = rt.block_on(NidxBinding::new(settings)).unwrap(); binding.runtime = Some(rt); binding } } impl NidxBinding { - pub async fn new() -> anyhow::Result { - let settings = Settings::from_env_settings(EnvSettings { - indexer: Some(nidx::settings::IndexerSettings { - object_store: ObjectStoreConfig::File { - file_path: "data/blob/fake/".to_string(), - } - .client(), - nats_server: String::new(), - }), - storage: Some(StorageSettings { - object_store: ObjectStoreConfig::File { - file_path: "data/blob/fake/".to_string(), - } - .client(), - }), - merge: Default::default(), - metadata: MetadataSettings { - database_url: "postgresql://postgres@localhost/test".to_string(), - }, - }) - .await?; + pub async fn new(binding_settings: HashMap) -> anyhow::Result { + let settings = Settings::from_env_settings(EnvSettings::from_map(binding_settings)).await?; // API server let api_service = ApiServer::new(settings.metadata.clone()).into_service(); diff --git a/nidx/src/settings.rs b/nidx/src/settings.rs index b5a84dabc4..034957e7cb 100644 --- a/nidx/src/settings.rs +++ b/nidx/src/settings.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::ops::Deref; // Copyright (C) 2021 Bosutech XXI S.L. // @@ -174,6 +175,11 @@ impl EnvSettings { let config = Config::builder().add_source(env.separator("__")).build().unwrap(); config.try_deserialize().unwrap() } + + pub fn from_map(vars: HashMap) -> Self { + let env = Environment::default().source(Some(vars)); + Self::from_config_environment(env) + } } /// Settings wrapper that holds opens a connection to the database @@ -221,8 +227,7 @@ mod tests { ("INDEXER__NATS_SERVER", "localhost"), ("MERGE__MIN_NUMBER_OF_SEGMENTS", "1234"), ]; - let env = Environment::default().source(Some(HashMap::from(env.map(|(k, v)| (k.to_string(), v.to_string()))))); - let settings = EnvSettings::from_config_environment(env); + let settings = EnvSettings::from_map(HashMap::from(env.map(|(k, v)| (k.to_string(), v.to_string())))); assert_eq!(&settings.metadata.database_url, "postgresql://localhost"); assert_eq!(&settings.indexer.unwrap().nats_server, "localhost"); assert_eq!(settings.merge.min_number_of_segments, 1234); diff --git a/nucliadb/src/nucliadb/common/cluster/settings.py b/nucliadb/src/nucliadb/common/cluster/settings.py index fb4b29683c..2b2b400dd1 100644 --- a/nucliadb/src/nucliadb/common/cluster/settings.py +++ b/nucliadb/src/nucliadb/common/cluster/settings.py @@ -86,7 +86,8 @@ class Settings(BaseSettings): cluster_discovery_kubernetes_selector: str = "appType=node" cluster_discovery_manual_addresses: list[str] = [] - nidx_api: Optional[str] = Field(default=None, description="NIDX gRPC API address") + nidx_api_address: Optional[str] = Field(default=None, description="NIDX gRPC API address") + nidx_searcher_address: Optional[str] = Field(default=None, description="NIDX gRPC API address") settings = Settings() diff --git a/nucliadb/src/nucliadb/common/nidx.py b/nucliadb/src/nucliadb/common/nidx.py index f853230175..d8ce5a81ff 100644 --- a/nucliadb/src/nucliadb/common/nidx.py +++ b/nucliadb/src/nucliadb/common/nidx.py @@ -18,46 +18,107 @@ # along with this program. If not, see . # -from typing import TYPE_CHECKING, Optional +import os +from typing import Optional from nucliadb.common.cluster.settings import settings +from nucliadb.ingest.settings import DriverConfig +from nucliadb.ingest.settings import settings as ingest_settings from nucliadb_protos.nodewriter_pb2 import ( IndexMessage, ) from nucliadb_utils import logger from nucliadb_utils.grpc import get_traced_grpc_channel from nucliadb_utils.nats import NatsConnectionManager -from nucliadb_utils.settings import indexing_settings +from nucliadb_utils.settings import FileBackendConfig, indexing_settings, storage_settings from nucliadb_utils.utilities import Utility, clean_utility, get_utility, set_utility -if TYPE_CHECKING: - from nucliadb.common.cluster.manager import AbstractIndexNode - try: - import nidx_binding # type: ignore - from nidx_protos import nidx_pb2_grpc + from nidx_protos.nidx_pb2_grpc import NidxApiStub, NidxSearcherStub + + NIDX_INSTALLED = True +except ImportError: + logger.info("nidx not installed") + NIDX_INSTALLED = False + + +class NidxUtility: + api_client = None + searcher_client = None + + async def initialize(self): ... + async def finalize(self): ... + async def index(self, msg: IndexMessage) -> int: ... + + +class NidxBindingUtility(NidxUtility): + """Implements Nidx utility using the binding""" + + def __init__(self): + if ingest_settings.driver != DriverConfig.PG: + raise ValueError("nidx_binding requires DRIVER=pg") + + if storage_settings.file_backend != FileBackendConfig.LOCAL: + # This is a limitation just to simplify configuration, it can be removed if needed + raise ValueError("nidx_binding requires FILE_BACKEND=local") + + if storage_settings.local_files is None or storage_settings.local_indexing_bucket is None: + raise ValueError("nidx_binding requires LOCAL_FILES and LOCAL_INDEXING_BUCKET to be set") + + indexing_path = storage_settings.local_files + "/" + storage_settings.local_indexing_bucket + nidx_storage_path = storage_settings.local_files + "/nidx" + os.makedirs(indexing_path, exist_ok=True) + os.makedirs(nidx_storage_path, exist_ok=True) + + self.config = { + "METADATA__DATABASE_URL": ingest_settings.driver_pg_url, + "INDEXER__OBJECT_STORE": "file", + "INDEXER__FILE_PATH": indexing_path, + "STORAGE__OBJECT_STORE": "file", + "STORAGE__FILE_PATH": nidx_storage_path, + } + + async def initialize(self): + import nidx_binding # type: ignore + + self.binding = nidx_binding.NidxBinding(self.config) + self.api_client = NidxApiStub( + get_traced_grpc_channel(f"localhost:{self.binding.api_port}", "nidx_api") + ) + self.searcher_client = NidxSearcherStub( + get_traced_grpc_channel(f"localhost:{self.binding.searcher_port}", "nidx_searcher") + ) + + async def finalize(self): + del self.binding + + async def index(self, msg: IndexMessage) -> int: + raise "Not implemented yet" - NIDX_BINDING = nidx_binding.NidxBinding() -except ImportError as e: - logger.warning("Import error while loading nidx_binding", e) +class NidxServiceUtility(NidxUtility): + """Implements Nidx utility connecting to the network service""" + + def __init__(self): + if indexing_settings.index_nidx_subject is None: + raise ValueError("nidx subject needed for nidx utility") + + if not settings.nidx_api_address or not settings.nidx_searcher_address: + raise ValueError("NIDX_API and NIDX_SEARCHER are required") -class NidxIndexer: - def __init__( - self, - subject: str, - nats_servers: list[str], - nats_creds: Optional[str] = None, - ): self.nats_connection_manager = NatsConnectionManager( service_name="NidxIndexer", - nats_servers=nats_servers, - nats_creds=nats_creds, + nats_servers=indexing_settings.index_jetstream_servers, + nats_creds=indexing_settings.index_jetstream_auth, ) - self.subject = subject + self.subject = indexing_settings.index_nidx_subject async def initialize(self): await self.nats_connection_manager.initialize() + self.api_client = NidxApiStub(get_traced_grpc_channel(settings.nidx_api_address, "nidx_api")) + self.searcher_client = NidxSearcherStub( + get_traced_grpc_channel(settings.nidx_searcher_address, "nidx_searcher") + ) async def finalize(self): await self.nats_connection_manager.finalize() @@ -70,14 +131,20 @@ async def index(self, writer: IndexMessage) -> int: return res.seq -async def start_nidx_utility() -> NidxIndexer: - if indexing_settings.index_nidx_subject is None: - raise ValueError("nidx subject needed for nidx utility") - nidx_utility = NidxIndexer( - subject=indexing_settings.index_nidx_subject, - nats_creds=indexing_settings.index_jetstream_auth, - nats_servers=indexing_settings.index_jetstream_servers, - ) +async def start_nidx_utility() -> Optional[NidxUtility]: + if not NIDX_INSTALLED: + return None + + nidx = get_nidx() + if nidx: + logger.warn("nidx already initialized, will not reinitialize") + return nidx + + if settings.standalone_mode: + nidx_utility = NidxBindingUtility() + else: + nidx_utility = NidxServiceUtility() + await nidx_utility.initialize() set_utility(Utility.NIDX, nidx_utility) return nidx_utility @@ -90,23 +157,21 @@ async def stop_nidx_utility(): await nidx_utility.finalize() -def get_nidx() -> NidxIndexer: +def get_nidx() -> Optional[NidxUtility]: return get_utility(Utility.NIDX) -def get_nidx_api_client() -> Optional["AbstractIndexNode"]: - if settings.nidx_api: - from nucliadb.common.cluster.manager import IndexNode +def get_nidx_api_client() -> Optional[NidxApiStub]: + nidx = get_nidx() + if nidx: + return nidx.api_client + else: + return None - nidx_api = IndexNode( - id="nidx-api", - address=settings.nidx_api, - shard_count=0, - available_disk=0, - ) - return nidx_api - elif settings.standalone_mode and NIDX_BINDING is not None: - channel = get_traced_grpc_channel(f"localhost:{NIDX_BINDING.api_port}", "nidx_api") - return nidx_pb2_grpc.NidxApiStub(channel) + +def get_nidx_searcher_client() -> Optional[NidxSearcherStub]: + nidx = get_nidx() + if nidx: + return nidx.searcher_client else: return None diff --git a/nucliadb/src/nucliadb/ingest/app.py b/nucliadb/src/nucliadb/ingest/app.py index 669ea48daa..95ddd3a1c6 100644 --- a/nucliadb/src/nucliadb/ingest/app.py +++ b/nucliadb/src/nucliadb/ingest/app.py @@ -62,8 +62,7 @@ async def initialize() -> list[Callable[[], Awaitable[None]]]: if not cluster_settings.standalone_mode and indexing_settings.index_jetstream_servers is not None: await start_indexing_utility(SERVICE_NAME) - if indexing_settings.index_nidx_subject is not None: - await start_nidx_utility() + await start_nidx_utility() await start_audit_utility(SERVICE_NAME) diff --git a/nucliadb/src/nucliadb/ingest/orm/knowledgebox.py b/nucliadb/src/nucliadb/ingest/orm/knowledgebox.py index 810fa2add5..17240faf37 100644 --- a/nucliadb/src/nucliadb/ingest/orm/knowledgebox.py +++ b/nucliadb/src/nucliadb/ingest/orm/knowledgebox.py @@ -50,7 +50,7 @@ from nucliadb.ingest.orm.utils import choose_matryoshka_dimension, compute_paragraph_key from nucliadb.ingest.settings import settings from nucliadb.migrator.utils import get_latest_version -from nucliadb_protos import knowledgebox_pb2, nodewriter_pb2, writer_pb2 +from nucliadb_protos import knowledgebox_pb2, noderesources_pb2, nodewriter_pb2, writer_pb2 from nucliadb_protos.knowledgebox_pb2 import ( CreateExternalIndexProviderMetadata, ExternalIndexProviderType, @@ -305,7 +305,7 @@ async def delete(cls, driver: Driver, kbid: str): if nidx_api is not None: for shard in shards_obj.shards: shard_id = shard.shard - await nidx_api.delete_shard(shard_id) + await nidx_api.DeleteShard(noderesources_pb2.ShardId(id=shard_id)) if kb_config is not None: await cls._maybe_delete_external_indexes(kbid, kb_config.external_index_provider) diff --git a/nucliadb/src/nucliadb/standalone/config.py b/nucliadb/src/nucliadb/standalone/config.py index 4a997cc61c..a70231a859 100644 --- a/nucliadb/src/nucliadb/standalone/config.py +++ b/nucliadb/src/nucliadb/standalone/config.py @@ -54,8 +54,11 @@ def config_standalone_driver(nucliadb_args: Settings): # no driver specified, for standalone, we try to automate some settings here storage_settings.file_backend = FileBackendConfig.LOCAL - if storage_settings.file_backend == FileBackendConfig.LOCAL and storage_settings.local_files is None: - storage_settings.local_files = "./data/blob" + if storage_settings.file_backend == FileBackendConfig.LOCAL: + if storage_settings.local_files is None: + storage_settings.local_files = "./data/blob" + if storage_settings.local_indexing_bucket is None: + storage_settings.local_indexing_bucket = "indexer" if ingest_settings.driver_local_url is not None and not os.path.isdir( ingest_settings.driver_local_url From 1475e3be9b2b53f721c6eb81c37f2860e17b2617 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 12 Nov 2024 12:19:44 +0100 Subject: [PATCH 4/9] Indexer and scheduler in binding --- ...d04f0d06a8acaed379eff0d23f3229edde9ee.json | 20 +++++ nidx/nidx_binding/src/lib.rs | 76 ++++++++++++++----- nidx/src/indexer.rs | 3 + nidx/src/maintenance/scheduler.rs | 53 +++++++++---- nidx/src/maintenance/worker.rs | 14 ++-- nidx/src/metadata.rs | 18 ++++- nucliadb/src/nucliadb/common/nidx.py | 2 +- 7 files changed, 144 insertions(+), 42 deletions(-) create mode 100644 nidx/.sqlx/query-8493bb0059b013eaca42fd10cd7d04f0d06a8acaed379eff0d23f3229edde9ee.json diff --git a/nidx/.sqlx/query-8493bb0059b013eaca42fd10cd7d04f0d06a8acaed379eff0d23f3229edde9ee.json b/nidx/.sqlx/query-8493bb0059b013eaca42fd10cd7d04f0d06a8acaed379eff0d23f3229edde9ee.json new file mode 100644 index 0000000000..3c4c8fd5d6 --- /dev/null +++ b/nidx/.sqlx/query-8493bb0059b013eaca42fd10cd7d04f0d06a8acaed379eff0d23f3229edde9ee.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COALESCE(MAX(seq), 1) AS \"seq!\" FROM segments\n UNION\n SELECT COALESCE(MAX(seq), 1) AS \"seq!\" FROM deletions", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "seq!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "8493bb0059b013eaca42fd10cd7d04f0d06a8acaed379eff0d23f3229edde9ee" +} diff --git a/nidx/nidx_binding/src/lib.rs b/nidx/nidx_binding/src/lib.rs index 73e95049bb..3217a3bbda 100644 --- a/nidx/nidx_binding/src/lib.rs +++ b/nidx/nidx_binding/src/lib.rs @@ -1,17 +1,33 @@ +use nidx::maintenance::scheduler::{self, GetAckFloor}; +use nidx::maintenance::worker; +use pyo3::exceptions::PyException; use pyo3::prelude::*; use nidx::api::grpc::ApiServer; use nidx::grpc_server::GrpcServer; -use nidx::indexer::index_resource; +use nidx::indexer::{download_message, index_resource}; use nidx::searcher::grpc::SearchServer; use nidx::searcher::SyncedSearcher; use nidx::settings::EnvSettings; use nidx::Settings; -use nidx_protos::Resource; +use nidx_protos::prost::*; +use nidx_protos::IndexMessage; use std::collections::HashMap; use std::path::Path; +use std::sync::atomic::AtomicI64; +use std::sync::Arc; use tokio::runtime::Runtime; +#[derive(Clone)] +struct SeqSource(Arc); + +impl GetAckFloor for SeqSource { + async fn get(&mut self) -> anyhow::Result { + let seq = self.0.load(std::sync::atomic::Ordering::Relaxed); + Ok(seq) + } +} + #[pyclass] pub struct NidxBinding { #[pyo3(get)] @@ -19,7 +35,7 @@ pub struct NidxBinding { #[pyo3(get)] api_port: u16, settings: Settings, - seq: i64, + seq: SeqSource, runtime: Option, } @@ -34,6 +50,32 @@ impl NidxBinding { binding.runtime = Some(rt); binding } + + pub fn index(&mut self, bytes: Vec) -> PyResult { + // TODO: Can this be simplified into the indexer code? + let msg = IndexMessage::decode(&bytes[..]).unwrap(); + + let seq = self.seq.0.load(std::sync::atomic::Ordering::Relaxed); + let object_store = self.settings.indexer.as_ref().unwrap().object_store.clone(); + let result = self.runtime.as_ref().unwrap().block_on(async { + let resource = download_message(object_store, &msg.storage_key).await?; + index_resource( + &self.settings.metadata, + self.settings.storage.as_ref().unwrap().object_store.clone(), + &msg.shard, + resource, + seq.into(), + ) + .await + }); + + // Always increment seq, even on failure + self.seq.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + result.map_err(|e| PyException::new_err(format!("Error indexing {e}")))?; + + Ok(seq) + } } impl NidxBinding { @@ -57,27 +99,27 @@ impl NidxBinding { async move { searcher.run(settings_copy.storage.as_ref().unwrap().object_store.clone()).await }, ); + // Scheduler + let seq = SeqSource(Arc::new(settings.metadata.max_seq().await?.into())); + tokio::task::spawn(scheduler::run_tasks( + settings.metadata.clone(), + settings.storage.as_ref().unwrap().object_store.clone(), + settings.merge.clone(), + seq.clone(), + )); + + // Worker + let settings_copy = settings.clone(); + tokio::task::spawn(worker::run(settings_copy)); + Ok(NidxBinding { searcher_port, api_port, settings, - seq: 1, + seq, runtime: None, }) } - - pub async fn index_resource(&mut self, shard_id: &str, resource: Resource) -> anyhow::Result<()> { - index_resource( - &self.settings.metadata, - self.settings.storage.as_ref().unwrap().object_store.clone(), - shard_id, - resource, - self.seq.into(), - ) - .await?; - self.seq += 1; - Ok(()) - } } /// A Python module implemented in Rust. diff --git a/nidx/src/indexer.rs b/nidx/src/indexer.rs index f41f88f4d3..1067aa4997 100644 --- a/nidx/src/indexer.rs +++ b/nidx/src/indexer.rs @@ -70,6 +70,7 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> { continue; }; + // TODO: Implement deletions let resource = match download_message(indexer_storage.clone(), &index_message.storage_key).await { Ok(r) => r, Err(e) => { @@ -88,6 +89,8 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> { warn!("Error processing index message {e:?}") } }; + + // TODO: Delete indexer message on success } Ok(()) diff --git a/nidx/src/maintenance/scheduler.rs b/nidx/src/maintenance/scheduler.rs index b99dfab4fa..33a17979f4 100644 --- a/nidx/src/maintenance/scheduler.rs +++ b/nidx/src/maintenance/scheduler.rs @@ -18,7 +18,7 @@ // along with this program. If not, see . // -use std::{sync::Arc, time::Duration}; +use std::{future::Future, sync::Arc, time::Duration}; use async_nats::jetstream::consumer::PullConsumer; use futures::StreamExt; @@ -35,13 +35,36 @@ use crate::{ pub async fn run(settings: Settings) -> anyhow::Result<()> { let indexer_settings = settings.indexer.as_ref().unwrap(); - let merger_settings = settings.merge.clone(); + let storage_settings = settings.indexer.as_ref().unwrap(); + let merge_settings = settings.merge.clone(); let meta = settings.metadata.clone(); let client = async_nats::connect(&indexer_settings.nats_server).await?; let jetstream = async_nats::jetstream::new(client); - let mut consumer: PullConsumer = jetstream.get_consumer_from_stream("nidx", "nidx").await?; + let consumer: PullConsumer = jetstream.get_consumer_from_stream("nidx", "nidx").await?; + run_tasks(meta, storage_settings.object_store.clone(), merge_settings, NatsAckFloor(consumer)).await +} + +#[derive(Clone)] +struct NatsAckFloor(PullConsumer); + +impl GetAckFloor for NatsAckFloor { + async fn get(&mut self) -> anyhow::Result { + Ok(self.0.info().await?.ack_floor.stream_sequence as i64) + } +} + +pub trait GetAckFloor { + fn get(&mut self) -> impl Future> + Send; +} + +pub async fn run_tasks( + meta: NidxMetadata, + storage: Arc, + merge_settings: MergeSettings, + mut ack_floor: impl GetAckFloor + Clone + Send + 'static, +) -> anyhow::Result<()> { let mut tasks = JoinSet::new(); let meta2 = meta.clone(); @@ -55,7 +78,7 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> { }); let meta2 = meta.clone(); - let storage = indexer_settings.object_store.clone(); + let storage = storage.clone(); tasks.spawn(async move { loop { if let Err(e) = purge_segments(&meta2, &storage).await { @@ -76,12 +99,11 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> { }); let meta2 = meta.clone(); - let mut consumer2 = consumer.clone(); + let mut ack_floor_copy = ack_floor.clone(); tasks.spawn(async move { loop { - match consumer2.info().await { - Ok(consumer_info) => { - let oldest_confirmed_seq = consumer_info.ack_floor.stream_sequence; + match ack_floor_copy.get().await { + Ok(oldest_confirmed_seq) => { let oldest_pending_seq = oldest_confirmed_seq + 1; if let Err(e) = purge_deletions(&meta2, oldest_pending_seq).await { warn!(?e, "Error in purge_deletions task"); @@ -96,11 +118,10 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> { }); tasks.spawn(async move { - let merge_scheduler = MergeScheduler::from_settings(&merger_settings); + let merge_scheduler = MergeScheduler::from_settings(&merge_settings); loop { - match consumer.info().await { - Ok(consumer_info) => { - let oldest_confirmed_seq = consumer_info.ack_floor.stream_sequence; + match ack_floor.get().await { + Ok(oldest_confirmed_seq) => { if let Err(e) = merge_scheduler.schedule_merges(&meta, Seq::from(oldest_confirmed_seq)).await { warn!(?e, "Error in schedule_merges task"); } @@ -113,8 +134,8 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> { } }); - tasks.join_next().await; - println!("A task finished, exiting"); + let task = tasks.join_next().await; + error!(?task, "A scheduling task finished, exiting"); Ok(()) } @@ -172,7 +193,7 @@ pub async fn purge_segments(meta: &NidxMetadata, storage: &Arc) Ok(()) } -pub async fn purge_deletions(meta: &NidxMetadata, oldest_pending_seq: u64) -> anyhow::Result<()> { +pub async fn purge_deletions(meta: &NidxMetadata, oldest_pending_seq: i64) -> anyhow::Result<()> { // Purge deletions that don't apply to any segment and won't apply to any // segment pending to process sqlx::query!( @@ -185,7 +206,7 @@ pub async fn purge_deletions(meta: &NidxMetadata, oldest_pending_seq: u64) -> an WHERE deletions.index_id = oldest_segments.index_id AND deletions.seq <= oldest_segments.seq AND deletions.seq <= $1", - oldest_pending_seq as i64 + oldest_pending_seq ) .execute(&meta.pool) .await?; diff --git a/nidx/src/maintenance/worker.rs b/nidx/src/maintenance/worker.rs index e9f71d8d09..5b024685ff 100644 --- a/nidx/src/maintenance/worker.rs +++ b/nidx/src/maintenance/worker.rs @@ -21,7 +21,11 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; use anyhow::anyhow; +use nidx_paragraph::ParagraphIndexer; +use nidx_relation::RelationIndexer; +use nidx_text::TextIndexer; use nidx_types::{OpenIndexMetadata, SegmentMetadata, Seq}; +use nidx_vector::VectorIndexer; use object_store::DynObjectStore; use serde::Deserialize; use tempfile::tempdir; @@ -125,12 +129,12 @@ pub async fn run_job(meta: &NidxMetadata, job: &MergeJob, storage: Arc nidx_vector::VectorIndexer.merge(&work_path, index_config, merge_inputs).map(|x| x.into()), - IndexKind::Text => nidx_text::TextIndexer.merge(&work_path, merge_inputs).map(|x| x.into()), - IndexKind::Paragraph => nidx_paragraph::ParagraphIndexer.merge(&work_path, merge_inputs).map(|x| x.into()), - IndexKind::Relation => nidx_relation::RelationIndexer.merge(&work_path, merge_inputs).map(|x| x.into()), + IndexKind::Vector => VectorIndexer.merge(&work_path, vector_config.unwrap(), merge_inputs).map(|x| x.into()), + IndexKind::Text => TextIndexer.merge(&work_path, merge_inputs).map(|x| x.into()), + IndexKind::Paragraph => ParagraphIndexer.merge(&work_path, merge_inputs).map(|x| x.into()), + IndexKind::Relation => RelationIndexer.merge(&work_path, merge_inputs).map(|x| x.into()), }) .await??; diff --git a/nidx/src/metadata.rs b/nidx/src/metadata.rs index 32be7138e8..8f305e7f88 100644 --- a/nidx/src/metadata.rs +++ b/nidx/src/metadata.rs @@ -41,14 +41,14 @@ pub struct NidxMetadata { } impl NidxMetadata { - pub async fn new(database_url: &str) -> Result { + pub async fn new(database_url: &str) -> sqlx::Result { let pool = sqlx::postgres::PgPoolOptions::new().acquire_timeout(Duration::from_secs(2)).connect(database_url).await?; Self::new_with_pool(pool).await } - pub async fn new_with_pool(pool: sqlx::PgPool) -> Result { + pub async fn new_with_pool(pool: sqlx::PgPool) -> sqlx::Result { // Run migrations inside a transaction that holds a global lock, avoids races let mut tx = pool.begin().await?; sqlx::query!("SELECT pg_advisory_xact_lock($1)", MIGRATION_LOCK_ID).execute(&mut *tx).await?; @@ -60,9 +60,21 @@ impl NidxMetadata { }) } - pub async fn transaction(&self) -> Result, sqlx::Error> { + pub async fn transaction(&self) -> sqlx::Result> { self.pool.begin().await } + + /// Used by binding to insert in seq order (we don't have NATS to keep sequence) + pub async fn max_seq(&self) -> sqlx::Result { + let seqs = sqlx::query_scalar!( + r#"SELECT COALESCE(MAX(seq), 1) AS "seq!" FROM segments + UNION + SELECT COALESCE(MAX(seq), 1) AS "seq!" FROM deletions"# + ) + .fetch_all(&self.pool) + .await?; + Ok(*seqs.iter().max().unwrap_or(&1)) + } } #[cfg(test)] diff --git a/nucliadb/src/nucliadb/common/nidx.py b/nucliadb/src/nucliadb/common/nidx.py index d8ce5a81ff..fe1fe02e6c 100644 --- a/nucliadb/src/nucliadb/common/nidx.py +++ b/nucliadb/src/nucliadb/common/nidx.py @@ -93,7 +93,7 @@ async def finalize(self): del self.binding async def index(self, msg: IndexMessage) -> int: - raise "Not implemented yet" + return self.binding.index(msg.SerializeToString()) class NidxServiceUtility(NidxUtility): From bfdf67c4e16dfe615791c55815b82f68334a8f8a Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 12 Nov 2024 12:34:38 +0100 Subject: [PATCH 5/9] Use nidx for search --- nucliadb/src/nucliadb/common/nidx.py | 41 +++++++++++++++++-- .../src/nucliadb/search/requesters/utils.py | 21 +++++----- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/nucliadb/src/nucliadb/common/nidx.py b/nucliadb/src/nucliadb/common/nidx.py index fe1fe02e6c..2b52d3830d 100644 --- a/nucliadb/src/nucliadb/common/nidx.py +++ b/nucliadb/src/nucliadb/common/nidx.py @@ -21,6 +21,7 @@ import os from typing import Optional +from nucliadb.common.cluster.base import AbstractIndexNode from nucliadb.common.cluster.settings import settings from nucliadb.ingest.settings import DriverConfig from nucliadb.ingest.settings import settings as ingest_settings @@ -46,9 +47,14 @@ class NidxUtility: api_client = None searcher_client = None - async def initialize(self): ... - async def finalize(self): ... - async def index(self, msg: IndexMessage) -> int: ... + async def initialize(self): + raise NotImplementedError() + + async def finalize(self): + raise NotImplementedError() + + async def index(self, msg: IndexMessage) -> int: + raise NotImplementedError() class NidxBindingUtility(NidxUtility): @@ -140,6 +146,7 @@ async def start_nidx_utility() -> Optional[NidxUtility]: logger.warn("nidx already initialized, will not reinitialize") return nidx + nidx_utility: NidxUtility if settings.standalone_mode: nidx_utility = NidxBindingUtility() else: @@ -175,3 +182,31 @@ def get_nidx_searcher_client() -> Optional[NidxSearcherStub]: return nidx.searcher_client else: return None + + +class FakeNode(AbstractIndexNode): + def __init__(self, searcher_client): + self.client = searcher_client + + @property + def reader(self): + return self.client + + @property + def writer(self): + return None + + def is_read_replica(_): + return False + + @property + def id(self): + return "nidx" + + +def get_nidx_fake_node() -> Optional[FakeNode]: + nidx = get_nidx_searcher_client() + if nidx: + return FakeNode(nidx) + else: + return None diff --git a/nucliadb/src/nucliadb/search/requesters/utils.py b/nucliadb/src/nucliadb/search/requesters/utils.py index b304eef160..7ef287736b 100644 --- a/nucliadb/src/nucliadb/search/requesters/utils.py +++ b/nucliadb/src/nucliadb/search/requesters/utils.py @@ -31,6 +31,7 @@ from nucliadb.common.cluster.base import AbstractIndexNode from nucliadb.common.cluster.exceptions import ShardsNotFound from nucliadb.common.cluster.utils import get_shard_manager +from nucliadb.common.nidx import get_nidx_fake_node from nucliadb.search import logger from nucliadb.search.search.shards import ( query_paragraph_shard, @@ -166,17 +167,15 @@ async def node_query( ) # nidx testing - if nidx and settings.nidx_address: - from nucliadb.common.cluster.index_node import IndexNode - - node = IndexNode( - id="nidx", - address=settings.nidx_address, - shard_count=0, - available_disk=0, - dummy=False, - primary_id=None, - ) + if nidx: + fake_node = get_nidx_fake_node() + if fake_node is None: + raise HTTPException( + status_code=500, + detail="nidx not available", + ) + node = fake_node + shard_id = shard_obj.shard except KeyError: incomplete_results = True From 8c06ffe9ee60c6010a6b333ce0399ba8ecb1ce6b Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 12 Nov 2024 12:40:56 +0100 Subject: [PATCH 6/9] trigger ci --- nidx/src/metadata.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nidx/src/metadata.rs b/nidx/src/metadata.rs index 8f305e7f88..caa00f922b 100644 --- a/nidx/src/metadata.rs +++ b/nidx/src/metadata.rs @@ -64,7 +64,7 @@ impl NidxMetadata { self.pool.begin().await } - /// Used by binding to insert in seq order (we don't have NATS to keep sequence) + /// Used by nidx_binding to insert in seq order (we don't have NATS to keep sequence) pub async fn max_seq(&self) -> sqlx::Result { let seqs = sqlx::query_scalar!( r#"SELECT COALESCE(MAX(seq), 1) AS "seq!" FROM segments From 20729d7083dc00ba50563d68a4c3dce6abc15dfa Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 12 Nov 2024 14:14:43 +0100 Subject: [PATCH 7/9] Update nucliadb/src/nucliadb/common/cluster/settings.py Co-authored-by: Joan Antoni RE --- nucliadb/src/nucliadb/common/cluster/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nucliadb/src/nucliadb/common/cluster/settings.py b/nucliadb/src/nucliadb/common/cluster/settings.py index 2b2b400dd1..c417046fb7 100644 --- a/nucliadb/src/nucliadb/common/cluster/settings.py +++ b/nucliadb/src/nucliadb/common/cluster/settings.py @@ -87,7 +87,7 @@ class Settings(BaseSettings): cluster_discovery_manual_addresses: list[str] = [] nidx_api_address: Optional[str] = Field(default=None, description="NIDX gRPC API address") - nidx_searcher_address: Optional[str] = Field(default=None, description="NIDX gRPC API address") + nidx_searcher_address: Optional[str] = Field(default=None, description="NIDX gRPC searcher API address") settings = Settings() From 8143ff055f4af71a21ef7be33ab894d13522f34b Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 12 Nov 2024 16:20:49 +0100 Subject: [PATCH 8/9] No nidx in CI --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a26790478..d10aff72af 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -118,7 +118,7 @@ jobs: python-version: "3.9.20" cache: true - name: Install venv - run: pdm sync -d --clean --no-editable + run: pdm sync -d --clean --no-editable --without nidx - name: Clean build run: find . -name build | xargs rm -r || true - name: Lint source code @@ -209,7 +209,7 @@ jobs: python-version: "3.12.5" cache: true - name: Install venv - run: pdm sync -d --clean --no-editable + run: pdm sync -d --clean --no-editable --without nidx - name: Upload virtualenv to cache uses: actions/cache/save@v4 with: From 5fd52c93efb3dfe7f9e37f225d67275048c25c32 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 12 Nov 2024 16:26:50 +0100 Subject: [PATCH 9/9] Optional nidx --- nucliadb/src/nucliadb/common/nidx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nucliadb/src/nucliadb/common/nidx.py b/nucliadb/src/nucliadb/common/nidx.py index 2b52d3830d..fb24a90506 100644 --- a/nucliadb/src/nucliadb/common/nidx.py +++ b/nucliadb/src/nucliadb/common/nidx.py @@ -168,7 +168,7 @@ def get_nidx() -> Optional[NidxUtility]: return get_utility(Utility.NIDX) -def get_nidx_api_client() -> Optional[NidxApiStub]: +def get_nidx_api_client() -> Optional["NidxApiStub"]: nidx = get_nidx() if nidx: return nidx.api_client @@ -176,7 +176,7 @@ def get_nidx_api_client() -> Optional[NidxApiStub]: return None -def get_nidx_searcher_client() -> Optional[NidxSearcherStub]: +def get_nidx_searcher_client() -> Optional["NidxSearcherStub"]: nidx = get_nidx() if nidx: return nidx.searcher_client