Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index creation (GSI-327) #13

Merged
merged 14 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ We recommend using the provided Docker container.

A pre-built version is available at [docker hub](https://hub.docker.com/repository/docker/ghga/mass):
```bash
docker pull ghga/mass:0.2.0
docker pull ghga/mass:0.2.1
```

Or you can build the container yourself from the [`./Dockerfile`](./Dockerfile):
```bash
# Execute in the repo's root dir:
docker build -t ghga/mass:0.2.0 .
docker build -t ghga/mass:0.2.1 .
```

For production-ready deployment, we recommend using Kubernetes, however,
for simple use cases, you could execute the service using docker
on a single server:
```bash
# The entrypoint is preconfigured:
docker run -p 8080:8080 ghga/mass:0.2.0 --help
docker run -p 8080:8080 ghga/mass:0.2.1 --help
```

If you prefer not to use containers, you may install the service from source:
Expand Down
2 changes: 1 addition & 1 deletion mass/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

"""A service for searching metadata artifacts and filtering results."""

__version__ = "0.2.0"
__version__ = "0.2.1"
4 changes: 4 additions & 0 deletions mass/adapters/outbound/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ async def aggregate(
skip: int = 0,
limit: Optional[int] = None,
) -> JsonObject:
# don't carry out aggregation if the collection is empty
if not await self._collection.find_one():
return models.QueryResults().dict()

# build the aggregation pipeline
pipeline = utils.build_pipeline(
query=query,
Expand Down
41 changes: 38 additions & 3 deletions mass/adapters/outbound/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

"""Contains the ResourceDaoCollection, which houses a DAO for each resource class"""
from hexkit.protocols.dao import DaoFactoryProtocol
from pymongo import TEXT, MongoClient

from mass.config import SearchableClassesConfig
from mass.config import Config
from mass.core import models
from mass.ports.outbound.dao import DaoCollectionPort, ResourceDao

Expand All @@ -36,7 +37,7 @@ class DaoCollection(DaoCollectionPort):
async def construct(
cls,
dao_factory: DaoFactoryProtocol,
config: SearchableClassesConfig,
config: Config,
):
"""Initialize the DAO collection with one DAO for each resource class"""
resource_daos: dict[str, ResourceDao] = {}
Expand All @@ -45,14 +46,17 @@ async def construct(
name=name, dto_model=models.Resource, id_field="id_"
)

return cls(resource_daos=resource_daos)
return cls(config=config, resource_daos=resource_daos)

def __init__(
    self,
    config: Config,
    resource_daos: dict[str, ResourceDao],
):
    """Initialize the collection of DAOs.

    Args:
        config: service configuration (supplies the DB connection string,
            DB name, and the set of searchable classes).
        resource_daos: mapping from resource class name to its DAO.
    """
    self._config = config
    self._resource_daos = resource_daos
    # flipped to True once create_collections_and_indexes_if_needed has run
    self._indexes_created = False

def get_dao(self, *, class_name: str) -> ResourceDao:
"""returns a dao for the given resource class name
Expand All @@ -64,3 +68,34 @@ def get_dao(self, *, class_name: str) -> ResourceDao:
return self._resource_daos[class_name]
except KeyError as err:
raise DaoNotFoundError(class_name=class_name) from err

def create_collections_and_indexes_if_needed(self) -> None:
    """Create MongoDB collections and wildcard text indexes where missing.

    For each configured searchable class, ensures a collection of the same
    name exists and carries a ``$**`` (wildcard) text index, which is needed
    for query-string searches. Idempotent: guarded by ``self._indexes_created``
    so the work happens at most once per instance, and the client is always
    closed even if index creation fails.
    """
    # This only needs to be done once, so exit if we've already created the indexes
    if self._indexes_created:
        return

    # connect directly with pymongo (the DAO layer does not expose index creation)
    client: MongoClient = MongoClient(
        self._config.db_connection_str.get_secret_value()
    )
    try:
        db = client[self._config.db_name]

        existing_collections = set(db.list_collection_names())

        # loop through configured classes (i.e. the expected collection names)
        for expected_collection_name in self._config.searchable_classes:
            if expected_collection_name not in existing_collections:
                db.create_collection(expected_collection_name)
            collection = db[expected_collection_name]

            # see if the wildcard text index exists and add it if not
            wildcard_text_index_exists = any(
                index["name"] == f"$**_{TEXT}"
                for index in collection.list_indexes()
            )

            if not wildcard_text_index_exists:
                collection.create_index([("$**", TEXT)])
    finally:
        # close the client even if index creation raised (original leaked it)
        client.close()

    # remember that the indexes have been set up (only reached on success)
    self._indexes_created = True
3 changes: 3 additions & 0 deletions mass/core/query_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ async def load_resource(self, *, resource: models.Resource, class_name: str):
if class_name not in self._config.searchable_classes:
raise self.ClassNotConfiguredError(class_name=class_name)

self._dao_collection.create_collections_and_indexes_if_needed()

dao = self._dao_collection.get_dao(class_name=class_name)

await dao.upsert(resource)

async def delete_resource(self, *, resource_id: str, class_name: str):
Expand Down
4 changes: 2 additions & 2 deletions mass/ports/inbound/query_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ async def handle_query(
class_name: str,
query: str,
filters: list[models.Filter],
skip: int,
skip: int = 0,
limit: Optional[int] = None,
):
) -> models.QueryResults:
"""Processes a query

Raises:
Expand Down
10 changes: 10 additions & 0 deletions mass/ports/outbound/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ def get_dao(self, *, class_name: str) -> ResourceDao:
A DAO for the specified resource (ResourceDaoPort)
"""
...

def create_collections_and_indexes_if_needed(self) -> None:
    """Creates `MongoDB` collections and indexes.

    Creates collections for all configured classes in `searchable_classes` if they don't
    already exist. At the same time, it will also create the text index if it doesn't
    already exist. This is primarily needed because the text index has to exist in order
    to perform query string searches.

    Port method: concrete adapters (e.g. the DAO collection) provide the
    implementation; this protocol stub intentionally has no body.
    """
    ...
8 changes: 4 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@
Since we're using session-scoped fixtures, declare everything in here.
"""

import pytest
import pytest_asyncio
from hexkit.providers.akafka.testutils import get_kafka_fixture
from hexkit.providers.mongodb.testutils import get_mongodb_fixture
from hexkit.providers.testing.utils import get_event_loop

from tests.fixtures.joint import JointFixture, get_joint_fixture


@pytest.fixture(autouse=True)
def reset_state(joint_fixture: JointFixture): # noqa: F811
@pytest_asyncio.fixture(autouse=True)
async def reset_state(joint_fixture: JointFixture):  # noqa: F811
    """Clear joint_fixture state before tests.

    This is a function-level fixture because it needs to run in each test.
    Autouse: wipes all DB collections, then reloads the canned test data
    through the query handler (async, hence the pytest_asyncio fixture).
    """
    joint_fixture.remove_db_data()
    await joint_fixture.load_test_data()


event_loop = get_event_loop("session")
Expand Down
17 changes: 9 additions & 8 deletions tests/fixtures/joint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from hexkit.custom_types import JsonObject
from hexkit.providers.akafka.testutils import KafkaFixture
from hexkit.providers.mongodb.testutils import MongoDbFixture # noqa: F401
from pymongo import TEXT
from pytest_asyncio.plugin import _ScopeName

from mass.config import Config
Expand All @@ -52,20 +51,22 @@ def remove_db_data(self) -> None:
"""Delete everything in the database to start from a clean slate"""
self.mongodb.empty_collections()

def load_test_data(self) -> None:
async def load_test_data(self) -> None:
    """Populate a collection for each file in test_data"""
    name_regex = re.compile(r"/(\w+)\.json")
    query_handler = await self.container.query_handler()
    # force index (re)creation on the next resource load
    query_handler._dao_collection._indexes_created = (
        False  # pylint: disable=protected-access
    )
    for path in glob.glob("tests/fixtures/test_data/*.json"):
        match = name_regex.search(path)
        if match is None:
            continue
        class_name = match.group(1)
        for resource in get_resources_from_file(path):
            await query_handler.load_resource(
                resource=resource, class_name=class_name
            )

async def call_search_endpoint(
self, search_parameters: JsonObject
Expand Down
5 changes: 5 additions & 0 deletions tests/fixtures/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ searchable_classes:
name: Field 1
- key: "has_object.type"
name: Object Type
EmptyCollection:
description: An empty collection to test the index creation.
facetable_properties:
- key: fun_fact
name: Fun Fact
resource_change_event_topic: searchable_resources
resource_deletion_event_type: searchable_resource_deleted
resource_upsertion_event_type: searchable_resource_upserted
Expand Down
6 changes: 3 additions & 3 deletions tests/fixtures/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ def dto_to_document(dto: BaseModel):
return document


def get_resources_from_file(filename: str):
def get_resources_from_file(filename: str) -> list[Resource]:
    """Utility function to load resources from a file"""
    with open(filename, "r", encoding="utf-8") as file:
        json_object = json.loads(file.read())
    # each item's "id_" key becomes the resource id; the rest is its content
    return [
        Resource(id_=item.pop("id_"), content=item)
        for item in json_object["items"]
    ]
103 changes: 103 additions & 0 deletions tests/test_index_creation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Test index creation"""
import pytest
from pymongo import TEXT

from mass.core import models
from tests.fixtures.joint import JointFixture

# Searchable class configured (in test_config.yaml) with no seed data
CLASS_NAME = "EmptyCollection"
# Single resource loaded during the test to trigger collection/index creation
RESOURCE = models.Resource(
    id_="test",
    content={"fun_fact": "The original name for the search engine Google was Backrub."},
)
# Query term that appears in RESOURCE's content
QUERY_STRING = "Backrub"


@pytest.mark.parametrize("create_index_manually", (False, True))
@pytest.mark.asyncio
async def test_index_creation(joint_fixture: JointFixture, create_index_manually: bool):
    """Test the index creation function.

    Runs twice: once letting the lazy index-creation logic build the wildcard
    text index, and once creating the index manually first to check that the
    creation logic tolerates a pre-existing index.
    """
    # indexes will have been created in fixture setup, so we actually need to del those
    joint_fixture.remove_db_data()

    # verify collection does not exist
    database = joint_fixture.mongodb.client[joint_fixture.config.db_name]
    assert CLASS_NAME not in database.list_collection_names()

    query_handler = await joint_fixture.container.query_handler()

    # reset the flag so it actually runs the indexing function
    query_handler._dao_collection._indexes_created = (
        False  # pylint: disable=protected-access
    )

    # make sure we do not get an error when trying to query non-existent collection
    results_without_coll = await query_handler.handle_query(
        class_name=CLASS_NAME,
        query=QUERY_STRING,
        filters=[],
    )
    # should have received empty results model
    assert results_without_coll == models.QueryResults()

    # create collection without index
    joint_fixture.mongodb.client[joint_fixture.config.db_name].create_collection(
        CLASS_NAME
    )

    # verify collection exists
    assert CLASS_NAME in database.list_collection_names()

    collection = database[CLASS_NAME]

    # verify collection does not have the text index
    assert not any(
        index["name"] == f"$**_{TEXT}" for index in collection.list_indexes()
    )

    # Verify querying empty collection with query string gives empty results model
    results_without_coll = await query_handler.handle_query(
        class_name=CLASS_NAME,
        query=QUERY_STRING,
        filters=[],
    )
    assert results_without_coll == models.QueryResults()

    if create_index_manually:
        # check that the index creation function works when an index is already present
        collection.create_index([("$**", TEXT)])
        assert any(
            index["name"] == f"$**_{TEXT}" for index in collection.list_indexes()
        )

    # load a resource (this triggers the lazy collection/index creation)
    await query_handler.load_resource(resource=RESOURCE, class_name=CLASS_NAME)

    # verify the text index exists now
    assert any(index["name"] == f"$**_{TEXT}" for index in collection.list_indexes())

    # verify that supplying a query string doesn't result in an error
    results_with_coll = await query_handler.handle_query(
        class_name=CLASS_NAME,
        query=QUERY_STRING,
        filters=[],
    )

    assert results_with_coll.count == 1
    assert results_with_coll.hits[0] == RESOURCE