Commit f98a562: nidx dev deployment (#2563)
javitonino authored Oct 22, 2024
1 parent 5cac6a3 commit f98a562
Showing 13 changed files with 301 additions and 42 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/deploy.yml
@@ -230,6 +230,7 @@ jobs:
  upload-chart-nidx:
    name: Upload nidx chart
    runs-on: ubuntu-latest
    if: inputs.environment == 'dev'
    needs:
      - build-nidx-image
    steps:
@@ -281,10 +282,12 @@ jobs:
  deploy-nucliadb-components:
    name: Deploy NucliaDB components
    runs-on: ubuntu-latest
    if: ${{ !failure() && !cancelled() }}
    needs:
      - upload-chart-nucliadb-shared
      - upload-charts-nucliadb-component
      - upload-chart-nucliadb-node
      - upload-chart-nidx
    steps:
      - name: Generate a token
        id: app-token
@@ -305,6 +308,11 @@ jobs:
          {
            "commit-sha": "${{ github.sha }}",
            "components": [
              ${{ needs.upload-chart-nidx.result == 'skipped' && ' ' || format('{{
              "chart-version": "{0}",
              "component": "nidx",
              "component-type": "regional"
              }},', needs.upload-charts-nucliadb-component.outputs.version_number) }}
              {
                "chart-version": "${{ needs.upload-charts-nucliadb-component.outputs.version_number }}",
                "component": "nucliadb_shared",
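Note on the `components` expression above: GitHub Actions expressions have no ternary operator, so `cond && a || b` is the usual stand-in, and `format()` builds the JSON fragment (literal braces are escaped by doubling them, hence the `{{`). When `upload-chart-nidx` is skipped (any environment but dev), the expression collapses to a blank placeholder and the nidx entry simply drops out of the payload. A minimal Python sketch of the same branching logic (the function and its inputs are hypothetical, for illustration only):

    import json

    def components_payload(nidx_skipped: bool, version: str) -> str:
        # Mirrors the workflow expression: emit the nidx entry only
        # when the chart-upload job actually ran.
        components = []
        if not nidx_skipped:  # the `result == 'skipped' && ' ' || format(...)` ternary
            components.append(
                {"chart-version": version, "component": "nidx", "component-type": "regional"}
            )
        components.append({"chart-version": version, "component": "nucliadb_shared"})
        return json.dumps({"components": components}, indent=2)

    print(components_payload(nidx_skipped=False, version="1.2.3"))

The `if: ${{ !failure() && !cancelled() }}` guard on deploy-nucliadb-components is the companion piece: by default a job is skipped when any of its needs was skipped, and this condition lets the deploy run anyway, since upload-chart-nidx only runs in dev.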
1 change: 1 addition & 0 deletions Dockerfile.nidx
@@ -10,4 +10,5 @@ COPY nidx /app/nidx
RUN cd /app/nidx && cargo build --locked --release

FROM debian:bookworm-slim AS nidx
RUN apt update && apt install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/nidx/target/release/nidx /usr/local/bin
File renamed without changes.
69 changes: 69 additions & 0 deletions charts/nidx/templates/scheduler-deploy.yaml
@@ -0,0 +1,69 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nidx-scheduler
  labels:
    app: nidx-scheduler
    version: "{{ .Chart.Version | replace "+" "_" }}"
    chart: "{{ .Chart.Name }}"
    release: "{{ .Release.Name }}"
    heritage: "{{ .Release.Service }}"
spec:
  # Only 1 scheduler max running at a time, even during deployment (stop before starting).
  replicas: 1
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 0
  selector:
    matchLabels:
      app: nidx-scheduler
      release: "{{ .Release.Name }}"
      heritage: "{{ .Release.Service }}"
  template:
    metadata:
      name: nidx-scheduler
      annotations:
        {{- with .Values.excludeOutboundPorts }}
        traffic.sidecar.istio.io/excludeOutboundPorts: "{{ . }}"
        {{- end }}
        {{- with .Values.excludeInboundPorts }}
        traffic.sidecar.istio.io/excludeInboundPorts: "{{ . }}"
        {{- end }}
      labels:
        app: nidx-scheduler
        version: "{{ .Chart.Version | replace "+" "_" }}"
        chart: "{{ .Chart.Name }}"
        release: "{{ .Release.Name }}"
        heritage: "{{ .Release.Service }}"
    spec:
      {{- with .Values.scheduler.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.scheduler.topologySpreadConstraints }}
      topologySpreadConstraints:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.scheduler.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.scheduler.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      dnsPolicy: ClusterFirst
      serviceAccount: {{ .Values.scheduler.serviceAccount | default "default" }}
      containers:
      - name: nidx-scheduler
        image: "{{ .Values.containerRegistry }}/{{ .Values.image }}"
        command: ["nidx", "scheduler"]
        env:
          {{- include "toEnv" .Values.env | nindent 10 }}
        ports:
        {{- with .Values.scheduler.resources }}
        resources:
          {{- toYaml . | nindent 10 }}
        {{- end }}
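The `strategy` block is what enforces the comment's single-scheduler guarantee: `maxSurge: 0` forbids creating the replacement pod while the old one is still alive, and `maxUnavailable: 1` allows the rollout to drop to zero pods during the swap. The resulting bounds, spelled out (illustrative arithmetic only):

    replicas, max_surge, max_unavailable = 1, 0, 1

    # Upper bound during a rollout: old + new pods never exceed this,
    # so two schedulers can never run concurrently.
    max_pods = replicas + max_surge  # 1

    # Lower bound: the rollout may stop this many pods before starting
    # replacements, accepting a brief gap (stop before starting).
    min_pods = replicas - max_unavailable  # 0

    print(max_pods, min_pods)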
63 changes: 63 additions & 0 deletions charts/nidx/templates/worker-deploy.yaml
@@ -0,0 +1,63 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nidx-worker
  labels:
    app: nidx-worker
    version: "{{ .Chart.Version | replace "+" "_" }}"
    chart: "{{ .Chart.Name }}"
    release: "{{ .Release.Name }}"
    heritage: "{{ .Release.Service }}"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nidx-worker
      release: "{{ .Release.Name }}"
      heritage: "{{ .Release.Service }}"
  template:
    metadata:
      name: nidx-worker
      annotations:
        {{- with .Values.excludeOutboundPorts }}
        traffic.sidecar.istio.io/excludeOutboundPorts: "{{ . }}"
        {{- end }}
        {{- with .Values.excludeInboundPorts }}
        traffic.sidecar.istio.io/excludeInboundPorts: "{{ . }}"
        {{- end }}
      labels:
        app: nidx-worker
        version: "{{ .Chart.Version | replace "+" "_" }}"
        chart: "{{ .Chart.Name }}"
        release: "{{ .Release.Name }}"
        heritage: "{{ .Release.Service }}"
    spec:
      {{- with .Values.worker.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.worker.topologySpreadConstraints }}
      topologySpreadConstraints:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.worker.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.worker.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      dnsPolicy: ClusterFirst
      serviceAccount: {{ .Values.worker.serviceAccount | default "default" }}
      containers:
      - name: nidx-worker
        image: "{{ .Values.containerRegistry }}/{{ .Values.image }}"
        command: ["nidx", "worker"]
        env:
          {{- include "toEnv" .Values.env | nindent 10 }}
        ports:
        {{- with .Values.worker.resources }}
        resources:
          {{- toYaml . | nindent 10 }}
        {{- end }}
2 changes: 2 additions & 0 deletions charts/nidx/values.yaml
@@ -9,3 +9,5 @@ image: IMAGE_TO_REPLACE

env: {}
indexer: {}
scheduler: {}
worker: {}
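The empty `scheduler: {}` and `worker: {}` maps are load-bearing, not cosmetic: the new templates dereference paths such as `.Values.scheduler.nodeSelector`, and a Helm render fails with a nil-pointer error when the intermediate key is missing entirely, while a lookup through an empty map quietly yields nothing and the `with` blocks are skipped. Roughly the same distinction in Python terms (an illustrative sketch, not Helm code):

    values = {"env": {}, "indexer": {}, "scheduler": {}, "worker": {}}

    # Intermediate key present as an empty map: the nested lookup degrades
    # gracefully to None and the template block is simply skipped.
    print(values["scheduler"].get("nodeSelector"))  # None

    # Intermediate key absent: the lookup itself fails, much like Helm's
    # "nil pointer evaluating interface {}.nodeSelector" render error.
    try:
        print(values["missing_section"].get("nodeSelector"))
    except KeyError as exc:
        print(f"render would fail: missing {exc}")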
33 changes: 23 additions & 10 deletions nidx/src/indexer.rs
@@ -21,7 +21,6 @@ use crate::upload::pack_and_upload;
use crate::{metadata::*, Settings};
use anyhow::anyhow;
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::Message;
use futures::stream::StreamExt;
use nidx_protos::prost::*;
use nidx_protos::IndexMessage;
@@ -63,18 +62,23 @@ pub async fn run() -> anyhow::Result<()> {
                let _ = msg.ack().await;
                continue;
            }
            info!(?seq, "Processing indexing message");

            let (msg, acker) = msg.split();

            let resource = match download_message(indexer_storage.clone(), msg).await {
            let Ok(index_message) = IndexMessage::decode(msg.payload) else {
                warn!("Error decoding index message");
                continue;
            };

            let resource = match download_message(indexer_storage.clone(), &index_message.storage_key).await {
                Ok(r) => r,
                Err(e) => {
                    warn!("Error downloading index message {e:?}");
                    continue;
                }
            };

            match index_resource(&meta, segment_storage.clone(), &resource, seq).await {
            match index_resource(&meta, segment_storage.clone(), &index_message.shard, &resource, seq).await {
                Ok(()) => {
                    if let Err(e) = acker.ack().await {
                        warn!("Error ack'ing NATS message {e:?}")
@@ -89,10 +93,8 @@ pub async fn run() -> anyhow::Result<()> {
    Ok(())
}

pub async fn download_message(storage: Arc<DynObjectStore>, msg: Message) -> anyhow::Result<Resource> {
    let index_message = IndexMessage::decode(msg.payload)?;

    let get_result = storage.get(&object_store::path::Path::from(index_message.storage_key)).await?;
pub async fn download_message(storage: Arc<DynObjectStore>, storage_key: &str) -> anyhow::Result<Resource> {
    let get_result = storage.get(&object_store::path::Path::from(storage_key)).await?;
    let bytes = get_result.bytes().await?;
    let resource = Resource::decode(bytes)?;

@@ -102,12 +104,15 @@ async fn index_resource(
async fn index_resource(
    meta: &NidxMetadata,
    storage: Arc<DynObjectStore>,
    shard_id: &str,
    resource: &Resource,
    seq: Seq,
) -> anyhow::Result<()> {
    let shard_id = Uuid::parse_str(&resource.shard_id)?;
    let shard_id = Uuid::parse_str(shard_id)?;
    let indexes = Index::for_shard(&meta.pool, shard_id).await?;

    info!(?seq, ?shard_id, "Indexing message to shard");

    for index in indexes {
        let output_dir = tempfile::tempdir()?;
        let (records, deletions) = index_resource_to_index(&index, resource, output_dir.path()).await?;
@@ -160,7 +165,15 @@ mod tests {
        let index = Index::create(&meta.pool, shard.id, IndexKind::Vector, Some("multilingual")).await.unwrap();

        let storage = Arc::new(object_store::memory::InMemory::new());
        index_resource(&meta, storage.clone(), &little_prince(shard.id.to_string()), 123i64.into()).await.unwrap();
        index_resource(
            &meta,
            storage.clone(),
            &shard.id.to_string(),
            &little_prince(shard.id.to_string()),
            123i64.into(),
        )
        .await
        .unwrap();

        let segments = index.segments(&meta.pool).await.unwrap();
        assert_eq!(segments.len(), 1);
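The indexer.rs refactor separates concerns that were previously tangled inside `download_message`: the NATS payload is decoded up front in the consumer loop (so a malformed message is logged and skipped rather than failing mid-download), `download_message` fetches and decodes the resource from object storage by its `storage_key` alone, and the target shard now comes from the `IndexMessage` instead of a field inside the resource body. A toy Python model of the resulting flow (hypothetical types and stubs, mirroring the Rust above):

    import asyncio
    from dataclasses import dataclass

    @dataclass
    class IndexMessage:
        storage_key: str
        shard: str

    async def download_message(storage: dict, storage_key: str) -> bytes:
        # Stand-in for the object-store fetch: the resource is looked up
        # by key only; no queue-message details leak in here anymore.
        return storage[storage_key]

    async def index_resource(shard: str, resource: bytes, seq: int) -> None:
        print(f"indexing seq={seq} into shard={shard}: {resource!r}")

    async def handle(msg: IndexMessage, storage: dict, seq: int) -> None:
        # The shard travels in the message itself, so routing no longer
        # requires decoding the (much larger) resource body first.
        resource = await download_message(storage, msg.storage_key)
        await index_resource(msg.shard, resource, seq)

    asyncio.run(handle(IndexMessage("kb/r1", "shard-uuid"), {"kb/r1": b"resource"}, 123))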
74 changes: 43 additions & 31 deletions nucliadb/src/nucliadb/common/cluster/manager.py
@@ -36,6 +36,7 @@
    ShardsNotFound,
)
from nucliadb.common.maindb.driver import Transaction
from nucliadb.common.nidx import get_nidx
from nucliadb_protos import (
    knowledgebox_pb2,
    nodereader_pb2,
@@ -322,6 +323,7 @@ async def delete_resource(
    ) -> None:
        indexing = get_indexing()
        storage = await get_storage()
        nidx = get_nidx()

        await storage.delete_indexing(resource_uid=uuid, txid=txid, kb=kb, logical_shard=shard.shard)

@@ -336,6 +338,17 @@ async def delete_resource(
            indexpb.kbid = kb
            await indexing.index(indexpb, node_id)

        if nidx is not None:
            nidxpb: nodewriter_pb2.IndexMessage = nodewriter_pb2.IndexMessage()
            nidxpb.node = node_id
            nidxpb.shard = shard.shard
            nidxpb.txid = txid
            nidxpb.resource = uuid
            nidxpb.typemessage = nodewriter_pb2.TypeMessage.DELETION
            nidxpb.partition = partition
            nidxpb.kbid = kb
            await nidx.index(nidxpb)

    async def add_resource(
        self,
        shard: writer_pb2.ShardObject,
@@ -356,7 +369,7 @@ async def add_resource(

        storage = await get_storage()
        indexing = get_indexing()

        nidx = get_nidx()
        indexpb = IndexMessage()

        if reindex_id is not None:
@@ -383,6 +396,10 @@ async def add_resource(
            indexpb.shard = replica_id
            await indexing.index(indexpb, node_id)

        if nidx is not None:
            indexpb.shard = shard.shard
            await nidx.index(indexpb)

    def should_create_new_shard(self, num_paragraphs: int) -> bool:
        return num_paragraphs > settings.max_shard_paragraphs

@@ -482,6 +499,17 @@ async def delete_resource(
                self._resource_change_event(kb, shardreplica.node, shardreplica.shard.id)
            )

        nidx = get_nidx()
        if nidx is not None:
            indexpb: nodewriter_pb2.IndexMessage = nodewriter_pb2.IndexMessage()
            indexpb.shard = shard.shard
            indexpb.txid = txid
            indexpb.resource = uuid
            indexpb.typemessage = nodewriter_pb2.TypeMessage.DELETION
            indexpb.partition = partition
            indexpb.kbid = kb
            await nidx.index(indexpb)

    @backoff.on_exception(backoff.expo, NodesUnsync, jitter=backoff.random_jitter, max_tries=5)
    async def add_resource(
        self,
@@ -508,38 +536,22 @@ async def add_resource(
                self._resource_change_event(kb, shardreplica.node, shardreplica.shard.id)
            )

        # HACK: send to nidx
        import os

        if os.environ.get("NIDX"):
            from nucliadb_utils.nats import NatsConnectionManager

            storage = await get_storage()
            storage.indexing_bucket = "fake"
            indexpb = IndexMessage()

            storage_key = await storage.indexing(
                resource, txid, partition, kb=kb, logical_shard=shard.shard
            )
        nidx = get_nidx()
        if nidx is not None:
            storage = await get_storage()
            indexpb = IndexMessage()
            storage_key = await storage.indexing(
                resource, txid, partition, kb=kb, logical_shard=shard.shard
            )

            indexpb.typemessage = TypeMessage.CREATION
            indexpb.storage_key = storage_key
            indexpb.kbid = kb
            if partition:
                indexpb.partition = partition
            indexpb.source = source
            indexpb.resource = resource.resource.uuid

            nats = NatsConnectionManager(
                service_name="nidx",
                nats_servers=["nats://localhost:4222"],
            )
            await nats.initialize()
            indexpb.typemessage = TypeMessage.CREATION
            indexpb.storage_key = storage_key
            indexpb.kbid = kb
            indexpb.source = source
            indexpb.resource = resource.resource.uuid
            indexpb.shard = shard.shard

            for replica_id, node_id in self.indexing_replicas(shard):
                indexpb.node = node_id
                indexpb.shard = replica_id
                await nats.js.publish("nidx", indexpb.SerializeToString())
            await nidx.index(indexpb)


def get_all_shard_nodes(
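Net effect of the manager.py changes: the `NIDX` environment-variable hack with its ad-hoc `NatsConnectionManager` is gone, replaced by a `get_nidx()` lookup that returns None when nidx is not configured, and the create and delete paths in both manager classes now dual-write. A condensed sketch of the recurring pattern (not an actual helper in the codebase; names follow the diff above):

    async def publish_index_message(self, shard, indexpb):
        # Legacy path: one message per shard replica, addressed by node id.
        indexing = get_indexing()
        for replica_id, node_id in self.indexing_replicas(shard):
            indexpb.node = node_id
            indexpb.shard = replica_id
            await indexing.index(indexpb, node_id)

        # nidx path: a single message addressed by the logical shard; nidx
        # resolves the shard's indexes itself, so no replica fan-out is needed.
        nidx = get_nidx()
        if nidx is not None:
            indexpb.shard = shard.shard
            await nidx.index(indexpb)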
(Diffs for the remaining changed files were not loaded and are omitted.)