Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vangheem committed Jan 16, 2024
1 parent 17bcab8 commit 824c4a2
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 14 deletions.
4 changes: 2 additions & 2 deletions nucliadb/nucliadb/common/http_clients/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def pull(self, partition: str) -> PullResponse:


class ProcessRequestResponseV2(pydantic.BaseModel):
payload: bytes
payload: Optional[str]
processing_id: str
kbid: Optional[str]
account_id: str
Expand Down Expand Up @@ -174,7 +174,7 @@ async def close(self):

async def pull(self, cursor: Optional[str], limit: int = 5) -> PullResponseV2:
url = self.base_url + "/pull"
params = {"limit": limit}
params: dict[str, str] = {"limit": str(limit)}
if cursor is not None:
params["cursor"] = cursor
async with self.session.get(url, headers=self.headers, params=params) as resp:
Expand Down
12 changes: 11 additions & 1 deletion nucliadb/nucliadb/ingest/consumer/pull_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,17 @@ async def _loop(self):

if len(data.results) > 0:
for result in data.results:
await self.handle_message(result.payload)
if result.payload is not None:
await self.handle_message(result.payload)
else:
logger.warning(
"Payload has expired, skipping.",
extra={
"resource_id": result.resource_id,
"kbid": result.kbid,
"processing_id": result.processing_id,
},
)
cursor = data.cursor
async with self.driver.transaction() as txn:
await txn.set(DB_TXN_KEY, cursor.encode())
Expand Down
16 changes: 10 additions & 6 deletions nucliadb/nucliadb/ingest/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
import base64
import datetime
import logging
import uuid
from collections import defaultdict
from contextlib import AsyncExitStack
Expand All @@ -36,15 +37,16 @@
from pydantic import BaseModel, Field

import nucliadb_models as models
from nucliadb.ingest.settings import settings as ingest_settings
from nucliadb_models.configuration import KBConfiguration
from nucliadb_models.resource import QueueType
from nucliadb.ingest.settings import settings as ingest_settings
from nucliadb_telemetry import metrics
from nucliadb_utils import const, logger
from nucliadb_utils.exceptions import LimitsExceededError, SendToProcessError
from nucliadb_utils.settings import nuclia_settings, storage_settings
from nucliadb_utils.storages.storage import Storage
from nucliadb_utils.utilities import Utility, get_ingest, has_feature, set_utility
from nucliadb_utils.utilities import Utility, get_ingest, set_utility

logger = logging.getLogger(__name__)

_T = TypeVar("_T")

Expand Down Expand Up @@ -73,9 +75,9 @@ class Source(SourceValue, Enum): # type: ignore


class ProcessingInfo(BaseModel):
seqid: int
seqid: Optional[int]
account_seq: Optional[int]
queue: QueueType
queue: Optional[QueueType] = None


class PushPayload(BaseModel):
Expand Down Expand Up @@ -440,7 +442,9 @@ async def send_to_process(
)

return ProcessingInfo(
seqid=seqid, account_seq=account_seq, queue=QueueType(queue_type)
seqid=seqid,
account_seq=account_seq,
queue=QueueType(queue_type) if queue_type is not None else None,
)


Expand Down
2 changes: 1 addition & 1 deletion nucliadb/nucliadb/standalone/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from nucliadb.common.cluster.utils import setup_cluster, teardown_cluster
from nucliadb.ingest.app import initialize_grpc as initialize_ingest_grpc
from nucliadb.ingest.app import initialize_pull_workers, initialize_pull_worker_v2
from nucliadb.ingest.app import initialize_pull_worker_v2, initialize_pull_workers
from nucliadb.ingest.settings import settings as ingest_settings
from nucliadb.reader.lifecycle import finalize as finalize_reader
from nucliadb.reader.lifecycle import initialize as initialize_reader
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/nucliadb/writer/api/v1/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async def finish_field_put(
toprocess: PushPayload,
partition: int,
wait_on_commit: bool,
) -> int:
) -> Optional[int]:
# Create processing message
transaction = get_transaction_utility()
processing = get_processing()
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/nucliadb/writer/api/v1/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ async def store_file_on_nuclia_db(
md5: Optional[str] = None,
item: Optional[CreateResourcePayload] = None,
wait_on_commit: bool = False,
) -> int:
) -> Optional[int]:
# File is on NucliaDB Storage at path

partitioning = get_partitioning()
Expand Down
13 changes: 11 additions & 2 deletions nucliadb/nucliadb/writer/resource/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,19 @@ def set_status_modify(basic: Basic, item: UpdateResourcePayload):


def set_processing_info(bm: BrokerMessage, processing_info: ProcessingInfo):
bm.basic.last_seqid = processing_info.seqid
"""
Processing V2 does not have this awkward processing into data field and storage
but keeping for backward compatibility.
Once V1 is removed, this code can be removed because status checking will be done
in a separate API that is not part of NucliaDB.
"""
if processing_info.seqid is not None:
bm.basic.last_seqid = processing_info.seqid
if processing_info.account_seq is not None:
bm.basic.last_account_seq = processing_info.account_seq
bm.basic.queue = bm.basic.QueueType.Value(processing_info.queue.name)
if processing_info.queue is not None:
bm.basic.queue = bm.basic.QueueType.Value(processing_info.queue.name)


def validate_classifications(paragraph: ParagraphAnnotation):
Expand Down

1 comment on commit 824c4a2

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

| Benchmark suite | Current: 824c4a2 | Previous: 5a633b0 | Ratio |
|---|---|---|---|
| nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error | 13000.529949574622 iter/sec (stddev: 1.8846980039477308e-7) | 12745.686329086004 iter/sec (stddev: 1.7317806991721728e-7) | 0.98 |

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.