From f98a562995bb2334eadb3b94929a8cc614264825 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 22 Oct 2024 11:54:55 +0200 Subject: [PATCH] nidx dev deployment (#2563) --- .github/workflows/deploy.yml | 8 ++ Dockerfile.nidx | 1 + ...ndexer.deploy.yaml => indexer-deploy.yaml} | 0 charts/nidx/templates/scheduler-deploy.yaml | 69 ++++++++++++++++ charts/nidx/templates/worker-deploy.yaml | 63 +++++++++++++++ charts/nidx/values.yaml | 2 + nidx/src/indexer.rs | 33 +++++--- .../src/nucliadb/common/cluster/manager.py | 74 +++++++++-------- nucliadb/src/nucliadb/common/nidx.py | 79 +++++++++++++++++++ nucliadb/src/nucliadb/ingest/app.py | 4 + nucliadb_utils/src/nucliadb_utils/settings.py | 5 ++ .../src/nucliadb_utils/storages/local.py | 3 +- .../src/nucliadb_utils/utilities.py | 2 + 13 files changed, 301 insertions(+), 42 deletions(-) rename charts/nidx/templates/{indexer.deploy.yaml => indexer-deploy.yaml} (100%) create mode 100644 charts/nidx/templates/scheduler-deploy.yaml create mode 100644 charts/nidx/templates/worker-deploy.yaml create mode 100644 nucliadb/src/nucliadb/common/nidx.py diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 6a016c60c4..5ce54a2d30 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -230,6 +230,7 @@ jobs: upload-chart-nidx: name: Upload nidx chart runs-on: ubuntu-latest + if: inputs.environment == 'dev' needs: - build-nidx-image steps: @@ -281,10 +282,12 @@ jobs: deploy-nucliadb-components: name: Deploy NucliaDB components runs-on: ubuntu-latest + if: ${{ !failure() && !cancelled() }} needs: - upload-chart-nucliadb-shared - upload-charts-nucliadb-component - upload-chart-nucliadb-node + - upload-chart-nidx steps: - name: Generate a token id: app-token @@ -305,6 +308,11 @@ jobs: { "commit-sha": "${{ github.sha }}", "components": [ + ${{ needs.upload-chart-nidx.result == 'skipped' && ' ' || format('{{ + "chart-version": "{0}", + "component": "nidx", + "component-type": "regional" + }},', needs.upload-charts-nucliadb-component.outputs.version_number) }} { "chart-version": "${{ needs.upload-charts-nucliadb-component.outputs.version_number }}", "component": "nucliadb_shared", diff --git a/Dockerfile.nidx b/Dockerfile.nidx index 17dd7a4894..93c5342be6 100644 --- a/Dockerfile.nidx +++ b/Dockerfile.nidx @@ -10,4 +10,5 @@ COPY nidx /app/nidx RUN cd /app/nidx && cargo build --locked --release FROM debian:bookworm-slim AS nidx +RUN apt update && apt install -y ca-certificates && rm -rf /var/lib/apt/lists/* COPY --from=builder /app/nidx/target/release/nidx /usr/local/bin diff --git a/charts/nidx/templates/indexer.deploy.yaml b/charts/nidx/templates/indexer-deploy.yaml similarity index 100% rename from charts/nidx/templates/indexer.deploy.yaml rename to charts/nidx/templates/indexer-deploy.yaml diff --git a/charts/nidx/templates/scheduler-deploy.yaml b/charts/nidx/templates/scheduler-deploy.yaml new file mode 100644 index 0000000000..30eb86d6d6 --- /dev/null +++ b/charts/nidx/templates/scheduler-deploy.yaml @@ -0,0 +1,69 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nidx-scheduler + labels: + app: nidx-scheduler + version: "{{ .Chart.Version | replace "+" "_" }}" + chart: "{{ .Chart.Name }}" + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" +spec: + # Only 1 scheduler max running at a time, even during deployment (stop before starting). + replicas: 1 + strategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 1 + maxSurge: 0 + selector: + matchLabels: + app: nidx-scheduler + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" + template: + metadata: + name: nidx-scheduler + annotations: + {{- with .Values.excludeOutboundPorts }} + traffic.sidecar.istio.io/excludeOutboundPorts: "{{ . }}" + {{- end }} + {{- with .Values.excludeInboundPorts }} + traffic.sidecar.istio.io/excludeInboundPorts: "{{ . }}" + {{- end }} + labels: + app: nidx-scheduler + version: "{{ .Chart.Version | replace "+" "_" }}" + chart: "{{ .Chart.Name }}" + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" + spec: + {{- with .Values.scheduler.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.scheduler.topologySpreadConstraints }} + topologySpreadConstraints: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.scheduler.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.scheduler.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + dnsPolicy: ClusterFirst + serviceAccount: {{ .Values.scheduler.serviceAccount | default "default" }} + containers: + - name: nidx-scheduler + image: "{{ .Values.containerRegistry }}/{{ .Values.image }}" + command: ["nidx", "scheduler"] + env: + {{- include "toEnv" .Values.env | nindent 10 }} + ports: + {{- with .Values.scheduler.resources }} + resources: + {{- toYaml . | nindent 10 }} + {{- end }} diff --git a/charts/nidx/templates/worker-deploy.yaml b/charts/nidx/templates/worker-deploy.yaml new file mode 100644 index 0000000000..2ac71292a0 --- /dev/null +++ b/charts/nidx/templates/worker-deploy.yaml @@ -0,0 +1,63 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nidx-worker + labels: + app: nidx-worker + version: "{{ .Chart.Version | replace "+" "_" }}" + chart: "{{ .Chart.Name }}" + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" +spec: + replicas: 1 + selector: + matchLabels: + app: nidx-worker + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" + template: + metadata: + name: nidx-worker + annotations: + {{- with .Values.excludeOutboundPorts }} + traffic.sidecar.istio.io/excludeOutboundPorts: "{{ . }}" + {{- end }} + {{- with .Values.excludeInboundPorts }} + traffic.sidecar.istio.io/excludeInboundPorts: "{{ . }}" + {{- end }} + labels: + app: nidx-worker + version: "{{ .Chart.Version | replace "+" "_" }}" + chart: "{{ .Chart.Name }}" + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" + spec: + {{- with .Values.worker.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.worker.topologySpreadConstraints }} + topologySpreadConstraints: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.worker.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.worker.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + dnsPolicy: ClusterFirst + serviceAccount: {{ .Values.worker.serviceAccount | default "default" }} + containers: + - name: nidx-worker + image: "{{ .Values.containerRegistry }}/{{ .Values.image }}" + command: ["nidx", "worker"] + env: + {{- include "toEnv" .Values.env | nindent 10 }} + ports: + {{- with .Values.worker.resources }} + resources: + {{- toYaml . | nindent 10 }} + {{- end }} diff --git a/charts/nidx/values.yaml b/charts/nidx/values.yaml index 6392633e93..3b118346cc 100644 --- a/charts/nidx/values.yaml +++ b/charts/nidx/values.yaml @@ -9,3 +9,5 @@ image: IMAGE_TO_REPLACE env: {} indexer: {} +scheduler: {} +worker: {} diff --git a/nidx/src/indexer.rs b/nidx/src/indexer.rs index 5c7e6cf89e..5a45bbf45e 100644 --- a/nidx/src/indexer.rs +++ b/nidx/src/indexer.rs @@ -21,7 +21,6 @@ use crate::upload::pack_and_upload; use crate::{metadata::*, Settings}; use anyhow::anyhow; use async_nats::jetstream::consumer::PullConsumer; -use async_nats::Message; use futures::stream::StreamExt; use nidx_protos::prost::*; use nidx_protos::IndexMessage; @@ -63,10 +62,15 @@ pub async fn run() -> anyhow::Result<()> { let _ = msg.ack().await; continue; } - info!(?seq, "Processing indexing message"); + let (msg, acker) = msg.split(); - let resource = match download_message(indexer_storage.clone(), msg).await { + let Ok(index_message) = IndexMessage::decode(msg.payload) else { + warn!("Error decoding index message"); + continue; + }; + + let resource = match download_message(indexer_storage.clone(), &index_message.storage_key).await { Ok(r) => r, Err(e) => { warn!("Error downloading index message {e:?}"); @@ -74,7 +78,7 @@ pub async fn run() -> anyhow::Result<()> { } }; - match index_resource(&meta, segment_storage.clone(), &resource, seq).await { + match index_resource(&meta, segment_storage.clone(), &index_message.shard, &resource, seq).await { Ok(()) => { if let Err(e) = acker.ack().await { warn!("Error ack'ing NATS message {e:?}") @@ -89,10 +93,8 @@ pub async fn run() -> anyhow::Result<()> { Ok(()) } -pub async fn download_message(storage: Arc, msg: Message) -> anyhow::Result { - let index_message = IndexMessage::decode(msg.payload)?; - - let get_result = storage.get(&object_store::path::Path::from(index_message.storage_key)).await?; +pub async fn download_message(storage: Arc, storage_key: &str) -> anyhow::Result { + let get_result = storage.get(&object_store::path::Path::from(storage_key)).await?; let bytes = get_result.bytes().await?; let resource = Resource::decode(bytes)?; @@ -102,12 +104,15 @@ pub async fn download_message(storage: Arc, msg: Message) -> any async fn index_resource( meta: &NidxMetadata, storage: Arc, + shard_id: &str, resource: &Resource, seq: Seq, ) -> anyhow::Result<()> { - let shard_id = Uuid::parse_str(&resource.shard_id)?; + let shard_id = Uuid::parse_str(shard_id)?; let indexes = Index::for_shard(&meta.pool, shard_id).await?; + info!(?seq, ?shard_id, "Indexing message to shard"); + for index in indexes { let output_dir = tempfile::tempdir()?; let (records, deletions) = index_resource_to_index(&index, resource, output_dir.path()).await?; @@ -160,7 +165,15 @@ mod tests { let index = Index::create(&meta.pool, shard.id, IndexKind::Vector, Some("multilingual")).await.unwrap(); let storage = Arc::new(object_store::memory::InMemory::new()); - index_resource(&meta, storage.clone(), &little_prince(shard.id.to_string()), 123i64.into()).await.unwrap(); + index_resource( + &meta, + storage.clone(), + &shard.id.to_string(), + &little_prince(shard.id.to_string()), + 123i64.into(), + ) + .await + .unwrap(); let segments = index.segments(&meta.pool).await.unwrap(); assert_eq!(segments.len(), 1); diff --git a/nucliadb/src/nucliadb/common/cluster/manager.py b/nucliadb/src/nucliadb/common/cluster/manager.py index ac7abafebc..3a5980259f 100644 --- a/nucliadb/src/nucliadb/common/cluster/manager.py +++ b/nucliadb/src/nucliadb/common/cluster/manager.py @@ -36,6 +36,7 @@ ShardsNotFound, ) from nucliadb.common.maindb.driver import Transaction +from nucliadb.common.nidx import get_nidx from nucliadb_protos import ( knowledgebox_pb2, nodereader_pb2, @@ -322,6 +323,7 @@ async def delete_resource( ) -> None: indexing = get_indexing() storage = await get_storage() + nidx = get_nidx() await storage.delete_indexing(resource_uid=uuid, txid=txid, kb=kb, logical_shard=shard.shard) @@ -336,6 +338,17 @@ async def delete_resource( indexpb.kbid = kb await indexing.index(indexpb, node_id) + if nidx is not None: + nidxpb: nodewriter_pb2.IndexMessage = nodewriter_pb2.IndexMessage() + nidxpb.node = node_id + nidxpb.shard = shard.shard + nidxpb.txid = txid + nidxpb.resource = uuid + nidxpb.typemessage = nodewriter_pb2.TypeMessage.DELETION + nidxpb.partition = partition + nidxpb.kbid = kb + await nidx.index(nidxpb) + async def add_resource( self, shard: writer_pb2.ShardObject, @@ -356,7 +369,7 @@ async def add_resource( storage = await get_storage() indexing = get_indexing() - + nidx = get_nidx() indexpb = IndexMessage() if reindex_id is not None: @@ -383,6 +396,10 @@ async def add_resource( indexpb.shard = replica_id await indexing.index(indexpb, node_id) + if nidx is not None: + indexpb.shard = shard.shard + await nidx.index(indexpb) + def should_create_new_shard(self, num_paragraphs: int) -> bool: return num_paragraphs > settings.max_shard_paragraphs @@ -482,6 +499,17 @@ async def delete_resource( self._resource_change_event(kb, shardreplica.node, shardreplica.shard.id) ) + nidx = get_nidx() + if nidx is not None: + indexpb: nodewriter_pb2.IndexMessage = nodewriter_pb2.IndexMessage() + indexpb.shard = shard.shard + indexpb.txid = txid + indexpb.resource = uuid + indexpb.typemessage = nodewriter_pb2.TypeMessage.DELETION + indexpb.partition = partition + indexpb.kbid = kb + await nidx.index(indexpb) + @backoff.on_exception(backoff.expo, NodesUnsync, jitter=backoff.random_jitter, max_tries=5) async def add_resource( self, @@ -508,38 +536,22 @@ async def add_resource( self._resource_change_event(kb, shardreplica.node, shardreplica.shard.id) ) - # HACK: send to nidx - import os - - if os.environ.get("NIDX"): - from nucliadb_utils.nats import NatsConnectionManager - - storage = await get_storage() - storage.indexing_bucket = "fake" - indexpb = IndexMessage() - - storage_key = await storage.indexing( - resource, txid, partition, kb=kb, logical_shard=shard.shard - ) + nidx = get_nidx() + if nidx is not None: + storage = await get_storage() + indexpb = IndexMessage() + storage_key = await storage.indexing( + resource, txid, partition, kb=kb, logical_shard=shard.shard + ) - indexpb.typemessage = TypeMessage.CREATION - indexpb.storage_key = storage_key - indexpb.kbid = kb - if partition: - indexpb.partition = partition - indexpb.source = source - indexpb.resource = resource.resource.uuid - - nats = NatsConnectionManager( - service_name="nidx", - nats_servers=["nats://localhost:4222"], - ) - await nats.initialize() + indexpb.typemessage = TypeMessage.CREATION + indexpb.storage_key = storage_key + indexpb.kbid = kb + indexpb.source = source + indexpb.resource = resource.resource.uuid + indexpb.shard = shard.shard - for replica_id, node_id in self.indexing_replicas(shard): - indexpb.node = node_id - indexpb.shard = replica_id - await nats.js.publish("nidx", indexpb.SerializeToString()) + await nidx.index(indexpb) def get_all_shard_nodes( diff --git a/nucliadb/src/nucliadb/common/nidx.py b/nucliadb/src/nucliadb/common/nidx.py new file mode 100644 index 0000000000..c74518a0e8 --- /dev/null +++ b/nucliadb/src/nucliadb/common/nidx.py @@ -0,0 +1,79 @@ +# Copyright (C) 2021 Bosutech XXI S.L. +# +# nucliadb is offered under the AGPL v3.0 and as commercial software. +# For commercial licensing, contact us at info@nuclia.com. +# +# AGPL: +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# + +from typing import Optional + +from nucliadb_protos.nodewriter_pb2 import IndexMessage +from nucliadb_utils import logger +from nucliadb_utils.nats import NatsConnectionManager +from nucliadb_utils.settings import indexing_settings +from nucliadb_utils.utilities import Utility, clean_utility, get_utility, set_utility + + +class NidxIndexer: + def __init__( + self, + subject: str, + nats_servers: list[str], + nats_creds: Optional[str] = None, + ): + self.nats_connection_manager = NatsConnectionManager( + service_name="NidxIndexer", + nats_servers=nats_servers, + nats_creds=nats_creds, + ) + self.subject = subject + + async def initialize(self): + await self.nats_connection_manager.initialize() + + async def finalize(self): + await self.nats_connection_manager.finalize() + + async def index(self, writer: IndexMessage) -> int: + res = await self.nats_connection_manager.js.publish(self.subject, writer.SerializeToString()) + logger.info( + f" = Pushed message to nidx shard: {writer.shard}, txid: {writer.txid} seqid: {res.seq}" # noqa + ) + return res.seq + + +async def start_nidx_utility() -> NidxIndexer: + if indexing_settings.index_nidx_subject is None: + raise ValueError("nidx subject needed for nidx utility") + nidx_utility = NidxIndexer( + subject=indexing_settings.index_nidx_subject, + nats_creds=indexing_settings.index_jetstream_auth, + nats_servers=indexing_settings.index_jetstream_servers, + ) + await nidx_utility.initialize() + set_utility(Utility.NIDX, nidx_utility) + return nidx_utility + + +async def stop_nidx_utility(): + nidx_utility = get_nidx() + if nidx_utility: + clean_utility(Utility.NIDX) + await nidx_utility.finalize() + + +def get_nidx() -> NidxIndexer: + return get_utility(Utility.NIDX) diff --git a/nucliadb/src/nucliadb/ingest/app.py b/nucliadb/src/nucliadb/ingest/app.py index 1d97489a49..669ea48daa 100644 --- a/nucliadb/src/nucliadb/ingest/app.py +++ b/nucliadb/src/nucliadb/ingest/app.py @@ -29,6 +29,7 @@ from nucliadb.common.cluster.settings import settings as cluster_settings from nucliadb.common.cluster.utils import setup_cluster, teardown_cluster from nucliadb.common.context import ApplicationContext +from nucliadb.common.nidx import start_nidx_utility from nucliadb.export_import.tasks import get_exports_consumer, get_imports_consumer from nucliadb.ingest import SERVICE_NAME from nucliadb.ingest.consumer import service as consumer_service @@ -61,6 +62,9 @@ async def initialize() -> list[Callable[[], Awaitable[None]]]: if not cluster_settings.standalone_mode and indexing_settings.index_jetstream_servers is not None: await start_indexing_utility(SERVICE_NAME) + if indexing_settings.index_nidx_subject is not None: + await start_nidx_utility() + await start_audit_utility(SERVICE_NAME) finalizers = [ diff --git a/nucliadb_utils/src/nucliadb_utils/settings.py b/nucliadb_utils/src/nucliadb_utils/settings.py index 5e3c33d903..845726ca56 100644 --- a/nucliadb_utils/src/nucliadb_utils/settings.py +++ b/nucliadb_utils/src/nucliadb_utils/settings.py @@ -109,6 +109,10 @@ class StorageSettings(BaseSettings): default=None, description="If using LOCAL `file_backend` storage, directory where files should be stored", ) + local_indexing_bucket: Optional[str] = Field( + default=None, + description="If using LOCAL `file_backend` storage, subdirectory where indexing data is stored", + ) upload_token_expiration: int = Field( default=3, description="Number of days that uploaded files are kept in Nulia's processing engine", @@ -182,6 +186,7 @@ class IndexingSettings(BaseSettings): index_jetstream_servers: List[str] = [] index_jetstream_auth: Optional[str] = None index_local: bool = False + index_nidx_subject: Optional[str] = None indexing_settings = IndexingSettings() diff --git a/nucliadb_utils/src/nucliadb_utils/storages/local.py b/nucliadb_utils/src/nucliadb_utils/storages/local.py index 08621c1a72..3140b12d22 100644 --- a/nucliadb_utils/src/nucliadb_utils/storages/local.py +++ b/nucliadb_utils/src/nucliadb_utils/storages/local.py @@ -218,10 +218,11 @@ class LocalStorage(Storage): field_klass = LocalStorageField chunk_size = CHUNK_SIZE - def __init__(self, local_testing_files: str): + def __init__(self, local_testing_files: str, indexing_bucket: Optional[str] = None): self.local_testing_files = local_testing_files.rstrip("/") self.bucket_format = "ndb_{kbid}" self.source = CloudFile.LOCAL + self.indexing_bucket = indexing_bucket async def initialize(self): pass diff --git a/nucliadb_utils/src/nucliadb_utils/utilities.py b/nucliadb_utils/src/nucliadb_utils/utilities.py index 307a36d628..ad597e2a8d 100644 --- a/nucliadb_utils/src/nucliadb_utils/utilities.py +++ b/nucliadb_utils/src/nucliadb_utils/utilities.py @@ -84,6 +84,7 @@ class Utility(str, Enum): USAGE = "usage" ENDECRYPTOR = "endecryptor" PINECONE_SESSION = "pinecone_session" + NIDX = "nidx" def get_utility(ident: Union[Utility, str]): @@ -165,6 +166,7 @@ async def get_storage( localutil = LocalStorage( local_testing_files=storage_settings.local_files, + indexing_bucket=storage_settings.local_indexing_bucket, ) set_utility(Utility.STORAGE, localutil) await localutil.initialize()