Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nidx binding #2620

Merged
merged 9 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ jobs:
python-version: "3.9.20"
cache: true
- name: Install venv
run: pdm sync -d --clean --no-editable
run: pdm sync -d --clean --no-editable --without nidx
- name: Clean build
run: find . -name build | xargs rm -r || true
- name: Lint source code
Expand Down Expand Up @@ -209,7 +209,7 @@ jobs:
python-version: "3.12.5"
cache: true
- name: Install venv
run: pdm sync -d --clean --no-editable
run: pdm sync -d --clean --no-editable --without nidx
- name: Upload virtualenv to cache
uses: actions/cache/save@v4
with:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 106 additions & 4 deletions nidx/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nidx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"

[workspace]
members = ["nidx_vector", "nidx_types", "nidx_protos", "nidx_tests", "nidx_text", "nidx_tantivy", "nidx_paragraph", "nidx_relation"]
members = ["nidx_vector", "nidx_types", "nidx_protos", "nidx_tests", "nidx_text", "nidx_tantivy", "nidx_paragraph", "nidx_relation", "nidx_binding"]

[dependencies]
anyhow = "1.0.89"
Expand Down
16 changes: 16 additions & 0 deletions nidx/nidx_binding/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "nidx_binding"
version = "0.1.0"
edition = "2021"

[lib]
name = "nidx_binding"
crate-type = ["cdylib"]

[dependencies]
anyhow = "1.0.93"
nidx = { version = "0.1.0", path = ".." }
nidx_protos = { version = "0.1.0", path = "../nidx_protos" }
pyo3 = "0.22.0"
tokio = "1.41.1"
tonic = "0.12.3"
15 changes: 15 additions & 0 deletions nidx/nidx_binding/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[build-system]
requires = ["maturin>=1.7,<2.0"]
build-backend = "maturin"

[project]
name = "nidx_binding"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dynamic = ["version"]
[tool.maturin]
features = ["pyo3/extension-module"]
130 changes: 130 additions & 0 deletions nidx/nidx_binding/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use nidx::maintenance::scheduler::{self, GetAckFloor};
use nidx::maintenance::worker;
use pyo3::exceptions::PyException;
use pyo3::prelude::*;

use nidx::api::grpc::ApiServer;
use nidx::grpc_server::GrpcServer;
use nidx::indexer::{download_message, index_resource};
use nidx::searcher::grpc::SearchServer;
use nidx::searcher::SyncedSearcher;
use nidx::settings::EnvSettings;
use nidx::Settings;
use nidx_protos::prost::*;
use nidx_protos::IndexMessage;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use tokio::runtime::Runtime;

/// Shared sequence counter backed by an `Arc<AtomicI64>`, so every clone observes the same value.
///
/// `NidxBinding::index` reads and then bumps the counter for each indexing attempt, and a clone
/// is passed to `scheduler::run_tasks`, which can query it through the `GetAckFloor` trait.
#[derive(Clone)]
struct SeqSource(Arc<AtomicI64>);

impl GetAckFloor for SeqSource {
    /// Reports the current value of the shared counter. This implementation never fails.
    async fn get(&mut self) -> anyhow::Result<i64> {
        use std::sync::atomic::Ordering;

        Ok(self.0.load(Ordering::Relaxed))
    }
}

/// In-process nidx instance exposed to Python.
///
/// Holds the ports of the two gRPC servers started by `NidxBinding::new`, the settings they were
/// built from, the shared sequence counter and the Tokio runtime used to drive `index`.
#[pyclass]
pub struct NidxBinding {
/// Port the searcher gRPC server was bound to (readable from Python).
#[pyo3(get)]
searcher_port: u16,
/// Port the API gRPC server was bound to (readable from Python).
#[pyo3(get)]
api_port: u16,
// Settings built from the map handed to the constructor.
settings: Settings,
// Sequence counter shared with the scheduler task.
seq: SeqSource,
// `None` as returned by `new`; `start` stores the runtime here so it lives as long as the binding.
runtime: Option<Runtime>,
}

