Remove non pg catalog (#2638)
* Remove local maindb driver

* Remove index node catalog implementation

* Pg catalog always enabled
lferran authored Nov 15, 2024
1 parent 12db918 commit 7ca8964
Showing 4 changed files with 11 additions and 63 deletions.
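Reviewer note: since the commit message above makes the PG catalog unconditional, the catalog code below no longer checks the driver per call and simply assumes a Postgres maindb driver. A minimal sketch of moving that check to startup instead, reusing the get_driver()/PGDriver imports and isinstance test that the removed code used (the assert_pg_driver helper itself is hypothetical, not part of this commit):

from nucliadb.common.maindb.pg import PGDriver
from nucliadb.common.maindb.utils import get_driver


def assert_pg_driver() -> None:
    # Hypothetical startup guard: with the per-call pgcatalog_enabled()
    # check removed, catalog code assumes the configured maindb driver
    # is Postgres, so failing fast once at boot is one option.
    driver = get_driver()
    if not isinstance(driver, PGDriver):
        raise RuntimeError(
            f"The catalog requires the PG maindb driver, got {type(driver).__name__}"
        )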
13 changes: 1 addition & 12 deletions nucliadb/src/nucliadb/ingest/orm/processor/pgcatalog.py
@@ -21,8 +21,7 @@
 from typing import cast
 
 from nucliadb.common.maindb.driver import Transaction
-from nucliadb.common.maindb.pg import PGDriver, PGTransaction
-from nucliadb.common.maindb.utils import get_driver
+from nucliadb.common.maindb.pg import PGTransaction
 from nucliadb_telemetry import metrics
 
 from ..resource import Resource
@@ -34,15 +33,8 @@ def _pg_transaction(txn: Transaction) -> PGTransaction:
     return cast(PGTransaction, txn)
 
 
-def pgcatalog_enabled(kbid):
-    return isinstance(get_driver(), PGDriver)
-
-
 @observer.wrap({"type": "update"})
 async def pgcatalog_update(txn: Transaction, kbid: str, resource: Resource):
-    if not pgcatalog_enabled(kbid):
-        return
-
     if resource.basic is None:
         raise ValueError("Cannot index into the catalog a resource without basic metadata ")
 
@@ -76,9 +68,6 @@ async def pgcatalog_update(txn: Transaction, kbid: str, resource: Resource):
 
 @observer.wrap({"type": "delete"})
 async def pgcatalog_delete(txn: Transaction, kbid: str, rid: str):
-    if not pgcatalog_enabled(kbid):
-        return
-
     async with _pg_transaction(txn).connection.cursor() as cur:
         await cur.execute(
             "DELETE FROM catalog where kbid = %(kbid)s AND rid = %(rid)s", {"kbid": kbid, "rid": rid}
56 changes: 10 additions & 46 deletions nucliadb/src/nucliadb/search/api/v1/search.py
@@ -35,7 +35,7 @@
 from nucliadb.search.search import cache
 from nucliadb.search.search.exceptions import InvalidQueryError
 from nucliadb.search.search.merge import fetch_resources, merge_results
-from nucliadb.search.search.pgcatalog import pgcatalog_enabled, pgcatalog_search
+from nucliadb.search.search.pgcatalog import pgcatalog_search
 from nucliadb.search.search.query import QueryParser
 from nucliadb.search.search.utils import (
     filter_hidden_resources,
@@ -322,51 +322,15 @@ async def catalog(
             range_modification_end=item.range_modification_end,
             hidden=item.hidden,
         )
-        pb_query, _, _ = await query_parser.parse()
-
-        if not pgcatalog_enabled(kbid):
-            (results, _, queried_nodes) = await node_query(
-                kbid,
-                Method.SEARCH,
-                pb_query,
-                target_shard_replicas=item.shards,
-                # Catalog should not go to read replicas because we want it to be
-                # consistent and most up to date results
-                use_read_replica_nodes=False,
-            )
-
-            # We need to merge
-            search_results = await merge_results(
-                results,
-                count=item.page_size,
-                page=item.page_number,
-                kbid=kbid,
-                show=[ResourceProperties.BASIC, ResourceProperties.ERRORS],
-                field_type_filter=list(FieldTypeName),
-                extracted=[],
-                sort=sort,
-                requested_relations=pb_query.relation_subgraph,
-                min_score=query_parser.min_score,
-                highlight=False,
-            )
-            catalog_results = CatalogResponse(
-                resources=search_results.resources,
-                fulltext=search_results.fulltext,
-            )
-        else:
-            catalog_results = CatalogResponse()
-            catalog_results.fulltext = await pgcatalog_search(query_parser)
-            catalog_results.resources = await fetch_resources(
-                resources=[r.rid for r in catalog_results.fulltext.results],
-                kbid=kbid,
-                show=[ResourceProperties.BASIC, ResourceProperties.ERRORS],
-                field_type_filter=list(FieldTypeName),
-                extracted=[],
-            )
-            queried_nodes = []
-        if len(queried_nodes) > 0:
-            queried_shards = [shard_id for _, shard_id in queried_nodes]
-            catalog_results.shards = queried_shards
+        catalog_results = CatalogResponse()
+        catalog_results.fulltext = await pgcatalog_search(query_parser)
+        catalog_results.resources = await fetch_resources(
+            resources=[r.rid for r in catalog_results.fulltext.results],
+            kbid=kbid,
+            show=[ResourceProperties.BASIC, ResourceProperties.ERRORS],
+            field_type_filter=list(FieldTypeName),
+            extracted=[],
+        )
         return catalog_results
     except InvalidQueryError as exc:
         return HTTPClientError(status_code=412, detail=str(exc))
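After this hunk, every catalog request is answered by pgcatalog_search against Postgres; the node_query/merge_results fallback and the queried-shards bookkeeping are gone. A hedged request sketch against a local standalone instance (base URL, port, auth header value and query parameters are assumptions, not taken from this diff):

import httpx

# Illustrative only: kbid, host/port and the roles header are placeholders.
resp = httpx.get(
    "http://localhost:8080/api/v1/kb/my-kbid/catalog",
    params={"query": "", "page_number": 0, "page_size": 20},
    headers={"X-NUCLIADB-ROLES": "READER"},
)
resp.raise_for_status()
body = resp.json()
# CatalogResponse exposes `resources` and `fulltext`, as set above.
print(len(body["resources"]), body["fulltext"])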
4 changes: 0 additions & 4 deletions nucliadb/src/nucliadb/search/search/pgcatalog.py
@@ -146,10 +146,6 @@ def _pg_driver() -> PGDriver:
     return cast(PGDriver, get_driver())
 
 
-def pgcatalog_enabled(kbid):
-    return isinstance(get_driver(), PGDriver)
-
-
 @observer.wrap({"op": "search"})
 async def pgcatalog_search(query_parser: QueryParser) -> Resources:
     # Prepare SQL query
1 change: 0 additions & 1 deletion nucliadb_sdk/src/nucliadb_sdk/tests/fixtures.py
@@ -150,7 +150,6 @@ def nucliadb(pg):
     images.settings["nucliadb"]["env"]["DRIVER_PG_URL"] = images.settings["nucliadb"]["env"][
         "DRIVER_PG_URL"
     ].replace("localhost", get_docker_internal_host())
-    print("NucliaDB running on", images.settings["nucliadb"]["env"]["DRIVER_PG_URL"])
     container = NucliaDB()
     host, port = container.run()
     network = container.container_obj.attrs["NetworkSettings"]
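Only the debug print is dropped here; the fixture still rewrites DRIVER_PG_URL so the containerized NucliaDB reaches the test Postgres, which the always-on catalog now requires. As a hedged aside, that same database can be inspected directly with psycopg, relying only on the catalog table and kbid column seen in the DELETE statement earlier in this commit (connection URL and helper name are illustrative):

import psycopg


async def count_catalog_rows(pg_url: str, kbid: str) -> int:
    # Illustrative helper: count catalog entries for one knowledge box.
    async with await psycopg.AsyncConnection.connect(pg_url) as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "SELECT COUNT(*) FROM catalog WHERE kbid = %(kbid)s",
                {"kbid": kbid},
            )
            row = await cur.fetchone()
            return row[0] if row else 0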