Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PGCatalog: Metrics & Fix paging #2347

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions nucliadb/src/nucliadb/ingest/orm/processor/pgcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
from nucliadb.common.maindb.driver import Transaction
from nucliadb.common.maindb.pg import PGDriver, PGTransaction
from nucliadb.common.maindb.utils import get_driver
from nucliadb_telemetry import metrics
from nucliadb_utils import const
from nucliadb_utils.utilities import has_feature

from ..resource import Resource

# Timing metric for pg_catalog write operations; the "type" label is filled in
# per-operation via observer.wrap({"type": ...}) on the update/delete functions below.
observer = metrics.Observer("pg_catalog_write", labels={"type": ""})


def _pg_transaction(txn: Transaction) -> PGTransaction:
    """Narrow a generic maindb Transaction to the PG-specific type.

    This is a typing-only cast (no runtime check): callers are expected to
    invoke it only when the driver is known to be PostgreSQL.
    """
    pg_txn = cast(PGTransaction, txn)
    return pg_txn
Expand All @@ -39,6 +42,7 @@ def pgcatalog_enabled(kbid):
)


@observer.wrap({"type": "update"})
async def pgcatalog_update(txn: Transaction, kbid: str, resource: Resource):
if not pgcatalog_enabled(kbid):
return
Expand Down Expand Up @@ -69,6 +73,7 @@ async def pgcatalog_update(txn: Transaction, kbid: str, resource: Resource):
)


@observer.wrap({"type": "delete"})
async def pgcatalog_delete(txn: Transaction, kbid: str, rid: str):
if not pgcatalog_enabled(kbid):
return
Expand Down
43 changes: 25 additions & 18 deletions nucliadb/src/nucliadb/search/api/v1/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from nucliadb.search.api.v1.utils import fastapi_query
from nucliadb.search.requesters.utils import Method, debug_nodes_info, node_query
from nucliadb.search.search.exceptions import InvalidQueryError
from nucliadb.search.search.merge import merge_results
from nucliadb.search.search.merge import fetch_resources, merge_results
from nucliadb.search.search.pgcatalog import pgcatalog_enabled, pgcatalog_search
from nucliadb.search.search.query import QueryParser
from nucliadb.search.search.utils import (
Expand Down Expand Up @@ -328,26 +328,33 @@ async def catalog(
# consistent and most up to date results
use_read_replica_nodes=False,
)

# We need to merge
search_results = await merge_results(
results,
count=item.page_size,
page=item.page_number,
kbid=kbid,
show=[ResourceProperties.BASIC],
field_type_filter=[],
extracted=[],
sort=sort,
requested_relations=pb_query.relation_subgraph,
min_score=query_parser.min_score,
highlight=False,
)
else:
result = await pgcatalog_search(query_parser)
results = [result]
item.page_number = 0
search_results = KnowledgeboxSearchResults()
search_results.fulltext = await pgcatalog_search(query_parser)
search_results.resources = await fetch_resources(
resources=[r.rid for r in search_results.fulltext.results],
kbid=kbid,
show=[ResourceProperties.BASIC],
field_type_filter=[],
extracted=[],
)
queried_nodes = []

# We need to merge
search_results = await merge_results(
results,
count=item.page_size,
page=item.page_number,
kbid=kbid,
show=[ResourceProperties.BASIC],
field_type_filter=[],
extracted=[],
sort=sort,
requested_relations=pb_query.relation_subgraph,
min_score=query_parser.min_score,
highlight=False,
)
# We don't need sentences, paragraphs or relations on the catalog
# response, so we set to None so that fastapi doesn't include them
# in the response payload
Expand Down
55 changes: 32 additions & 23 deletions nucliadb/src/nucliadb/search/search/pgcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@

from nucliadb.common.maindb.pg import PGDriver
from nucliadb.common.maindb.utils import get_driver
from nucliadb_models.labels import translate_system_to_alias_label
from nucliadb_models.metadata import ResourceProcessingStatus
from nucliadb_models.search import SortField, SortOrder
from nucliadb_protos.nodereader_pb2 import (
DocumentResult,
DocumentSearchResponse,
FacetResult,
FacetResults,
SearchResponse,
from nucliadb_models.search import (
ResourceResult,
Resources,
SortField,
SortOrder,
)
from nucliadb_telemetry import metrics
from nucliadb_utils import const
from nucliadb_utils.utilities import has_feature

Expand Down Expand Up @@ -150,17 +150,18 @@ def pgcatalog_enabled(kbid):
)


async def pgcatalog_search(query_parser: QueryParser):
@metrics.Observer("pg_catalog_search").wrap()
async def pgcatalog_search(query_parser: QueryParser) -> Resources:
# Prepare SQL query
query, query_params = _prepare_query(query_parser)

async with _pg_driver()._get_connection() as conn, conn.cursor(row_factory=dict_row) as cur:
facets: dict[str, FacetResults] = {}
facets = {}

# Faceted search
if query_parser.faceted:
tmp_facets: dict[str, list[FacetResult]] = {
translate_label(f): [] for f in query_parser.faceted
tmp_facets: dict[str, dict[str, int]] = {
translate_label(f): {} for f in query_parser.faceted
}
await cur.execute(
f"SELECT unnest(labels) AS label, COUNT(*) FROM ({query}) fc GROUP BY 1 ORDER BY 1",
Expand All @@ -171,9 +172,9 @@ async def pgcatalog_search(query_parser: QueryParser):
parent = "/".join(label.split("/")[:-1])
count = row["count"]
if parent in tmp_facets:
tmp_facets[parent].append(FacetResult(tag=label, total=count))
tmp_facets[parent][translate_system_to_alias_label(label)] = count

facets = {k: FacetResults(facetresults=v) for k, v in tmp_facets.items()}
facets = {translate_system_to_alias_label(k): v for k, v in tmp_facets.items()}

# Totals
await cur.execute(
Expand All @@ -194,14 +195,22 @@ async def pgcatalog_search(query_parser: QueryParser):
)
data = await cur.fetchall()

return SearchResponse(
document=DocumentSearchResponse(
results=[
DocumentResult(uuid=str(r["rid"]).replace("-", ""), field="/a/title") for r in data
],
facets=facets,
total=total,
page_number=query_parser.page_number,
next_page=(offset + len(data) < total),
)
return Resources(
facets=facets,
results=[
ResourceResult(
rid=str(r["rid"]).replace("-", ""),
field="title",
field_type="a",
labels=[label for label in r["labels"] if label.startswith("/l/")],
score=0,
)
for r in data
],
query=query_parser.query,
total=total,
page_number=query_parser.page_number,
page_size=query_parser.page_size,
next_page=(offset + len(data) < total),
min_score=0,
)
1 change: 1 addition & 0 deletions nucliadb/tests/nucliadb/integration/search/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,7 @@ async def test_catalog_pagination(
assert resp.status_code == 200
body = resp.json()
assert len(body["resources"]) <= page_size
assert body["fulltext"]["page_number"] == page_number
for resource_id, resource_data in body["resources"].items():
resource_created_date = datetime.fromisoformat(resource_data["created"]).timestamp()
if resource_id in resource_uuids:
Expand Down
Loading