#[pymethods]
impl NidxBinding {
#[new]
pub fn start(mut settings: HashMap<String, String>) -> NidxBinding {
settings.insert("INDEXER__NATS_SERVER".to_string(), "".to_string());

let rt = tokio::runtime::Runtime::new().unwrap();
let mut binding = rt.block_on(NidxBinding::new(settings)).unwrap();
binding.runtime = Some(rt);
binding
}

pub fn index(&mut self, bytes: Vec<u8>) -> PyResult<i64> {
// TODO: Can this be simplified into the indexer code?
let msg = IndexMessage::decode(&bytes[..]).unwrap();

let seq = self.seq.0.load(std::sync::atomic::Ordering::Relaxed);
let object_store = self.settings.indexer.as_ref().unwrap().object_store.clone();
let result = self.runtime.as_ref().unwrap().block_on(async {
let resource = download_message(object_store, &msg.storage_key).await?;
index_resource(
&self.settings.metadata,
self.settings.storage.as_ref().unwrap().object_store.clone(),
&msg.shard,
resource,
seq.into(),
)
.await
});

// Always increment seq, even on failure
self.seq.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

result.map_err(|e| PyException::new_err(format!("Error indexing {e}")))?;

Ok(seq)
}
}

impl NidxBinding {
    /// Builds the settings from `binding_settings` and spawns every nidx service as a detached
    /// Tokio task: the API gRPC server, the searcher (gRPC server plus sync loop), the
    /// maintenance scheduler and the maintenance worker.
    ///
    /// Both gRPC servers bind to `localhost:0`; the ports they actually received are stored in
    /// the returned value. The returned binding has `runtime` set to `None`.
    ///
    /// Must be awaited from inside a Tokio runtime, because it calls `tokio::task::spawn`.
    ///
    /// # Errors
    ///
    /// Fails if the settings cannot be loaded, if storage settings are missing, if a gRPC
    /// server cannot be created or its port read, or if the maximum sequence number cannot be
    /// fetched from the metadata store.
    pub async fn new(binding_settings: HashMap<String, String>) -> anyhow::Result<Self> {
        let settings = Settings::from_env_settings(EnvSettings::from_map(binding_settings)).await?;

        // Resolve the storage object store once, before anything is spawned, so a missing
        // configuration is returned as an error rather than panicking here or inside a
        // detached task where the failure would go unnoticed.
        let object_store = settings
            .storage
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("storage settings are required to start nidx"))?
            .object_store
            .clone();

        // API server
        let api_service = ApiServer::new(settings.metadata.clone()).into_service();
        let api_server = GrpcServer::new("localhost:0").await?;
        let api_port = api_server.port()?;
        tokio::task::spawn(api_server.serve(api_service));

        // Searcher API
        // NOTE(review): the searcher work directory is hard-coded, so two bindings on the same
        // machine would share it.
        let searcher = SyncedSearcher::new(settings.metadata.clone(), Path::new("/tmp/searcher"));
        let searcher_api = SearchServer::new(settings.metadata.clone(), searcher.index_cache());
        let searcher_server = GrpcServer::new("localhost:0").await?;
        let searcher_port = searcher_server.port()?;
        tokio::task::spawn(searcher_server.serve(searcher_api.into_service()));
        let searcher_store = object_store.clone();
        tokio::task::spawn(async move { searcher.run(searcher_store).await });

        // Scheduler: the counter starts at the highest sequence number already in the metadata
        // store and is shared with the scheduler task through a clone.
        let seq = SeqSource(Arc::new(settings.metadata.max_seq().await?.into()));
        tokio::task::spawn(scheduler::run_tasks(
            settings.metadata.clone(),
            object_store,
            settings.merge.clone(),
            seq.clone(),
        ));

        // Worker
        tokio::task::spawn(worker::run(settings.clone()));

        Ok(NidxBinding {
            searcher_port,
            api_port,
            settings,
            seq,
            runtime: None,
        })
    }
}

/// Entry point of the `nidx_binding` Python extension module.
///
/// Registers the `NidxBinding` class on the module and forwards the registration result.
#[pymodule]
fn nidx_binding(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<NidxBinding>()
}
Loading
Loading