Skip to content

Commit

Permalink
Add non-functional consumer and basic test (GSI-7) (#2)
Browse files Browse the repository at this point in the history
* Add consumer with no functionality

* Add joint_fixture and basic test for consumer

* Remove hexkit[dev] from extras_require

* Rename email_notification to notification

---------

Co-authored-by: Byron Himes <[email protected]>
  • Loading branch information
TheByronHimes and TheByronHimes authored Apr 13, 2023
1 parent 59f63ed commit eeaccb6
Show file tree
Hide file tree
Showing 12 changed files with 293 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .devcontainer/.dev_config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Please only mention the non-default settings here:

notification_event_topic: "notifications"
notification_event_type: "notification"
service_instance_id: 001
kafka_servers: ["kafka:9092"]
20 changes: 20 additions & 0 deletions config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,24 @@
"description": "Modifies the orginal Settings class provided by the user",
"type": "object",
"properties": {
"notification_event_topic": {
"title": "Notification Event Topic",
"description": "Name of the event topic used to track notification events",
"example": "notifications",
"env_names": [
"ns_notification_event_topic"
],
"type": "string"
},
"notification_event_type": {
"title": "Notification Event Type",
"description": "The type to use for events containing content to be sent",
"example": "notification",
"env_names": [
"ns_notification_event_type"
],
"type": "string"
},
"service_name": {
"title": "Service Name",
"default": "ns",
Expand Down Expand Up @@ -36,6 +54,8 @@
}
},
"required": [
"notification_event_topic",
"notification_event_type",
"service_instance_id",
"kafka_servers"
],
Expand Down
2 changes: 2 additions & 0 deletions example_config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
kafka_servers:
- kafka:9092
notification_event_topic: notifications
notification_event_type: notification
service_instance_id: '1'
service_name: ns
6 changes: 5 additions & 1 deletion ns/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
# limitations under the License.

"""Entrypoint of the package"""
import asyncio

from ns.main import consume_events

def run():

def run(run_forever: bool = True):
"""Run the service"""
asyncio.run(consume_events(run_forever=run_forever))


if __name__ == "__main__":
Expand Down
57 changes: 57 additions & 0 deletions ns/adapters/inbound/akafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Event subscriber details for notification events"""
from hexkit.custom_types import Ascii, JsonObject
from hexkit.protocols.eventsub import EventSubscriberProtocol
from pydantic import BaseSettings, Field


class EventSubTranslatorConfig(BaseSettings):
"""Config for the event subscriber"""

notification_event_topic: str = Field(
...,
description="Name of the event topic used to track notification events",
example="notifications",
)
notification_event_type: str = Field(
...,
description="The type to use for events containing content to be sent",
example="notification",
)


class EventSubTranslator(EventSubscriberProtocol):
"""A translator that can consume Notification events"""

def __init__(self, *, config: EventSubTranslatorConfig):
self.topics_of_interest = [config.notification_event_topic]
self.types_of_interest = [config.notification_event_type]
self._config = config

async def _send_notification(self, *, payload: JsonObject):
"""Validates the schema, then makes a call to the notifier with the payload"""
raise NotImplementedError()

async def _consume_validated(
self, *, payload: JsonObject, type_: Ascii, topic: Ascii
) -> None:
"""Consumes an event"""
if (
type_ == self._config.notification_event_type
and topic == self._config.notification_event_topic
):
await self._send_notification(payload=payload)
4 changes: 3 additions & 1 deletion ns/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from hexkit.config import config_from_yaml
from hexkit.providers.akafka import KafkaConfig

from ns.adapters.inbound.akafka import EventSubTranslatorConfig


@config_from_yaml(prefix="ns")
class Config(KafkaConfig):
class Config(KafkaConfig, EventSubTranslatorConfig):
"""Config parameters and their defaults."""

service_name: str = "ns"
Expand Down
35 changes: 35 additions & 0 deletions ns/container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Contains dependency injection setup"""
from hexkit.inject import ContainerBase, get_configurator, get_constructor
from hexkit.providers.akafka.provider import KafkaEventSubscriber

from ns.adapters.inbound.akafka import EventSubTranslator
from ns.config import Config


class Container(ContainerBase):
"""Dependency-Injection Container"""

config = get_configurator(Config)

# inbound translators
event_sub_translator = get_constructor(EventSubTranslator, config=config)

# inbound providers
kafka_event_subscriber = get_constructor(
KafkaEventSubscriber, config=config, translator=event_sub_translator
)
37 changes: 37 additions & 0 deletions ns/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Top-level DI Container and consumer creation, entry point is consume_events()"""

from ns.config import Config
from ns.container import Container


def get_configured_container(*, config: Config) -> Container:
"""Create and configure a DI container."""

container = Container()
container.config.load_config(config)

return container


async def consume_events(run_forever: bool = True):
"""Start consuming events with kafka"""
config = Config()

async with get_configured_container(config=config) as container:
event_consumer = await container.kafka_event_subscriber()
await event_consumer.run(forever=run_forever)
44 changes: 44 additions & 0 deletions tests/fixtures/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Test config"""

from pathlib import Path
from typing import Dict, List, Optional

from pydantic.env_settings import BaseSettings

from ns.config import Config
from tests.fixtures.utils import BASE_DIR

TEST_CONFIG_YAML = BASE_DIR / "test_config.yaml"


def get_config(
sources: Optional[List[BaseSettings]] = None,
default_config_yaml: Path = TEST_CONFIG_YAML,
) -> Config:
"""Merges parameters from the default TEST_CONFIG_YAML with params inferred
from testcontainers."""
sources_dict: Dict[str, object] = {}

if sources is not None:
for source in sources:
sources_dict.update(**source.dict())

return Config(config_yaml=default_config_yaml, **sources_dict) # type: ignore


DEFAULT_CONFIG = get_config()
49 changes: 49 additions & 0 deletions tests/fixtures/joint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from dataclasses import dataclass
from typing import AsyncGenerator

import pytest_asyncio
from hexkit.providers.akafka.testutils import KafkaFixture, kafka_fixture # noqa: F401

from ns.config import Config
from ns.container import Container
from ns.main import get_configured_container
from tests.fixtures.config import get_config


@dataclass
class JointFixture:
"""returned by joint_fixture"""

config: Config
kafka: KafkaFixture
container: Container


@pytest_asyncio.fixture
async def joint_fixture(
kafka_fixture: KafkaFixture, # noqa: F811
) -> AsyncGenerator[JointFixture, None]:
"""A fixture that embeds all other fixtures for API-level integration testing"""

# merge configs from different sources with the default one:
config = get_config(sources=[kafka_fixture.config])

# create a DI container instance:translators
async with get_configured_container(config=config) as container:
yield JointFixture(config=config, container=container, kafka=kafka_fixture)
11 changes: 5 additions & 6 deletions tests/test_dummy.py → tests/fixtures/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Test dummy."""


def test_dummy():
"""Just makes the CI pass."""
assert True
notification_event_topic: "notifications"
notification_event_type: "notification"
service_instance_id: 001
kafka_servers: ["kafka:9092"]
34 changes: 34 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test dummy."""
import pytest
from hexkit.providers.akafka.testutils import kafka_fixture # noqa: F401

from tests.fixtures.joint import JointFixture, joint_fixture # noqa: F401


@pytest.mark.asyncio
async def test_basic_consume(joint_fixture: JointFixture): # noqa: F811
"""Verify that the consumer runs the dummy _send_email() and raises the error."""
await joint_fixture.kafka.publish_event(
payload={"key": "value"},
type_=joint_fixture.config.notification_event_type,
topic=joint_fixture.config.notification_event_topic,
)

event_subscriber = await joint_fixture.container.kafka_event_subscriber()
with pytest.raises(NotImplementedError):
await event_subscriber.run(forever=False)

0 comments on commit eeaccb6

Please sign in to comment.