diff --git a/nidx/.sqlx/query-ef25d1b091b8edce55dcd7f0a2f528537cd9329ab7297b81ace0aba8c0e1bb0e.json b/nidx/.sqlx/query-ef25d1b091b8edce55dcd7f0a2f528537cd9329ab7297b81ace0aba8c0e1bb0e.json
new file mode 100644
index 0000000000..3b71f18da5
--- /dev/null
+++ b/nidx/.sqlx/query-ef25d1b091b8edce55dcd7f0a2f528537cd9329ab7297b81ace0aba8c0e1bb0e.json
@@ -0,0 +1,40 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "SELECT kind as \"kind: IndexKind\", SUM(records)::bigint as \"records!\" FROM indexes\n JOIN segments ON index_id = indexes.id\n WHERE shard_id = $1\n GROUP BY kind",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "kind: IndexKind",
+        "type_info": {
+          "Custom": {
+            "name": "index_kind",
+            "kind": {
+              "Enum": [
+                "text",
+                "paragraph",
+                "vector",
+                "relation"
+              ]
+            }
+          }
+        }
+      },
+      {
+        "ordinal": 1,
+        "name": "records!",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Uuid"
+      ]
+    },
+    "nullable": [
+      false,
+      null
+    ]
+  },
+  "hash": "ef25d1b091b8edce55dcd7f0a2f528537cd9329ab7297b81ace0aba8c0e1bb0e"
+}
diff --git a/nidx/.sqlx/query-d1db2d4b7de69471692f5f76f931e16de56ea704d4b1c8784dafcce996983342.json b/nidx/.sqlx/query-ef56d5fefc5774040d1ee397beadb475f6af02768c22f0e583c74062e2e821ce.json
similarity index 76%
rename from nidx/.sqlx/query-d1db2d4b7de69471692f5f76f931e16de56ea704d4b1c8784dafcce996983342.json
rename to nidx/.sqlx/query-ef56d5fefc5774040d1ee397beadb475f6af02768c22f0e583c74062e2e821ce.json
index df1200d076..cb6ea686a3 100644
--- a/nidx/.sqlx/query-d1db2d4b7de69471692f5f76f931e16de56ea704d4b1c8784dafcce996983342.json
+++ b/nidx/.sqlx/query-ef56d5fefc5774040d1ee397beadb475f6af02768c22f0e583c74062e2e821ce.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "SELECT * FROM shards WHERE id = $1",
+  "query": "SELECT * FROM shards WHERE id = $1 AND deleted_at IS NULL",
   "describe": {
     "columns": [
       {
@@ -30,5 +30,5 @@
       true
     ]
   },
-  "hash": "d1db2d4b7de69471692f5f76f931e16de56ea704d4b1c8784dafcce996983342"
+  "hash": "ef56d5fefc5774040d1ee397beadb475f6af02768c22f0e583c74062e2e821ce"
 }
diff --git a/nidx/Cargo.lock b/nidx/Cargo.lock
index a64c0ef48f..460ae517e2 100644
--- a/nidx/Cargo.lock
+++ b/nidx/Cargo.lock
@@ -95,7 +95,7 @@ dependencies = [
  "serde_json",
  "serde_nanos",
  "serde_repr",
- "thiserror",
+ "thiserror 1.0.64",
  "time",
  "tokio",
  "tokio-rustls",
@@ -1376,6 +1376,7 @@ dependencies = [
  "base64",
  "config",
  "futures",
+ "http",
  "lru",
  "nidx_paragraph",
  "nidx_protos",
@@ -1391,9 +1392,11 @@ dependencies = [
  "sqlx",
  "tar",
  "tempfile",
+ "thiserror 2.0.0",
  "tokio",
  "tokio-util",
  "tonic",
+ "tower 0.5.1",
  "tracing",
  "tracing-subscriber",
  "uuid",
@@ -1500,7 +1503,7 @@ dependencies = [
  "serde",
  "simsimd",
  "tempfile",
- "thiserror",
+ "thiserror 1.0.64",
  "tracing",
  "uuid",
 ]
@@ -1926,7 +1929,7 @@ dependencies = [
  "rustc-hash 2.0.0",
  "rustls",
  "socket2",
- "thiserror",
+ "thiserror 1.0.64",
  "tokio",
  "tracing",
 ]
@@ -1943,7 +1946,7 @@ dependencies = [
  "rustc-hash 2.0.0",
  "rustls",
  "slab",
- "thiserror",
+ "thiserror 1.0.64",
  "tinyvec",
  "tracing",
 ]
@@ -2652,7 +2655,7 @@ dependencies = [
  "sha2",
  "smallvec",
  "sqlformat",
- "thiserror",
+ "thiserror 1.0.64",
  "time",
  "tokio",
  "tokio-stream",
@@ -2737,7 +2740,7 @@ dependencies = [
  "smallvec",
  "sqlx-core",
  "stringprep",
- "thiserror",
+ "thiserror 1.0.64",
  "time",
  "tracing",
  "uuid",
@@ -2777,7 +2780,7 @@ dependencies = [
  "smallvec",
  "sqlx-core",
  "stringprep",
- "thiserror",
+ "thiserror 1.0.64",
  "time",
  "tracing",
  "uuid",
@@ -2834,9 +2837,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
 
 [[package]]
 name = "syn"
-version = "2.0.85"
+version = "2.0.87"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56"
+checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2903,7 +2906,7 @@ dependencies = [
  "tantivy-stacker",
  "tantivy-tokenizer-api",
  "tempfile",
- "thiserror",
+ "thiserror 1.0.64",
  "time",
  "uuid",
  "winapi",
@@ -3029,7 +3032,16 @@ version = "1.0.64"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
 dependencies = [
- "thiserror-impl",
+ "thiserror-impl 1.0.64",
+]
+
+[[package]]
+name = "thiserror"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "15291287e9bff1bc6f9ff3409ed9af665bec7a5fc8ac079ea96be07bca0e2668"
+dependencies = [
+ "thiserror-impl 2.0.0",
 ]
 
 [[package]]
@@ -3043,6 +3055,17 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "thiserror-impl"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22efd00f33f93fa62848a7cab956c3d38c8d43095efda1decfc2b3a5dc0b8972"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "thread_local"
 version = "1.1.8"
diff --git a/nidx/Cargo.toml b/nidx/Cargo.toml
index 21bd3620b3..9d014f7840 100644
--- a/nidx/Cargo.toml
+++ b/nidx/Cargo.toml
@@ -31,6 +31,9 @@ nidx_text = { version = "0.1.0", path = "nidx_text" }
 nidx_paragraph = { version = "0.1.0", path = "nidx_paragraph" }
 nidx_relation = { version = "0.1.0", path = "nidx_relation" }
 config = { version = "0.14.1", default-features = false }
+thiserror = "2.0.0"
+tower = "0.5.1"
+http = "1.1.0"
 
 [dev-dependencies]
 nidx_tests = { path = "nidx_tests" }
diff --git a/nidx/nidx_protos/build.rs b/nidx/nidx_protos/build.rs
index 0f41894b03..2635fb240c 100644
--- a/nidx/nidx_protos/build.rs
+++ b/nidx/nidx_protos/build.rs
@@ -22,13 +22,15 @@ use std::io::Result;
 
 fn main() -> Result<()> {
     println!("cargo:rerun-if-changed=../../nucliadb_protos");
+    println!("cargo:rerun-if-changed=src");
 
     tonic_build::configure().emit_rerun_if_changed(false).compile_protos(
         &[
+            "src/nidx.proto",
             "../../nucliadb_protos/nodereader.proto",
             "../../nucliadb_protos/noderesources.proto",
             "../../nucliadb_protos/nodewriter.proto",
         ],
-        &["../../"],
+        &["../../", "src"],
     )?;
     Ok(())
 }
diff --git a/nidx/nidx_protos/src/lib.rs b/nidx/nidx_protos/src/lib.rs
index 582ade3b25..638f824d78 100644
--- a/nidx/nidx_protos/src/lib.rs
+++ b/nidx/nidx_protos/src/lib.rs
@@ -34,6 +34,10 @@ pub mod nodewriter {
     tonic::include_proto!("nodewriter");
 }
 
+pub mod nidx {
+    tonic::include_proto!("nidx");
+}
+
 pub use nodereader::*;
 pub use noderesources::*;
 pub use nodewriter::*;
diff --git a/nidx/nidx_protos/src/nidx.proto b/nidx/nidx_protos/src/nidx.proto
new file mode 100644
index 0000000000..d2b652068f
--- /dev/null
+++ b/nidx/nidx_protos/src/nidx.proto
@@ -0,0 +1,49 @@
+syntax = "proto3";
+
+package nidx;
+
+import "nucliadb_protos/nodereader.proto";
+import "nucliadb_protos/nodewriter.proto";
+
+service NidxApi {
+  rpc GetShard(nodereader.GetShardRequest) returns (noderesources.Shard) {}
+
+  // rpc GetShardFiles(GetShardFilesRequest) returns (ShardFileList) {};
+  // rpc DownloadShardFile(DownloadShardFileRequest) returns (stream
+  // ShardFileChunk) {};
+
+  rpc NewShard(nodewriter.NewShardRequest) returns (noderesources.ShardCreated) {}
+  rpc DeleteShard(noderesources.ShardId) returns (noderesources.ShardId) {}
+  rpc ListShards(noderesources.EmptyQuery) returns (noderesources.ShardIds) {}
+  // rpc GC(noderesources.ShardId) returns (GarbageCollectorResponse) {}
+  // rpc Merge(noderesources.ShardId) returns (MergeResponse) {}
+
+  // rpc SetResource(noderesources.Resource) returns (OpStatus) {}
+  // rpc SetResourceFromStorage(IndexMessage) returns (OpStatus) {}
+  // rpc RemoveResource(noderesources.ResourceID) returns (OpStatus) {}
+
+  rpc AddVectorSet(nodewriter.NewVectorSetRequest) returns (nodewriter.OpStatus) {} // New
+  rpc RemoveVectorSet(noderesources.VectorSetID) returns (nodewriter.OpStatus) {}
+  rpc ListVectorSets(noderesources.ShardId) returns (noderesources.VectorSetList) {}
+
+  rpc GetMetadata(noderesources.EmptyQuery) returns (noderesources.NodeMetadata) {}
+}
+
+service NidxSearcher {
+  rpc DocumentSearch(nodereader.DocumentSearchRequest) returns (nodereader.DocumentSearchResponse) {}
+  rpc ParagraphSearch(nodereader.ParagraphSearchRequest) returns (nodereader.ParagraphSearchResponse) {}
+  rpc VectorSearch(nodereader.VectorSearchRequest) returns (nodereader.VectorSearchResponse) {}
+  rpc RelationSearch(nodereader.RelationSearchRequest) returns (nodereader.RelationSearchResponse) {}
+  rpc DocumentIds(noderesources.ShardId) returns (nodereader.IdCollection) {}
+  rpc ParagraphIds(noderesources.ShardId) returns (nodereader.IdCollection) {}
+  rpc VectorIds(noderesources.VectorSetID) returns (nodereader.IdCollection) {}
+  rpc RelationIds(noderesources.ShardId) returns (nodereader.IdCollection) {}
+  rpc RelationEdges(noderesources.ShardId) returns (nodereader.EdgeList) {}
+
+  rpc Search(nodereader.SearchRequest) returns (nodereader.SearchResponse) {}
+  rpc Suggest(nodereader.SuggestRequest) returns (nodereader.SuggestResponse) {}
+
+  // Streams
+  rpc Paragraphs(nodereader.StreamRequest) returns (stream nodereader.ParagraphItem) {}
+  rpc Documents(nodereader.StreamRequest) returns (stream nodereader.DocumentItem) {}
+}
diff --git a/nidx/src/api/grpc.rs b/nidx/src/api/grpc.rs
index 24644c1cad..5374e538e4 100644
--- a/nidx/src/api/grpc.rs
+++ b/nidx/src/api/grpc.rs
@@ -21,13 +21,14 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use nidx_protos::node_writer_server::NodeWriter;
+use crate::errors::NidxError;
+use crate::grpc_server::RemappedGrpcService;
+use crate::metadata::{IndexKind, Shard};
+use nidx_protos::nidx::nidx_api_server::*;
 use nidx_protos::*;
 use nidx_vector::config::VectorConfig;
-use node_writer_server::NodeWriterServer;
-use tonic::transport::server::Router;
-use tonic::transport::Server;
-use tonic::{Request, Response, Status};
+use tonic::service::Routes;
+use tonic::{Request, Response, Result, Status};
 use uuid::Uuid;
 
 use crate::api::shards;
@@ -44,26 +45,46 @@ impl ApiServer {
         }
     }
 
-    pub fn into_service(self) -> Router {
-        Server::builder().add_service(NodeWriterServer::new(self))
+    pub fn into_service(self) -> RemappedGrpcService {
+        RemappedGrpcService {
+            routes: Routes::new(NidxApiServer::new(self)),
+            package: "nidx.NidxApi".to_string(),
+        }
     }
 }
 
 #[tonic::async_trait]
-impl NodeWriter for ApiServer {
-    async fn new_shard(&self, request: Request<NewShardRequest>) -> Result<Response<ShardCreated>, Status> {
+impl NidxApi for ApiServer {
+    async fn get_shard(&self, request: Request<GetShardRequest>) -> Result<Response<noderesources::Shard>> {
+        let request = request.into_inner();
+        let shard_id = request.shard_id.ok_or(Status::invalid_argument("Shard ID required"))?.id;
+        let shard_id = Uuid::parse_str(&shard_id).map_err(NidxError::from)?;
+
+        let shard = Shard::get(&self.meta.pool, shard_id).await.map_err(NidxError::from)?;
+        let index_stats = shard.stats(&self.meta.pool).await.map_err(NidxError::from)?;
+
+        Ok(Response::new(noderesources::Shard {
+            metadata: Some(ShardMetadata {
+                kbid: shard.kbid.to_string(),
+                release_channel: 0,
+            }),
+            shard_id: shard_id.to_string(),
+            fields: *index_stats.get(&IndexKind::Text).unwrap_or(&0) as u64,
+            paragraphs: *index_stats.get(&IndexKind::Paragraph).unwrap_or(&0) as u64,
+            sentences: *index_stats.get(&IndexKind::Vector).unwrap_or(&0) as u64,
+        }))
+    }
+    async fn new_shard(&self, request: Request<NewShardRequest>) -> Result<Response<ShardCreated>> {
         // TODO? analytics event
         let request = request.into_inner();
-        let kbid = Uuid::from_str(&request.kbid).map_err(|e| Status::internal(e.to_string()))?;
+        let kbid = Uuid::from_str(&request.kbid).map_err(NidxError::from)?;
         let mut vector_configs = HashMap::with_capacity(request.vectorsets_configs.len());
         for (vectorset_id, config) in request.vectorsets_configs {
             vector_configs
                 .insert(vectorset_id, VectorConfig::try_from(config).map_err(|e| Status::internal(e.to_string()))?);
         }
 
-        let shard = shards::create_shard(&self.meta, kbid, vector_configs)
-            .await
-            .map_err(|e| Status::internal(e.to_string()))?;
+        let shard = shards::create_shard(&self.meta, kbid, vector_configs).await.map_err(NidxError::from)?;
 
         Ok(Response::new(ShardCreated {
             id: shard.id.to_string(),
@@ -72,56 +93,44 @@ impl NodeWriter for ApiServer {
         }))
     }
 
-    async fn delete_shard(&self, request: Request<ShardId>) -> Result<Response<ShardId>, Status> {
+    async fn delete_shard(&self, request: Request<ShardId>) -> Result<Response<ShardId>> {
         // TODO? analytics event
         let request = request.into_inner();
-        let shard_id = Uuid::from_str(&request.id).map_err(|e| Status::internal(e.to_string()))?;
+        let shard_id = Uuid::from_str(&request.id).map_err(NidxError::from)?;
 
-        shards::delete_shard(&self.meta, shard_id).await.map_err(|e| Status::internal(e.to_string()))?;
+        shards::delete_shard(&self.meta, shard_id).await?;
 
         Ok(Response::new(ShardId {
             id: shard_id.to_string(),
         }))
     }
 
-    async fn list_shards(&self, _request: Request<EmptyQuery>) -> Result<Response<ShardIds>, Status> {
-        todo!()
-    }
-
-    async fn set_resource(&self, _request: Request<Resource>) -> Result<Response<OpStatus>, Status> {
-        unimplemented!("Use indexer service instead")
-    }
-
-    async fn set_resource_from_storage(&self, _request: Request<IndexMessage>) -> Result<Response<OpStatus>, Status> {
-        unimplemented!("Use indexer service instead")
-    }
-
-    async fn remove_resource(&self, _request: Request<ResourceId>) -> Result<Response<OpStatus>, Status> {
-        unimplemented!("Use indexer service instead")
+    async fn list_shards(&self, _request: Request<EmptyQuery>) -> Result<Response<ShardIds>> {
+        let ids = Shard::list_ids(&self.meta.pool).await.map_err(NidxError::from)?;
+        Ok(Response::new(ShardIds {
+            ids: ids
+                .iter()
+                .map(|x| ShardId {
+                    id: x.to_string(),
+                })
+                .collect(),
+        }))
     }
 
-    async fn add_vector_set(&self, _request: Request<NewVectorSetRequest>) -> Result<Response<OpStatus>, Status> {
+    async fn add_vector_set(&self, _request: Request<NewVectorSetRequest>) -> Result<Response<OpStatus>> {
         todo!()
     }
 
-    async fn remove_vector_set(&self, _request: Request<VectorSetId>) -> Result<Response<OpStatus>, Status> {
+    async fn remove_vector_set(&self, _request: Request<VectorSetId>) -> Result<Response<OpStatus>> {
         todo!()
     }
 
-    async fn list_vector_sets(&self, _request: Request<ShardId>) -> Result<Response<VectorSetList>, Status> {
+    async fn list_vector_sets(&self, _request: Request<ShardId>) -> Result<Response<VectorSetList>> {
         todo!()
     }
 
-    async fn get_metadata(&self, _request: Request<EmptyQuery>) -> Result<Response<NodeMetadata>, Status> {
+    async fn get_metadata(&self, _request: Request<EmptyQuery>) -> Result<Response<NodeMetadata>> {
         // TODO
         Ok(Response::new(NodeMetadata::default()))
     }
-
-    async fn gc(&self, _request: Request<ShardId>) -> Result<Response<GarbageCollectorResponse>, Status> {
-        unimplemented!("Garbage collection is done by the scheduler service")
-    }
-
-    async fn merge(&self, _request: Request<ShardId>) -> Result<Response<MergeResponse>, Status> {
-        unimplemented!("Merging is done by scheduler and worker services")
-    }
 }
diff --git a/nidx/src/api/shards.rs b/nidx/src/api/shards.rs
index 8cceb6439a..e433d33993 100644
--- a/nidx/src/api/shards.rs
+++ b/nidx/src/api/shards.rs
@@ -20,10 +20,10 @@
 
 use std::collections::HashMap;
 
-use anyhow::anyhow;
 use nidx_vector::config::VectorConfig;
 use uuid::Uuid;
 
+use crate::errors::{NidxError, NidxResult};
 use crate::metadata::{Index, IndexConfig, MergeJob, Segment, Shard};
 use crate::NidxMetadata;
 
@@ -31,9 +31,9 @@ pub async fn create_shard(
     meta: &NidxMetadata,
     kbid: Uuid,
     vector_configs: HashMap<String, VectorConfig>,
-) -> anyhow::Result<Shard> {
+) -> NidxResult<Shard> {
     if vector_configs.is_empty() {
-        return Err(anyhow!("Can't create shard without a vector index"));
+        return Err(NidxError::invalid("Can't create shard without a vector index"));
    }
     let mut tx = meta.transaction().await?;
 
@@ -53,7 +53,7 @@ pub async fn create_shard(
 /// Mark a shard, its indexes and segments for eventual deletion. Delete merge
 /// jobs scheduled for its indexes, as we don't want to keep working on it.
 /// Segment deletions will be purged eventually by the worker.
-pub async fn delete_shard(meta: &NidxMetadata, shard_id: Uuid) -> anyhow::Result<()> {
+pub async fn delete_shard(meta: &NidxMetadata, shard_id: Uuid) -> NidxResult<()> {
     let mut tx = meta.transaction().await?;
     let shard = match Shard::get(&mut *tx, shard_id).await {
         Ok(shard) => shard,
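The delete flow above is a soft delete: `delete_shard` only marks the shard, its indexes and segments, and because `Shard::get` now filters on `deleted_at IS NULL` (the rewritten `.sqlx` query at the top of this diff), a deleted shard becomes invisible to readers immediately, while segment data is purged later by the worker. A minimal sketch of a caller relying on that contract, using only items shown in this diff:

    use nidx::api::shards;
    use nidx::metadata::Shard;
    use nidx::NidxMetadata;

    // Sketch: after delete_shard returns, the shard row still exists,
    // but the deleted_at filter already hides it from Shard::get.
    async fn delete_and_verify(meta: &NidxMetadata, shard_id: uuid::Uuid) -> anyhow::Result<()> {
        shards::delete_shard(meta, shard_id).await?;
        let gone = Shard::get(&meta.pool, shard_id).await;
        assert!(matches!(gone, Err(sqlx::Error::RowNotFound)));
        Ok(())
    }

This mirrors the assertion added to tests/shards_api.rs further down.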
diff --git a/nidx/src/errors.rs b/nidx/src/errors.rs
new file mode 100644
index 0000000000..4b2b5fe6fd
--- /dev/null
+++ b/nidx/src/errors.rs
@@ -0,0 +1,67 @@
+// Copyright (C) 2021 Bosutech XXI S.L.
+//
+// nucliadb is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at info@nuclia.com.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+use thiserror::Error;
+
+pub type NidxResult<T> = Result<T, NidxError>;
+
+/// Error type for nidx. The idea is not to be exhaustive, just to include
+/// enough variants to map application errors to API errors.
+#[derive(Error, Debug)]
+pub enum NidxError {
+    #[error("Not found")]
+    NotFound,
+    #[error("Invalid request: {0}")]
+    InvalidRequest(String),
+    #[error("Invalid uuid: {0}")]
+    InvalidUuid(#[from] uuid::Error),
+    #[error(transparent)]
+    DatabaseError(sqlx::Error),
+    #[error(transparent)]
+    TokioTaskError(#[from] tokio::task::JoinError),
+    #[error(transparent)]
+    Unknown(#[from] anyhow::Error),
+}
+
+impl NidxError {
+    pub fn invalid(message: &str) -> Self {
+        Self::InvalidRequest(message.to_string())
+    }
+}
+
+impl From<sqlx::Error> for NidxError {
+    fn from(value: sqlx::Error) -> Self {
+        match value {
+            sqlx::Error::RowNotFound => Self::NotFound,
+            e => Self::DatabaseError(e),
+        }
+    }
+}
+
+impl From<NidxError> for tonic::Status {
+    fn from(value: NidxError) -> Self {
+        match value {
+            NidxError::NotFound => tonic::Status::not_found("Not found"),
+            NidxError::InvalidRequest(_) => tonic::Status::invalid_argument(value.to_string()),
+            NidxError::InvalidUuid(_) => tonic::Status::invalid_argument(value.to_string()),
+            _ => tonic::Status::internal(value.to_string()),
+        }
+    }
+}
diff --git a/nidx/src/grpc_server.rs b/nidx/src/grpc_server.rs
index b023dc7e7a..31cd284a97 100644
--- a/nidx/src/grpc_server.rs
+++ b/nidx/src/grpc_server.rs
@@ -18,8 +18,18 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 //
 
+use http::Uri;
 use tokio::net::{TcpListener, ToSocketAddrs};
-use tonic::transport::server::{Router, TcpIncoming};
+use tonic::{
+    service::Routes,
+    transport::{server::TcpIncoming, Server},
+};
+use tower::util::MapRequestLayer;
+
+pub struct RemappedGrpcService {
+    pub routes: Routes,
+    pub package: String,
+}
 
 /// A tonic server that allows binding to and returning a random port.
 pub struct GrpcServer(TcpListener);
@@ -33,7 +43,32 @@ impl GrpcServer {
         Ok(self.0.local_addr()?.port())
     }
 
-    pub async fn serve(self, server: Router) -> anyhow::Result<()> {
+    pub async fn serve(self, service: RemappedGrpcService) -> anyhow::Result<()> {
+        let server = Server::builder()
+            .layer(MapRequestLayer::new(move |req| map_grpc_path_to(&service.package, req)))
+            .add_routes(service.routes);
         Ok(server.serve_with_incoming(TcpIncoming::from_listener(self.0, true, None).unwrap()).await?)
     }
 }
+
+// TODO: Remove once we don't need backwards API compatibility
+/// Rewrites the request path of gRPC requests. This makes it possible to serve
+/// the same service under different names, e.g. we expose the same API as both
+/// NodeWriter and NidxApi.
+fn map_grpc_path_to<B>(to: &str, mut req: http::Request<B>) -> http::Request<B> {
+    let mut parts = req.uri().clone().into_parts();
+    let mut new_path = None;
+    // Find the first segment of the URI path, which in gRPC is the service name
+    if let Some(path_and_query) = parts.path_and_query {
+        let path = path_and_query.path();
+        let mut parts = path[1..].split('/');
+        if let Some(service) = parts.next() {
+            new_path = Some(path.replace(service, to));
+        }
+    }
+    if let Some(new_path) = new_path {
+        parts.path_and_query = Some(new_path.try_into().unwrap());
+        *req.uri_mut() = Uri::from_parts(parts).unwrap();
+    }
+
+    req
+}
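To make the rewrite concrete: only the leading path segment (the gRPC service) is swapped, so a request for a legacy `nodewriter.NodeWriter` method reaches the handler registered under `nidx.NidxApi`. A hypothetical test of that behavior, assuming `map_grpc_path_to` were exposed to tests:

    #[test]
    fn remaps_legacy_service_name() {
        let req = http::Request::builder()
            .uri("/nodewriter.NodeWriter/NewShard")
            .body(())
            .unwrap();
        let remapped = map_grpc_path_to("nidx.NidxApi", req);
        // The service segment changes; the method name is preserved.
        assert_eq!(remapped.uri().path(), "/nidx.NidxApi/NewShard");
    }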
diff --git a/nidx/src/lib.rs b/nidx/src/lib.rs
index 590e0a233d..c9392ab8f0 100644
--- a/nidx/src/lib.rs
+++ b/nidx/src/lib.rs
@@ -18,6 +18,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 //
 pub mod api;
+mod errors;
 pub mod grpc_server;
 pub mod indexer;
 pub mod maintenance;
diff --git a/nidx/src/metadata/index.rs b/nidx/src/metadata/index.rs
index e5f74f823c..ee066bc677 100644
--- a/nidx/src/metadata/index.rs
+++ b/nidx/src/metadata/index.rs
@@ -32,7 +32,7 @@ use uuid::Uuid;
 
 use super::segment::Segment;
 
-#[derive(sqlx::Type, Copy, Clone, PartialEq, Debug)]
+#[derive(sqlx::Type, Copy, Clone, PartialEq, Eq, Hash, Debug)]
 #[sqlx(type_name = "index_kind", rename_all = "lowercase")]
 pub enum IndexKind {
     Text,
diff --git a/nidx/src/metadata/shard.rs b/nidx/src/metadata/shard.rs
index 0aed5ddafe..3e1dc50a17 100644
--- a/nidx/src/metadata/shard.rs
+++ b/nidx/src/metadata/shard.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 // Copyright (C) 2021 Bosutech XXI S.L.
 //
 // nucliadb is offered under the AGPL v3.0 and as commercial software.
@@ -18,6 +20,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 //
 use super::index::*;
+use futures::StreamExt;
 use sqlx::{types::time::PrimitiveDateTime, Executor, Postgres};
 use uuid::Uuid;
 
@@ -33,7 +36,7 @@ impl Shard {
     }
 
     pub async fn get(meta: impl Executor<'_, Database = Postgres>, id: Uuid) -> sqlx::Result<Shard> {
-        sqlx::query_as!(Shard, "SELECT * FROM shards WHERE id = $1", id).fetch_one(meta).await
+        sqlx::query_as!(Shard, "SELECT * FROM shards WHERE id = $1 AND deleted_at IS NULL", id).fetch_one(meta).await
     }
 
     pub async fn mark_delete(&self, meta: impl Executor<'_, Database = Postgres>) -> sqlx::Result<()> {
@@ -51,4 +54,26 @@ impl Shard {
             .fetch_all(meta)
             .await
     }
+
+    pub async fn stats(&self, meta: impl Executor<'_, Database = Postgres>) -> sqlx::Result<HashMap<IndexKind, i64>> {
+        let mut stats = HashMap::new();
+        let mut results = sqlx::query!(
+            r#"SELECT kind as "kind: IndexKind", SUM(records)::bigint as "records!" FROM indexes
+            JOIN segments ON index_id = indexes.id
+            WHERE shard_id = $1
+            GROUP BY kind"#,
+            self.id
+        )
+        .fetch(meta);
+        while let Some(record) = results.next().await {
+            let record = record?;
+            stats.insert(record.kind, record.records);
+        }
+
+        Ok(stats)
+    }
+
+    pub async fn list_ids(meta: impl Executor<'_, Database = Postgres>) -> sqlx::Result<Vec<Uuid>> {
+        sqlx::query_scalar("SELECT id FROM shards WHERE deleted_at IS NULL").fetch_all(meta).await
+    }
 }
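`Shard::stats` folds per-segment `records` counters into one row per index kind (the grouped query cached in the new `.sqlx` file at the top of this diff); a kind with no segments is simply absent from the map. A sketch of a consumer, mirroring how `NidxApi::get_shard` fills `fields`, `paragraphs` and `sentences`, and assuming these items stay exported from `nidx::metadata`:

    use std::collections::HashMap;

    use nidx::metadata::{IndexKind, Shard};

    async fn field_count(shard: &Shard, pool: &sqlx::PgPool) -> sqlx::Result<u64> {
        let stats: HashMap<IndexKind, i64> = shard.stats(pool).await?;
        // Missing kinds mean "no segments yet"; default to zero.
        Ok(*stats.get(&IndexKind::Text).unwrap_or(&0) as u64)
    }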
diff --git a/nidx/src/searcher/grpc.rs b/nidx/src/searcher/grpc.rs
index fddbffd027..7cbaa04f9b 100644
--- a/nidx/src/searcher/grpc.rs
+++ b/nidx/src/searcher/grpc.rs
@@ -21,15 +21,11 @@
 use std::{pin::Pin, sync::Arc};
 
 use futures::Stream;
-use nidx_protos::node_reader_server::NodeReader;
+use nidx_protos::nidx::nidx_searcher_server::{NidxSearcher, NidxSearcherServer};
 use nidx_protos::*;
-use node_reader_server::NodeReaderServer;
-use tonic::{
-    transport::{server::Router, Server},
-    Request, Response, Result, Status,
-};
+use tonic::{service::Routes, Request, Response, Result, Status};
 
-use crate::NidxMetadata;
+use crate::{grpc_server::RemappedGrpcService, NidxMetadata};
 
 use super::{index_cache::IndexCache, shard_search::search};
 use tracing::*;
@@ -47,17 +43,16 @@ impl SearchServer {
         }
     }
 
-    pub fn into_service(self) -> Router {
-        Server::builder().add_service(NodeReaderServer::new(self))
+    pub fn into_service(self) -> RemappedGrpcService {
+        RemappedGrpcService {
+            routes: Routes::new(NidxSearcherServer::new(self)),
+            package: "nidx.NidxSearcher".to_string(),
+        }
     }
 }
 
 #[tonic::async_trait]
-impl NodeReader for SearchServer {
-    async fn get_shard(&self, _request: Request<GetShardRequest>) -> Result<Response<noderesources::Shard>> {
-        todo!()
-    }
-
+impl NidxSearcher for SearchServer {
     async fn document_search(
         &self,
         _request: Request<DocumentSearchRequest>,
@@ -119,24 +114,13 @@ impl NodeReader for SearchServer {
     }
 
     type ParagraphsStream = Pin<Box<dyn Stream<Item = Result<ParagraphItem, Status>> + Send>>;
+
     async fn paragraphs(&self, _request: Request<StreamRequest>) -> Result<Response<Self::ParagraphsStream>> {
         todo!()
     }
 
-    type DocumentsStream = Pin<Box<dyn Stream<Item = Result<DocumentItem, Status>> + Send>>;
-    async fn documents(&self, _request: Request<StreamRequest>) -> Result<Response<Self::DocumentsStream>> {
-        todo!()
-    }
-
-    async fn get_shard_files(&self, _request: Request<GetShardFilesRequest>) -> Result<Response<ShardFileList>> {
-        todo!()
-    }
-
-    type DownloadShardFileStream = Pin<Box<dyn Stream<Item = Result<ShardFileChunk, Status>> + Send>>;
-    async fn download_shard_file(
-        &self,
-        _request: Request<DownloadShardFileRequest>,
-    ) -> Result<Response<Self::DownloadShardFileStream>> {
+    async fn documents(&self, _request: Request<StreamRequest>) -> Result<Response<Self::DocumentsStream>> {
         todo!()
     }
 }
diff --git a/nidx/src/searcher/shard_search.rs b/nidx/src/searcher/shard_search.rs
index 6f66ee4037..7948686348 100644
--- a/nidx/src/searcher/shard_search.rs
+++ b/nidx/src/searcher/shard_search.rs
@@ -27,6 +27,7 @@ use nidx_text::TextSearcher;
 use nidx_vector::VectorSearcher;
 
 use crate::{
+    errors::NidxResult,
     metadata::{Index, IndexKind},
     NidxMetadata,
 };
@@ -37,7 +38,7 @@ pub async fn search(
     meta: &NidxMetadata,
     index_cache: Arc<IndexCache>,
     search_request: SearchRequest,
-) -> anyhow::Result<SearchResponse> {
+) -> NidxResult<SearchResponse> {
     let shard_id = uuid::Uuid::parse_str(&search_request.shard)?;
 
     // TODO: Avoid querying here, the information can be taken from synced metadata
@@ -53,7 +54,7 @@ pub async fn search(
     let vector_index = Index::find(&meta.pool, shard_id, IndexKind::Vector, &search_request.vectorset).await?;
     let vector_seacher_arc = index_cache.get(&vector_index.id).await?;
 
-    tokio::task::spawn_blocking(move || {
+    let search_results = tokio::task::spawn_blocking(move || {
         blocking_search(
             search_request,
             paragraph_searcher_arc.as_ref().into(),
@@ -62,7 +63,8 @@ pub async fn search(
             vector_seacher_arc.as_ref().into(),
         )
     })
-    .await?
+    .await??;
+    Ok(search_results)
 }
 
 fn blocking_search(
diff --git a/nidx/tests/common/services.rs b/nidx/tests/common/services.rs
index 27ea820c74..24ca29102a 100644
--- a/nidx/tests/common/services.rs
+++ b/nidx/tests/common/services.rs
@@ -24,15 +24,15 @@ use nidx::searcher::grpc::SearchServer;
 use nidx::searcher::SyncedSearcher;
 use nidx::settings::{EnvSettings, MetadataSettings, ObjectStoreConfig, StorageSettings};
 use nidx::{NidxMetadata, Settings};
-use nidx_protos::node_reader_client::NodeReaderClient;
-use nidx_protos::node_writer_client::NodeWriterClient;
+use nidx_protos::nidx::nidx_api_client::NidxApiClient;
+use nidx_protos::nidx::nidx_searcher_client::NidxSearcherClient;
 use sqlx::PgPool;
 use tempfile::tempdir;
 use tonic::transport::Channel;
 
 pub struct NidxFixture {
-    pub searcher_client: NodeReaderClient<Channel>,
-    pub api_client: NodeWriterClient<Channel>,
+    pub searcher_client: NidxSearcherClient<Channel>,
+    pub api_client: NidxApiClient<Channel>,
 }
 
 impl NidxFixture {
@@ -69,8 +69,8 @@ impl NidxFixture {
         tokio::task::spawn(async move { searcher.run(settings.storage.as_ref().unwrap().object_store.clone()).await });
 
         // Clients
-        let searcher_client = NodeReaderClient::connect(format!("http://localhost:{searcher_port}")).await?;
-        let api_client = NodeWriterClient::connect(format!("http://localhost:{api_port}")).await?;
+        let searcher_client = NidxSearcherClient::connect(format!("http://localhost:{searcher_port}")).await?;
+        let api_client = NidxApiClient::connect(format!("http://localhost:{api_port}")).await?;
 
         Ok(NidxFixture {
             searcher_client,
diff --git a/nidx/tests/shards_api.rs b/nidx/tests/shards_api.rs
index 74f463f517..d4c6cd08f7 100644
--- a/nidx/tests/shards_api.rs
+++ b/nidx/tests/shards_api.rs
@@ -30,7 +30,7 @@ use uuid::Uuid;
 use nidx::api::shards;
 use nidx::indexer::index_resource;
 use nidx::maintenance::scheduler::{purge_deleted_shards_and_indexes, purge_deletions, purge_segments};
-use nidx::metadata::IndexId;
+use nidx::metadata::{Index, IndexId, Segment};
 use nidx::{metadata::Shard, NidxMetadata};
 use nidx_tests::*;
 use nidx_vector::config::VectorConfig;
@@ -76,16 +76,15 @@ async fn test_shards_create_and_delete(pool: sqlx::PgPool) -> anyhow::Result<()>
 
     // Mark shard and indexes to delete
     shards::delete_shard(&meta, shard.id).await?;
-    let shard = Shard::get(&meta.pool, shard.id).await?;
-    assert!(shard.deleted_at.is_some());
-    for index in shard.indexes(&meta.pool).await? {
-        assert!(index.deleted_at.is_some());
-        for segment in index.segments(&meta.pool).await? {
+    let deleted = Shard::get(&meta.pool, shard.id).await;
+    assert!(matches!(deleted, Err(sqlx::Error::RowNotFound)));
+    for index_id in Index::marked_to_delete(&meta.pool).await? {
+        for segment in Segment::in_index(&meta.pool, index_id).await? {
             assert!(segment.delete_at.is_some());
         }
 
         // Update segment deletion time to validate purge
-        sqlx::query!("UPDATE segments SET delete_at = NOW() WHERE index_id = $1", index.id as IndexId,)
+        sqlx::query!("UPDATE segments SET delete_at = NOW() WHERE index_id = $1", index_id as IndexId,)
             .execute(&meta.pool)
             .await?;
     }
diff --git a/nidx/tests/test_shards.rs b/nidx/tests/test_shards.rs
index 3b1c3c26e4..f22ddbc792 100644
--- a/nidx/tests/test_shards.rs
+++ b/nidx/tests/test_shards.rs
@@ -23,164 +23,146 @@ mod common;
 use std::collections::HashMap;
 
 use common::services::NidxFixture;
-use nidx_protos::{NewShardRequest, VectorIndexConfig};
+use nidx_protos::{
+    nidx::nidx_api_client::NidxApiClient, EmptyQuery, GetShardRequest, NewShardRequest, ShardId, VectorIndexConfig,
+};
 use sqlx::PgPool;
-use tonic::Request;
+use tonic::{transport::Channel, Code, Request};
+use uuid::Uuid;
 
 #[sqlx::test]
-async fn test_create_shard(pool: PgPool) -> Result<(), Box<dyn std::error::Error>> {
+async fn test_create_shard(pool: PgPool) -> anyhow::Result<()> {
     let mut fixture = NidxFixture::new(pool).await?;
     let new_shard_response = fixture
         .api_client
         .new_shard(Request::new(NewShardRequest {
-            kbid: "aabbccddeeff11223344556677889900".to_string(),
+            kbid: "aabbccdd-eeff-1122-3344-556677889900".to_string(),
             vectorsets_configs: HashMap::from([("english".to_string(), VectorIndexConfig::default())]),
             ..Default::default()
         }))
         .await?;
-    let _shard_id = &new_shard_response.get_ref().id;
+    let shard_id = &new_shard_response.get_ref().id;
 
-    // let response = fixture
-    //     .searcher_client
-    //     .get_shard(Request::new(GetShardRequest {
-    //         shard_id: Some(ShardId {
-    //             id: shard_id.to_owned(),
-    //         }),
-    //         ..Default::default()
-    //     }))
-    //     .await?;
+    let response = fixture
+        .api_client
+        .get_shard(Request::new(GetShardRequest {
+            shard_id: Some(ShardId {
+                id: shard_id.to_owned(),
+            }),
+            ..Default::default()
+        }))
+        .await?;
+
+    let response = response.into_inner();
+    assert_eq!(&response.shard_id, shard_id);
+    assert_eq!(&response.metadata.unwrap().kbid, "aabbccdd-eeff-1122-3344-556677889900");
+
+    // get_shard error handling
+    let response = fixture
+        .api_client
+        .get_shard(Request::new(GetShardRequest {
+            shard_id: Some(ShardId {
+                id: Uuid::new_v4().to_string(),
+            }),
+            ..Default::default()
+        }))
+        .await;
+    let err = response.expect_err("Should have failed");
+    assert_eq!(err.code(), Code::NotFound);
 
-    // assert_eq!(shard_id, &response.get_ref().shard_id);
+    let response = fixture
+        .api_client
+        .get_shard(Request::new(GetShardRequest {
+            shard_id: None,
+            ..Default::default()
+        }))
+        .await;
+    let err = response.expect_err("Should have failed");
+    assert_eq!(err.code(), Code::InvalidArgument);
 
     Ok(())
 }
 
-// #[rstest]
-// #[tokio::test]
-// async fn test_shard_metadata() -> Result<(), Box<dyn std::error::Error>> {
-//     let mut fixture = NodeFixture::new();
-//     fixture.with_writer().await?.with_reader().await?;
-//     let mut writer = fixture.writer_client();
-//     let mut reader = fixture.reader_client();
-
-//     async fn create_shard_with_metadata(
-//         writer: &mut TestNodeWriter,
-//         kbid: String,
-//     ) -> Result<String, Box<dyn std::error::Error>> {
-//         let shard = writer
-//             .new_shard(Request::new(NewShardRequest {
-//                 kbid,
-//                 ..Default::default()
-//             }))
-//             .await?
-//             .into_inner();
-//         Ok(shard.id)
-//     }
-
-//     async fn validate_shard_metadata(
-//         reader: &mut TestNodeReader,
-//         shard_id: String,
-//         kbid: String,
-//     ) -> Result<(), Box<dyn std::error::Error>> {
-//         let shard = reader
-//             .get_shard(Request::new(GetShardRequest {
-//                 shard_id: Some(ShardId {
-//                     id: shard_id,
-//                 }),
-//                 ..Default::default()
-//             }))
-//             .await?
-//             .into_inner();
-
-//         assert!(shard.metadata.is_some());
-
-//         let shard_metadata = shard.metadata.unwrap();
-//         assert_eq!(shard_metadata.kbid, kbid);
-
-//         Ok(())
-//     }
-
-//     const KB0: &str = "KB0";
-//     const KB1: &str = "KB1";
-//     const KB2: &str = "KB2";
-
-//     // Used to validate correct creation
-//     let shard_0 = create_shard_with_metadata(&mut writer, KB0.to_string()).await?;
-//     // Used to check 1 is not overwritting 0
-//     let shard_1 = create_shard_with_metadata(&mut writer, KB1.to_string()).await?;
-//     // Used to validate correct creation when there are more shards
-//     let shard_2 = create_shard_with_metadata(&mut writer, KB2.to_string()).await?;
-
-//     validate_shard_metadata(&mut reader, shard_0, KB0.to_string()).await?;
-//     validate_shard_metadata(&mut reader, shard_1, KB1.to_string()).await?;
-//     validate_shard_metadata(&mut reader, shard_2, KB2.to_string()).await?;
-
-//     Ok(())
-// }
-
-// #[rstest]
-// #[tokio::test]
-// async fn test_list_shards() -> Result<(), Box<dyn std::error::Error>> {
-//     let mut fixture = NodeFixture::new();
-//     fixture.with_writer().await?.with_reader().await?;
-//     let mut writer = fixture.writer_client();
-
-//     let current =
-//         writer.list_shards(Request::new(EmptyQuery {})).await?.get_ref().ids.iter().map(|s| s.id.clone()).len();
-
-//     let request_ids = create_shards(&mut writer, 5).await;
-
-//     let response = writer.list_shards(Request::new(EmptyQuery {})).await.expect("Error in list_shards request");
-
-//     let response_ids: Vec<String> = response.get_ref().ids.iter().map(|s| s.id.clone()).collect();
-
-//     assert!(!request_ids.is_empty());
-//     assert_eq!(request_ids.len() + current, response_ids.len());
-//     assert!(request_ids.iter().all(|item| { response_ids.contains(item) }));
-
-//     Ok(())
-// }
-
-// #[rstest]
-// #[tokio::test]
-// async fn test_delete_shards() -> anyhow::Result<()> {
-//     let mut fixture = NodeFixture::new();
-//     fixture.with_writer().await?.with_reader().await?;
-//     let mut writer = fixture.writer_client();
-
-//     let current =
-//         writer.list_shards(Request::new(EmptyQuery {})).await?.get_ref().ids.iter().map(|s| s.id.clone()).len();
-
-//     let request_ids = create_shards(&mut writer, 5).await;
-
-//     for (id, expected) in request_ids.iter().map(|v| (v.clone(), v.clone())) {
-//         let response = writer
-//             .delete_shard(Request::new(ShardId {
-//                 id,
-//             }))
-//             .await
-//             .expect("Error in delete_shard request");
-//         let deleted_id = response.get_ref().id.clone();
-//         assert_eq!(deleted_id, expected);
-//     }
-
-//     let response = writer.list_shards(Request::new(EmptyQuery {})).await.expect("Error in list_shards request");
-
-//     assert_eq!(response.get_ref().ids.len(), current);
-
-//     Ok(())
-// }
-
-// async fn create_shards(writer: &mut TestNodeWriter, n: usize) -> Vec<String> {
-//     let mut shard_ids = Vec::with_capacity(n);
-
-//     for _ in 0..n {
-//         let response =
-//             writer.new_shard(Request::new(NewShardRequest::default())).await.expect("Error in new_shard request");
-
-//         shard_ids.push(response.get_ref().id.clone());
-//     }
+#[sqlx::test]
+async fn test_list_shards(pool: PgPool) -> anyhow::Result<()> {
+    let mut fixture = NidxFixture::new(pool).await?;
+
+    let current = fixture
+        .api_client
+        .list_shards(Request::new(EmptyQuery {}))
+        .await?
+        .get_ref()
+        .ids
+        .iter()
+        .map(|s| s.id.clone())
+        .len();
+
+    let request_ids = create_shards(&mut fixture.api_client, 5).await;
+
+    let response =
+        fixture.api_client.list_shards(Request::new(EmptyQuery {})).await.expect("Error in list_shards request");
 
-//     shard_ids
-// }
+    let response_ids: Vec<String> = response.get_ref().ids.iter().map(|s| s.id.clone()).collect();
+
+    assert!(!request_ids.is_empty());
+    assert_eq!(request_ids.len() + current, response_ids.len());
+    assert!(request_ids.iter().all(|item| { response_ids.contains(item) }));
+
+    Ok(())
+}
+
+#[sqlx::test]
+async fn test_delete_shards(pool: PgPool) -> anyhow::Result<()> {
+    let mut fixture = NidxFixture::new(pool).await?;
+
+    let current = fixture
+        .api_client
+        .list_shards(Request::new(EmptyQuery {}))
+        .await?
+        .get_ref()
+        .ids
+        .iter()
+        .map(|s| s.id.clone())
+        .len();
+
+    let request_ids = create_shards(&mut fixture.api_client, 5).await;
+
+    for (id, expected) in request_ids.iter().map(|v| (v.clone(), v.clone())) {
+        let response = fixture
+            .api_client
+            .delete_shard(Request::new(ShardId {
+                id,
+            }))
+            .await
+            .expect("Error in delete_shard request");
+        let deleted_id = response.get_ref().id.clone();
+        assert_eq!(deleted_id, expected);
+    }
+
+    let response =
+        fixture.api_client.list_shards(Request::new(EmptyQuery {})).await.expect("Error in list_shards request");
+
+    assert_eq!(response.get_ref().ids.len(), current);
+
+    Ok(())
+}
+
+async fn create_shards(writer: &mut NidxApiClient<Channel>, n: usize) -> Vec<String> {
+    let mut shard_ids = Vec::with_capacity(n);
+
+    for _ in 0..n {
+        let response = writer
+            .new_shard(Request::new(NewShardRequest {
+                kbid: "aabbccdd-eeff-1122-3344-556677889900".to_string(),
+                vectorsets_configs: HashMap::from([("english".to_string(), VectorIndexConfig::default())]),
+                ..Default::default()
+            }))
+            .await
+            .expect("Error in new_shard request");
+
+        shard_ids.push(response.get_ref().id.clone());
+    }
+
+    shard_ids
+}
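Because both servers are wrapped in `RemappedGrpcService`, the path rewrite in grpc_server.rs should keep legacy clients working against the new services. A sketch of that backwards-compatible path, assuming a server from `NidxFixture` listening on `api_port`; the client type used here is the one this diff removes from the fixture:

    use std::collections::HashMap;

    use nidx_protos::node_writer_client::NodeWriterClient;
    use nidx_protos::{NewShardRequest, VectorIndexConfig};

    // Sketch: /nodewriter.NodeWriter/NewShard is rewritten to
    // /nidx.NidxApi/NewShard before routing, so the legacy client still works.
    async fn legacy_new_shard(api_port: u16) -> anyhow::Result<String> {
        let mut client = NodeWriterClient::connect(format!("http://localhost:{api_port}")).await?;
        let response = client
            .new_shard(NewShardRequest {
                kbid: "aabbccdd-eeff-1122-3344-556677889900".to_string(),
                vectorsets_configs: HashMap::from([("english".to_string(), VectorIndexConfig::default())]),
                ..Default::default()
            })
            .await?;
        Ok(response.into_inner().id)
    }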