Skip to content

Commit

Permalink
Use node writer response on set/delete resource (#1610)
Browse files Browse the repository at this point in the history
* Use node writer response on set/delete resource

* use custom exception
  • Loading branch information
jotare authored Nov 27, 2023
1 parent ef57279 commit bacd9c0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
11 changes: 9 additions & 2 deletions nucliadb_node/nucliadb_node/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
)


class IndexNodeError(Exception):
pass


class ShardManager:
schedule_delay_seconds = 30.0

Expand Down Expand Up @@ -307,10 +311,13 @@ async def subscription_worker(self, msg: Msg):
msg, nats_consumer_settings.nats_ack_wait * 0.66
), sm.lock:
try:
status = None
if pb.typemessage == TypeMessage.CREATION:
await self.set_resource(pb)
status = await self.set_resource(pb)
elif pb.typemessage == TypeMessage.DELETION:
await self.delete_resource(pb)
status = await self.delete_resource(pb)
if status is not None and status.status != OpStatus.Status.OK:
raise IndexNodeError(status.detail)
sm.shard_changed_event()
except AioRpcError as grpc_error:
if grpc_error.code() == StatusCode.NOT_FOUND:
Expand Down
16 changes: 13 additions & 3 deletions nucliadb_node/nucliadb_node/tests/unit/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import asyncio
import tempfile
from unittest import mock
from unittest.mock import AsyncMock, MagicMock, Mock
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import pytest
from nats.aio.client import Msg
from nucliadb_protos.nodewriter_pb2 import IndexMessage, TypeMessage
from nucliadb_protos.nodewriter_pb2 import IndexMessage, OpStatus, TypeMessage

from nucliadb_node.pull import IndexedPublisher, ShardManager, Worker
from nucliadb_node.pull import IndexedPublisher, IndexNodeError, ShardManager, Worker
from nucliadb_node.settings import settings
from nucliadb_utils import const

Expand Down Expand Up @@ -188,3 +188,13 @@ async def test_reconnected_cb(self, worker: Worker):
assert worker.nc.jetstream().subscribe.call_count == 2
finally:
await worker.finalize()

@pytest.mark.asyncio
async def test_node_writer_errors_are_managed(self, worker: Worker):
status = OpStatus()
status.status = OpStatus.Status.ERROR
status.detail = "node writer error"
with patch("nucliadb_node.pull.Worker.set_resource", return_value=status):
msg = self.get_msg(seqid=1)
with pytest.raises(IndexNodeError):
await worker.subscription_worker(msg)

1 comment on commit bacd9c0

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: bacd9c0 Previous: 6a3e3cd Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13073.552268206211 iter/sec (stddev: 2.344172206362573e-7) 6621.622688322887 iter/sec (stddev: 0.000003234538187648496) 0.51

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.