From 79c5ddca830de0bdb5ef245138ca8a78e8825cc6 Mon Sep 17 00:00:00 2001 From: rob-parker-what <53180582+rob-parker-what@users.noreply.github.com> Date: Wed, 29 Mar 2023 08:57:39 +0100 Subject: [PATCH 1/6] bump and remove flexisettings --- event_consumer/__about__.py | 2 +- event_consumer/conf/__init__.py | 4 --- event_consumer/conf/defaults.py | 54 -------------------------------- event_consumer/conf/settings.py | 55 +++++++++++++++++++++++++++++++++ requirements-base.txt | 3 +- requirements-dev.txt | 1 - requirements-test.txt | 4 --- setup.py | 1 - test_app/factories.py | 2 +- tests/test_consumer_step.py | 23 ++++++++------ 10 files changed, 72 insertions(+), 77 deletions(-) delete mode 100644 event_consumer/conf/defaults.py create mode 100644 event_consumer/conf/settings.py diff --git a/event_consumer/__about__.py b/event_consumer/__about__.py index 87dad15..f304981 100644 --- a/event_consumer/__about__.py +++ b/event_consumer/__about__.py @@ -1,4 +1,4 @@ -__version__ = '1.2.1' +__version__ = '2.0.0' if __name__ == '__main__': diff --git a/event_consumer/conf/__init__.py b/event_consumer/conf/__init__.py index 605f456..e69de29 100644 --- a/event_consumer/conf/__init__.py +++ b/event_consumer/conf/__init__.py @@ -1,4 +0,0 @@ -from flexisettings import Settings - - -settings = Settings('EVENT_CONSUMER', 'event_consumer.conf.defaults') diff --git a/event_consumer/conf/defaults.py b/event_consumer/conf/defaults.py deleted file mode 100644 index 7b5f31a..0000000 --- a/event_consumer/conf/defaults.py +++ /dev/null @@ -1,54 +0,0 @@ -from datetime import timedelta -import os -from typing import Callable, Dict, Optional # noqa - - -# namespace for config keys loaded from e.g. Django conf or env vars -CONFIG_NAMESPACE = os.getenv('EVENT_CONSUMER_CONFIG_NAMESPACE', 'EVENT_CONSUMER') - -# optional import path to file containing namespaced config (e.g. 'django.conf.settings') -APP_CONFIG = os.getenv('EVENT_CONSUMER_APP_CONFIG', None) - - -# safety var to prevent accidentally enabling the handlers in `test_utils.handlers` -# set to True and then import the module to enable them -TEST_ENABLED = False - -# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_serializer -SERIALIZER = 'json' -ACCEPT = [SERIALIZER] - -QUEUE_NAME_PREFIX = '' - -MAX_RETRIES = 4 # type: int - -# By default will use `AMQPRetryHandler.backoff`, otherwise supply your own. -# Should accept a single arg and return a delay time (seconds). -BACKOFF_FUNC = None # type: Optional[Callable[[int], float]] - -RETRY_HEADER = 'x-retry-count' - -# Set the consumer prefetch limit -PREFETCH_COUNT = 1 - -# to set TTL for archived message (milliseconds) -ARCHIVE_EXPIRY = int(timedelta(days=24).total_seconds() * 1000) # type: int -# max size of archive queue before dropping messages -ARCHIVE_MAX_LENGTH = 1000000 # type: int -ARCHIVE_QUEUE_ARGS = { - "x-message-ttl": ARCHIVE_EXPIRY, # Messages dropped after this - "x-max-length": ARCHIVE_MAX_LENGTH, # Maximum size of the queue - "x-queue-mode": "lazy", # Keep messages on disk (reqs. rabbitmq 3.6.0+) -} - - -USE_DJANGO = False - -EXCHANGES = {} # type: Dict[str, Dict[str, str]] -# EXCHANGES = { -# 'default': { # a reference name for this config, used when attaching handlers -# 'name': 'data', # actual name of exchange in RabbitMQ -# 'type': 'topic', # an AMQP exchange type -# }, -# ... 
-# } diff --git a/event_consumer/conf/settings.py b/event_consumer/conf/settings.py new file mode 100644 index 0000000..129da32 --- /dev/null +++ b/event_consumer/conf/settings.py @@ -0,0 +1,55 @@ +from datetime import timedelta +import os +from typing import Optional, Callable + +try: + from django.conf import settings +except ImportError: + settings = None + +CONFIG_NAMESPACE: str = "EVENT_CONSUMER" + +# safety var to prevent accidentally enabling the handlers in `test_utils.handlers` +# set to True and then import the module to enable them +TEST_ENABLED: bool = getattr(settings, f"{CONFIG_NAMESPACE}_TEST_ENABLED", False) + +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_serializer +SERIALIZER: str = getattr(settings, f"{CONFIG_NAMESPACE}_SERIALIZER", 'json') +ACCEPT = [SERIALIZER] + +QUEUE_NAME_PREFIX: str = getattr(settings, f"{CONFIG_NAMESPACE}_QUEUE_NAME_PREFIX", '') + +MAX_RETRIES: int = getattr(settings, f"{CONFIG_NAMESPACE}_MAX_RETRIES", 4) + +# By default will use `AMQPRetryHandler.backoff`, otherwise supply your own. +# Should accept a single arg and return a delay time (seconds). +BACKOFF_FUNC: Optional[Callable[[int], float]] = getattr(settings, f"{CONFIG_NAMESPACE}_BACKOFF_FUNC", None) + +RETRY_HEADER: str = getattr(settings, f"{CONFIG_NAMESPACE}_RETRY_HEADER", 'x-retry-count') + +# Set the consumer prefetch limit +PREFETCH_COUNT: int = getattr(settings, f"{CONFIG_NAMESPACE}_PREFETCH_COUNT", 1) + +# to set TTL for archived message (milliseconds) +twenty_four_days = int(timedelta(days=24).total_seconds() * 1000) +ARCHIVE_EXPIRY: int = getattr(settings, f"{CONFIG_NAMESPACE}_ARCHIVE_EXPIRY", twenty_four_days) +ARCHIVE_MAX_LENGTH: int = getattr(settings, f"{CONFIG_NAMESPACE}_ARCHIVE_MAX_LENGTH", 1000000) +ARCHIVE_QUEUE_ARGS = { + "x-message-ttl": ARCHIVE_EXPIRY, # Messages dropped after this + "x-max-length": ARCHIVE_MAX_LENGTH, # Maximum size of the queue + "x-queue-mode": "lazy", # Keep messages on disk (reqs. rabbitmq 3.6.0+) +} + + +USE_DJANGO: bool = getattr(settings, f"{CONFIG_NAMESPACE}_USE_DJANGO", False) + +EXCHANGES: dict[str, dict[str, str]] = getattr(settings, f"{CONFIG_NAMESPACE}_EXCHANGES", {}) +# EXCHANGES = { +# 'default': { # a reference name for this config, used when attaching handlers +# 'name': 'data', # actual name of exchange in RabbitMQ +# 'type': 'topic', # an AMQP exchange type +# }, +# ... 
+# } + +BROKER_URL = 'amqp://{0}:5672'.format(os.getenv('BROKER_HOST', 'localhost')) diff --git a/requirements-base.txt b/requirements-base.txt index 460bd4d..0d2e9d4 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -1,3 +1,2 @@ -git+https://github.com/depop/python-flexisettings.git@1.0.0#egg=flexisettings -typing>=3.6.2,<4.0; python_version < '3.6' +typing six \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index deef4a8..9de50cf 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,5 +2,4 @@ ipython ipdb -robpol86-pytest-ipdb twine diff --git a/requirements-test.txt b/requirements-test.txt index b09e5f5..e14fded 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,12 +1,8 @@ -r requirements.txt -tox>=2.0 pytest mock flaky - -# note for Django 1.4 you will need specific versions of these -# (see tox.ini) pytest-django factory-boy django diff --git a/setup.py b/setup.py index 14f2bd7..da5389d 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,6 @@ 'Programming Language :: Python :: 2.7', ], install_requires=[ - 'flexisettings>=1.0,<1.1', 'typing>=3.6.2,<4.0; python_version < "3.6"', 'six', ], diff --git a/test_app/factories.py b/test_app/factories.py index 5b1ad8c..1106c41 100644 --- a/test_app/factories.py +++ b/test_app/factories.py @@ -1,7 +1,7 @@ import factory -class UserFactory(factory.DjangoModelFactory): +class UserFactory(factory.django.DjangoModelFactory): class Meta: model = 'test_app.User' diff --git a/tests/test_consumer_step.py b/tests/test_consumer_step.py index c5697cd..56b97a6 100644 --- a/tests/test_consumer_step.py +++ b/tests/test_consumer_step.py @@ -1,13 +1,12 @@ import mock import pytest -from flexisettings.utils import override_settings - from event_consumer import message_handler from event_consumer import handlers as ec -from event_consumer.conf import settings from event_consumer.errors import InvalidQueueRegistration from event_consumer.types import QueueKey +from unittest.mock import patch + def test_get_handlers_with_defaults(): @@ -51,7 +50,6 @@ def f2(body): assert handler.consumer.callbacks[0].func is reg[key].handler -@override_settings(settings, QUEUE_NAME_PREFIX='myapp:') def test_get_handlers_queue_prefix(*mocks): """ Should build handlers from tasks decorated with `@message_handler` @@ -60,7 +58,10 @@ def test_get_handlers_queue_prefix(*mocks): with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: # named exchange is required if using QUEUE_NAME_PREFIX - with pytest.raises(InvalidQueueRegistration): + with ( + pytest.raises(InvalidQueueRegistration), + patch('event_consumer.handlers.settings.QUEUE_NAME_PREFIX', 'myapp:') + ): @message_handler('my.routing.key1') def bad(body): return None @@ -103,13 +104,15 @@ def f2(body): assert handler.consumer.callbacks[0].func is reg[key].handler -@override_settings(settings, EXCHANGES={'my.exchange1': {}, 'my.exchange2': {}}) def test_get_handlers_with_queue_and_exchange(*mocks): """ Should build handlers from tasks decorated with `@message_handler` using the specified routing key, queue and exchange """ - with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: + with ( + mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, + patch('event_consumer.handlers.settings.EXCHANGES', {'my.exchange1': {}, 'my.exchange2': {}}) + ): # named exchange is required if using custom queue name with pytest.raises(InvalidQueueRegistration): @@ -250,7 +253,6 @@ def f2(body): assert handler.consumer.callbacks[0].func is reg[key].handler 
-@override_settings(settings, EXCHANGES={'my.exchange1': {}, 'my.exchange2': {}}) def test_get_handlers_with_multiple_routes(*mocks): """ Can connect the handler to multiple routing keys, each having a queue. @@ -258,7 +260,10 @@ def test_get_handlers_with_multiple_routes(*mocks): with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: # custom queue name is not possible with multiple routes, even with named exchange - with pytest.raises(InvalidQueueRegistration): + with ( + pytest.raises(InvalidQueueRegistration), + patch('event_consumer.handlers.settings.EXCHANGES', {'my.exchange1': {}, 'my.exchange2': {}}) + ): @message_handler(['my.routing.key1', 'my.routing.key2'], 'my.queue1', 'my.exchange1') def bad(body): return None From 4338958b49a4829baa21741eb88a90d2a1ef7c82 Mon Sep 17 00:00:00 2001 From: rob-parker-what <53180582+rob-parker-what@users.noreply.github.com> Date: Wed, 29 Mar 2023 09:31:08 +0100 Subject: [PATCH 2/6] remove circleci --- .circleci/config.yml | 123 ------------------------------------------- README.rst | 26 +-------- 2 files changed, 2 insertions(+), 147 deletions(-) delete mode 100644 .circleci/config.yml diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index 229a66d..0000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,123 +0,0 @@ -version: 2.0 - -jobs: - "python-2.7": - docker: - - image: circleci/python:2.7 - environment: - - EVENT_CONSUMER_APP_CONFIG: test_app.settings - - BROKER_HOST: localhost - - image: circleci/rabbitmq:3.6.6 - steps: - - checkout - - restore_cache: - key: py27-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - - run: - command: | - virtualenv venv - . venv/bin/activate - pip install tox - - save_cache: - key: py27-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - paths: - - "venv" - - run: - command: | - . venv/bin/activate - tox -e py27-dj111-cel3,py27-dj111-cel4 - - store_artifacts: - path: /tmp/results - destination: python-2.7 - - "python-3.6": - docker: - - image: circleci/python:3.6 - environment: - - EVENT_CONSUMER_APP_CONFIG: test_app.settings - - BROKER_HOST: localhost - - image: circleci/rabbitmq:3.6.6 - steps: - - checkout - - restore_cache: - key: py36-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - - run: - command: | - python -m venv venv - . venv/bin/activate - pip install tox - - save_cache: - key: py36-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - paths: - - "venv" - - run: - command: | - . venv/bin/activate - tox -e py36-dj111-cel3,py36-dj111-cel4,py36-dj22-cel3,py36-dj22-cel4 - - store_artifacts: - path: /tmp/results - destination: python-3.6 - - "python-3.7": - docker: - - image: circleci/python:3.7 - environment: - - EVENT_CONSUMER_APP_CONFIG: test_app.settings - - BROKER_HOST: localhost - - image: circleci/rabbitmq:3.6.6 - steps: - - checkout - - restore_cache: - key: py37-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - - run: - command: | - python -m venv venv - . venv/bin/activate - pip install tox - - save_cache: - key: py37-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - paths: - - "venv" - - run: - command: | - . 
venv/bin/activate - tox -e py37-dj22-cel3,py37-dj22-cel4 - - store_artifacts: - path: /tmp/results - destination: python-3.7 - - "python-3.8": - docker: - - image: circleci/python:3.8 - environment: - - EVENT_CONSUMER_APP_CONFIG: test_app.settings - - BROKER_HOST: localhost - - image: circleci/rabbitmq:3.6.6 - steps: - - checkout - - restore_cache: - key: py38-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - - run: - command: | - python -m venv venv - . venv/bin/activate - pip install tox - - save_cache: - key: py38-{{ .Branch }}-{{ checksum "requirements-base.txt" }}-{{ checksum "tox.ini" }} - paths: - - "venv" - - run: - command: | - . venv/bin/activate - tox -e py38-dj22-cel3,py38-dj22-cel4 - - store_artifacts: - path: /tmp/results - destination: python-3.8 - -workflows: - version: 2 - build: - jobs: - - "python-2.7" - - "python-3.6" - - "python-3.7" - - "python-3.8" diff --git a/README.rst b/README.rst index 43121ff..9b87e9f 100644 --- a/README.rst +++ b/README.rst @@ -1,15 +1,12 @@ celery-message-consumer ======================= -|PyPI Version| |Build Status| +|PyPI Version| .. |PyPI Version| image:: http://img.shields.io/pypi/v/celery-message-consumer.svg?style=flat :target: https://pypi.python.org/pypi/celery-message-consumer/ :alt: Latest PyPI version -.. |Build Status| image:: https://circleci.com/gh/depop/celery-message-consumer.svg?style=shield&circle-token=a9ea2909c5cbc4cb32a87f50444ca79b99e3b09c - :alt: Build Status - Tool for using the ``bin/celery`` worker to consume vanilla AMQP messages (i.e. not Celery tasks) @@ -212,24 +209,6 @@ Python 3.8 * * * Running the tests ----------------- -CircleCI -~~~~~~~~ - -| The easiest way to test the full version matrix is to install the - CircleCI command line app: -| https://circleci.com/docs/2.0/local-jobs/ -| (requires Docker) - -The cli does not support 'workflows' at the moment so you have to run -the two Python version jobs separately: - -.. code:: bash - - circleci build --job python-2.7 - -.. code:: bash - - circleci build --job python-3.6 py.test (single combination of dependency versions) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -286,8 +265,7 @@ Now we can run the tests: tox (all version combinations for current Python) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -You'll notice in the CircleCI config we run tests against the matrix -dependency versions using ``tox``. +We can run tests against the matrix dependency versions using ``tox``. There are `some warts `__ around using ``tox`` with ``pyenv-virtualenv`` so if you created a Python 3.6 From 0fd3571ed9e30492bdaeb51d5a34a9d3592aca69 Mon Sep 17 00:00:00 2001 From: rob-parker-what <53180582+rob-parker-what@users.noreply.github.com> Date: Wed, 29 Mar 2023 15:48:31 +0100 Subject: [PATCH 3/6] tests patched --- tests/test_consume_handler.py | 38 ++++++++++++++--- tests/test_consumer_step.py | 79 +++++++++++++++++++++-------------- 2 files changed, 80 insertions(+), 37 deletions(-) diff --git a/tests/test_consume_handler.py b/tests/test_consume_handler.py index dd1925a..4c985e5 100644 --- a/tests/test_consume_handler.py +++ b/tests/test_consume_handler.py @@ -7,6 +7,7 @@ from event_consumer import handlers as ec from .base import BaseConsumerIntegrationTest +from unittest.mock import patch class ConsumeMessageHandlerTest(BaseConsumerIntegrationTest): @@ -65,12 +66,23 @@ def test_consume_custom_queue_name(self): Test that we can connect multiple routing keys on the same queue and the appropriate handler will be called in each case. 
""" - with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: + + + with ( + mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, + patch('event_consumer.handlers.settings.EXCHANGES', { + 'custom': { + 'name': 'custom', + 'type': 'topic', + } + } + ), + ): # we have to use a named exchange to be able to bind a custom queue name f1 = message_handler('my.routing.key1', queue='custom_queue', exchange='custom')( mock.MagicMock(__name__='mock_handler1') ) - + assert len(reg) == 1 self.configure_handlers() @@ -104,7 +116,15 @@ def test_consume_wildcard_route(self): Test that we can connect multiple routing keys on the same queue and the appropriate handler will be called in each case. """ - with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: + with (mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, + patch('event_consumer.handlers.settings.EXCHANGES', { + 'custom': { + 'name': 'custom', + 'type': 'topic', + } + } + ) + ): f1 = message_handler('my.routing.*', exchange='custom')( mock.MagicMock(__name__='mock_handler1') @@ -147,8 +167,16 @@ def test_consume_multiple_routes(self): Test that we can connect multiple routing keys on the same queue and the appropriate handler will be called in each case. """ - with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: - + with ( + mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, + patch('event_consumer.handlers.settings.EXCHANGES', { + 'custom': { + 'name': 'custom', + 'type': 'topic', + } + } + ) + ): decorator = message_handler( ['my.routing.key1', 'my.routing.key2'], exchange='custom', diff --git a/tests/test_consumer_step.py b/tests/test_consumer_step.py index 56b97a6..2d784da 100644 --- a/tests/test_consumer_step.py +++ b/tests/test_consumer_step.py @@ -60,48 +60,55 @@ def test_get_handlers_queue_prefix(*mocks): # named exchange is required if using QUEUE_NAME_PREFIX with ( pytest.raises(InvalidQueueRegistration), - patch('event_consumer.handlers.settings.QUEUE_NAME_PREFIX', 'myapp:') + patch('event_consumer.handlers.settings.QUEUE_NAME_PREFIX', 'myapp:'), + patch('event_consumer.handlers.settings.EXCHANGES', { + 'custom': { + 'name': 'custom', + 'type': 'topic', + } + } + ) ): @message_handler('my.routing.key1') def bad(body): return None - @message_handler('my.routing.key1', exchange='custom') - def f1(body): - return None + @message_handler('my.routing.key1', exchange='custom') + def f1(body): + return None - @message_handler('my.routing.key2', exchange='custom') - def f2(body): - return None + @message_handler('my.routing.key2', exchange='custom') + def f2(body): + return None - assert len(reg) == 2 + assert len(reg) == 2 - handler_reg1 = reg[ - QueueKey(queue='myapp:my.routing.key1', exchange='custom') - ] - assert handler_reg1.handler is f1 - assert handler_reg1.routing_key == 'my.routing.key1' - assert handler_reg1.queue_arguments == {} + handler_reg1 = reg[ + QueueKey(queue='myapp:my.routing.key1', exchange='custom') + ] + assert handler_reg1.handler is f1 + assert handler_reg1.routing_key == 'my.routing.key1' + assert handler_reg1.queue_arguments == {} - handler_reg2 = reg[ - QueueKey(queue='myapp:my.routing.key2', exchange='custom') - ] - assert handler_reg2.handler is f2 - assert handler_reg2.routing_key == 'my.routing.key2' - assert handler_reg2.queue_arguments == {} + handler_reg2 = reg[ + QueueKey(queue='myapp:my.routing.key2', exchange='custom') + ] + assert handler_reg2.handler is f2 + assert handler_reg2.routing_key == 'my.routing.key2' + assert handler_reg2.queue_arguments == {} - step = 
ec.AMQPRetryConsumerStep(None) - handlers = step.get_handlers(channel=mock.MagicMock()) + step = ec.AMQPRetryConsumerStep(None) + handlers = step.get_handlers(channel=mock.MagicMock()) - assert len(handlers) == len(reg) + assert len(handlers) == len(reg) - for handler in handlers: - assert isinstance(handler, ec.AMQPRetryHandler) - assert len(handler.consumer.queues) == 1 - assert len(handler.consumer.callbacks) == 1 - assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler) - key = QueueKey(queue=handler.queue, exchange=handler.exchange) - assert handler.consumer.callbacks[0].func is reg[key].handler + for handler in handlers: + assert isinstance(handler, ec.AMQPRetryHandler) + assert len(handler.consumer.queues) == 1 + assert len(handler.consumer.callbacks) == 1 + assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler) + key = QueueKey(queue=handler.queue, exchange=handler.exchange) + assert handler.consumer.callbacks[0].func is reg[key].handler def test_get_handlers_with_queue_and_exchange(*mocks): @@ -219,8 +226,16 @@ def test_get_handlers_same_queue_name_and_exchange(): """ Attempt to attach handler with same queue name + exchange should fail. """ - with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: - + with ( + mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, + patch('event_consumer.handlers.settings.EXCHANGES', { + 'custom': { + 'name': 'custom', + 'type': 'topic', + } + } + ) + ): @message_handler('my.routing.key1', queue='custom_queue', exchange='custom') def f1(body): return None From 8197068f68922f6f040bfdd485eddb70935618c1 Mon Sep 17 00:00:00 2001 From: rob-parker-what <53180582+rob-parker-what@users.noreply.github.com> Date: Wed, 29 Mar 2023 16:14:26 +0100 Subject: [PATCH 4/6] tidy up, pin for 3.11 and remove tox --- Makefile | 2 +- README.rst | 70 ++--------------------------------- requirements-base.txt | 3 +- requirements-test.txt | 12 +++--- requirements.txt | 4 +- setup.py | 4 +- tests/test_consume_handler.py | 3 +- tox.ini | 33 ----------------- 8 files changed, 16 insertions(+), 115 deletions(-) delete mode 100644 tox.ini diff --git a/Makefile b/Makefile index 8a0024b..c3b51c1 100644 --- a/Makefile +++ b/Makefile @@ -11,4 +11,4 @@ tag: git push --tags test: - PYTHONPATH=. py.test -v -s --pdb tests/ + PYTHONPATH=. py.test -v -s tests/ diff --git a/README.rst b/README.rst index 9b87e9f..81bab38 100644 --- a/README.rst +++ b/README.rst @@ -183,8 +183,6 @@ Briefly: for each routing key it listens to, the consumer sets up Compatibility ------------- -Python 2.7 and 3.6-3.8 are both supported. - **Only** RabbitMQ transport is supported. We depend on Celery and Kombu. Their versioning seems to be loosely in @@ -195,16 +193,6 @@ Django is not required, but when used we have some extra integration which is needed if your event handlers use the Django db connection. This must be enabled if required via the ``settings.USE_DJANGO`` flag. -This project is tested against: - -=========== ============ ============= ================== ================== - x Django 1.11 Django 2.2 Celery/Kombu 3.x Celery/Kombu 4.x -=========== ============ ============= ================== ================== -Python 2.7 * * * -Python 3.6 * * * * -Python 3.7 * * * -Python 3.8 * * * -=========== ============ ============= ================== ================== Running the tests ----------------- @@ -223,7 +211,7 @@ provided. .. 
code:: bash docker-compose up -d - export BROKER_HOST=$(docker-machine ip default) + export BROKER_HOST=0.0.0.0 (adjust the last line to suit your local Docker installation) @@ -233,19 +221,12 @@ The ``rabbitmqadmin`` web UI is available to aid in debugging queue issues: http://{BROKER_HOST}:15672/ -Now decide which version combination from the matrix you're going to -test and set up your virtualenv accordingly: - -.. code:: bash - pyenv virtualenv 3.6.2 celery-message-consumer - -You will need to edit ``requirements.txt`` and ``requirements-test.txt`` -for the specific versions of dependencies you want to test against. Then +You will need to create a virtualenv then you can install everything via: .. code:: bash - + pyenv virtualenv 3.11.1 celery-message-consumer pip install -r requirements-test.txt Set an env to point to the target Django version's settings in the test @@ -261,48 +242,3 @@ Now we can run the tests: .. code:: bash PYTHONPATH=. py.test -v -s --pdb tests/ - -tox (all version combinations for current Python) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -We can run tests against the matrix dependency versions using ``tox``. - -There are `some warts `__ -around using ``tox`` with ``pyenv-virtualenv`` so if you created a Python 3.6 -virtualenv using the instructions above the best thing to do is delete it and -recreate it like this: - -.. code:: bash - - pyenv virtualenv -p python3.6 myenv - pip install tox - -(it's actually easier not to use a virtualenv at all - tox creates its -own virtualenvs anyway, but that does mean you'd have to install tox -globally) - -You need the Docker container running: - -.. code:: bash - - docker-compose up -d - export BROKER_HOST=$(docker-machine ip default) - -You can now run tests for any versions compatible with your virtualenv -python version, e.g. - -.. code:: bash - - tox -e py36-dj111-cel4 - -To run the full version matrix you need to have both Python 2.7 and 3.6. The -easiest way is via ``pyenv``. You will also need to make both Python versions -'global' (or 'local') via pyenv, and then install and run ``tox`` outside of -any virtualenv. - -.. 
code:: bash - - source deactivate - pyenv global 2.7.14 3.6.2 - pip install tox - tox diff --git a/requirements-base.txt b/requirements-base.txt index 0d2e9d4..e0a1dba 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -1,2 +1 @@ -typing -six \ No newline at end of file +six==1.16.0 \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt index e14fded..c9d790c 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,8 +1,8 @@ -r requirements.txt -pytest -mock -flaky -pytest-django -factory-boy -django +pytest==7.2.2 +mock==5.0.1 +flaky==3.7.0 +pytest-django==4.5.2 +factory-boy==3.2.1 +Django==4.1.7 diff --git a/requirements.txt b/requirements.txt index 47f3ab1..86410c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -r requirements-base.txt -celery -kombu \ No newline at end of file +celery==5.2.7 +kombu==5.2.4 \ No newline at end of file diff --git a/setup.py b/setup.py index da5389d..63a0909 100644 --- a/setup.py +++ b/setup.py @@ -38,11 +38,9 @@ 'Operating System :: OS Independent', 'Programming Language :: Python', 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 2.7', ], install_requires=[ - 'typing>=3.6.2,<4.0; python_version < "3.6"', - 'six', + 'six==1.16.0', ], packages=[ diff --git a/tests/test_consume_handler.py b/tests/test_consume_handler.py index 4c985e5..7824f1d 100644 --- a/tests/test_consume_handler.py +++ b/tests/test_consume_handler.py @@ -116,7 +116,8 @@ def test_consume_wildcard_route(self): Test that we can connect multiple routing keys on the same queue and the appropriate handler will be called in each case. """ - with (mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, + with ( + mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, patch('event_consumer.handlers.settings.EXCHANGES', { 'custom': { 'name': 'custom', diff --git a/tox.ini b/tox.ini deleted file mode 100644 index ffab2f4..0000000 --- a/tox.ini +++ /dev/null @@ -1,33 +0,0 @@ -# Tox (http://tox.testrun.org/) is a tool for running tests -# in multiple virtualenvs. This configuration file will run the -# test suite on all supported python versions. To use it, "pip install tox" -# and then run "tox" from this directory. 
- -[tox] -envlist = py{27,36}-dj{111}-cel{3,4}, py{37,38}-dj{111,22}-cel{3,4} - -[testenv] -setenv = - py{27,36,37,38}-dj{111,22}: PYTHONPATH={toxinidir} - py{27,36}-dj111: DJANGO_SETTINGS_MODULE=test_app.dj111.settings - py{36,37,38}-dj22: DJANGO_SETTINGS_MODULE=test_app.dj22.settings - EVENT_CONSUMER_APP_CONFIG=test_app.settings -passenv = BROKER_HOST -deps = - -rrequirements-base.txt - pytest>=3.2,<3.3 - mock - flaky - py{36,37,38}: mypy - pytest-django>=3.1,<3.2 - factory-boy>=2.9,<2.10 - dj111: django==1.11 - dj22: django==2.2 - cel3: celery>=3.1,<4.0 - cel3: kombu>=3.0,<4.0 - cel4: celery>=4.0,<5.0 - cel4: kombu>=4.0,<5.0 -commands = - /bin/mkdir -p /tmp/results - py{36,37,38}: mypy --py2 --ignore-missing-imports --junit-xml=/tmp/results/mypy-{envname}.xml event_consumer - py{27,36,37,38}: py.test -v -s --fulltrace --junitxml=/tmp/results/pytest-{envname}.xml tests/ From 27befbfa6f29b4989d17dbcd1a549efc05c72bcd Mon Sep 17 00:00:00 2001 From: rob-parker-what <53180582+rob-parker-what@users.noreply.github.com> Date: Wed, 29 Mar 2023 16:20:11 +0100 Subject: [PATCH 5/6] lint --- event_consumer/conf/settings.py | 20 +++++-- event_consumer/handlers.py | 81 ++++++++++++++------------- event_consumer/test_utils/handlers.py | 9 +-- event_consumer/types.py | 4 +- setup.py | 6 -- test_app/dj111/settings.py | 3 +- test_app/dj111/urls.py | 3 +- test_app/dj22/settings.py | 3 +- test_app/dj22/urls.py | 3 +- test_app/factories.py | 1 - test_app/settings.py | 4 +- tests/base.py | 12 ++-- tests/test_consume_handler.py | 56 +++++++++--------- tests/test_consumer_step.py | 72 +++++++++++------------- tests/test_retry_handler.py | 8 +-- tests/test_retry_handler_django.py | 17 ++++-- 16 files changed, 157 insertions(+), 145 deletions(-) diff --git a/event_consumer/conf/settings.py b/event_consumer/conf/settings.py index 129da32..81a302b 100644 --- a/event_consumer/conf/settings.py +++ b/event_consumer/conf/settings.py @@ -23,17 +23,25 @@ # By default will use `AMQPRetryHandler.backoff`, otherwise supply your own. # Should accept a single arg and return a delay time (seconds). 
-BACKOFF_FUNC: Optional[Callable[[int], float]] = getattr(settings, f"{CONFIG_NAMESPACE}_BACKOFF_FUNC", None) +BACKOFF_FUNC: Optional[Callable[[int], float]] = getattr( + settings, f"{CONFIG_NAMESPACE}_BACKOFF_FUNC", None +) -RETRY_HEADER: str = getattr(settings, f"{CONFIG_NAMESPACE}_RETRY_HEADER", 'x-retry-count') +RETRY_HEADER: str = getattr( + settings, f"{CONFIG_NAMESPACE}_RETRY_HEADER", 'x-retry-count' +) # Set the consumer prefetch limit PREFETCH_COUNT: int = getattr(settings, f"{CONFIG_NAMESPACE}_PREFETCH_COUNT", 1) # to set TTL for archived message (milliseconds) twenty_four_days = int(timedelta(days=24).total_seconds() * 1000) -ARCHIVE_EXPIRY: int = getattr(settings, f"{CONFIG_NAMESPACE}_ARCHIVE_EXPIRY", twenty_four_days) -ARCHIVE_MAX_LENGTH: int = getattr(settings, f"{CONFIG_NAMESPACE}_ARCHIVE_MAX_LENGTH", 1000000) +ARCHIVE_EXPIRY: int = getattr( + settings, f"{CONFIG_NAMESPACE}_ARCHIVE_EXPIRY", twenty_four_days +) +ARCHIVE_MAX_LENGTH: int = getattr( + settings, f"{CONFIG_NAMESPACE}_ARCHIVE_MAX_LENGTH", 1000000 +) ARCHIVE_QUEUE_ARGS = { "x-message-ttl": ARCHIVE_EXPIRY, # Messages dropped after this "x-max-length": ARCHIVE_MAX_LENGTH, # Maximum size of the queue @@ -43,7 +51,9 @@ USE_DJANGO: bool = getattr(settings, f"{CONFIG_NAMESPACE}_USE_DJANGO", False) -EXCHANGES: dict[str, dict[str, str]] = getattr(settings, f"{CONFIG_NAMESPACE}_EXCHANGES", {}) +EXCHANGES: dict[str, dict[str, str]] = getattr( + settings, f"{CONFIG_NAMESPACE}_EXCHANGES", {} +) # EXCHANGES = { # 'default': { # a reference name for this config, used when attaching handlers # 'name': 'data', # actual name of exchange in RabbitMQ diff --git a/event_consumer/handlers.py b/event_consumer/handlers.py index 5a2ca0e..e0caba3 100644 --- a/event_consumer/handlers.py +++ b/event_consumer/handlers.py @@ -51,11 +51,12 @@ def _validate_registration(register_key): # type: (QueueKey) -> None ) -def message_handler(routing_keys, # type: Union[str, Iterable] - queue=None, # type: Optional[str] - exchange=DEFAULT_EXCHANGE, # type: str - queue_arguments=None, # Optional[Dict[str, object]] - ): +def message_handler( + routing_keys, # type: Union[str, Iterable] + queue=None, # type: Optional[str] + exchange=DEFAULT_EXCHANGE, # type: str + queue_arguments=None, # Optional[Dict[str, object]] +): # type: (...) -> Callable[[Callable], Any] """ Register a function as a handler for messages on a rabbitmq exchange with @@ -96,8 +97,9 @@ def process_message(body): In order for the event handler to be registered, its containing module must be imported before starting the AMQPRetryConsumerStep. """ - if (queue or (queue is None and settings.QUEUE_NAME_PREFIX)) \ - and exchange not in settings.EXCHANGES: + if ( + queue or (queue is None and settings.QUEUE_NAME_PREFIX) + ) and exchange not in settings.EXCHANGES: raise InvalidQueueRegistration( "You must use a named exchange from settings.EXCHANGES " "if you want to bind a custom queue name." 
@@ -119,7 +121,9 @@ def decorator(f): # type: (Callable) -> Callable global REGISTRY for routing_key in routing_keys: - queue_name = (settings.QUEUE_NAME_PREFIX + routing_key) if queue is None else queue + queue_name = ( + (settings.QUEUE_NAME_PREFIX + routing_key) if queue is None else queue + ) # kombu.Consumer has no concept of routing-key (only queue name) so # so handler registrations must be unique on queue+exchange (otherwise @@ -138,7 +142,7 @@ def decorator(f): # type: (Callable) -> Callable 'registered: %s to handler: %s.%s', register_key, f.__module__, - f.__name__ + f.__name__, ) return f @@ -157,11 +161,13 @@ class AMQPRetryConsumerStep(bootsteps.StartStopStep): See http://docs.celeryproject.org/en/latest/userguide/extending.html """ - requires = ('celery.worker.consumer:Connection', ) + requires = ('celery.worker.consumer:Connection',) def __init__(self, *args, **kwargs): self.handlers = [] # type: List[AMQPRetryHandler] - self._tasks = kwargs.pop('tasks', REGISTRY) # type: Dict[QueueKey, HandlerRegistration] + self._tasks = kwargs.pop( + 'tasks', REGISTRY + ) # type: Dict[QueueKey, HandlerRegistration] super(AMQPRetryConsumerStep, self).__init__(*args, **kwargs) def start(self, c): @@ -214,15 +220,16 @@ class AMQPRetryHandler(object): backoff on retries. """ - def __init__(self, - channel, # type: amqp.channel.Channel - routing_key, # type: str - queue, # type: str - exchange, # type: str - queue_arguments, # type: Dict[str, str] - func, # type: Callable[[Any], Any] - backoff_func=None # type: Optional[Callable[[int], float]] - ): + def __init__( + self, + channel, # type: amqp.channel.Channel + routing_key, # type: str + queue, # type: str + exchange, # type: str + queue_arguments, # type: Dict[str, str] + func, # type: Callable[[Any], Any] + backoff_func=None, # type: Optional[Callable[[int], float]] + ): # type: (...) 
-> None self.channel = channel self.routing_key = routing_key @@ -231,14 +238,11 @@ def __init__(self, self.func = func self.backoff_func = backoff_func or self.backoff - self.exchanges = { - DEFAULT_EXCHANGE: kombu.Exchange(channel=self.channel) - } + self.exchanges = {DEFAULT_EXCHANGE: kombu.Exchange(channel=self.channel)} for name, exchange_settings in settings.EXCHANGES.items(): self.exchanges[name] = kombu.Exchange( - channel=self.channel, - **exchange_settings + channel=self.channel, **exchange_settings ) try: @@ -274,8 +278,7 @@ def __init__(self, raise NoExchange( "The exchange {exchange} was not found in settings.EXCHANGES.\n" "settings.EXCHANGES = {exchanges}".format( - exchange=key_exc, - exchanges=settings.EXCHANGES + exchange=key_exc, exchanges=settings.EXCHANGES ) ) @@ -310,10 +313,10 @@ def __repr__(self): "exchange={exchange}, " "func={func.__module__}.{func.__name__}" ")".format( - routing_key=self.routing_key, - queue=self.queue, - exchange=self.exchange, - func=self.func, + routing_key=self.routing_key, + queue=self.queue, + exchange=self.exchange, + func=self.func, ) ) @@ -354,7 +357,7 @@ def __call__(self, body, message): cls=e.__class__.__name__, error=e, traceback=traceback.format_exc(), - ) + ), ) elif retry_count >= settings.MAX_RETRIES: self.archive( @@ -368,7 +371,7 @@ def __call__(self, body, message): cls=e.__class__.__name__, error=e, traceback=traceback.format_exc(), - ) + ), ) else: self.retry( @@ -382,12 +385,14 @@ def __call__(self, body, message): cls=e.__class__.__name__, error=e, traceback=traceback.format_exc(), - ) + ), ) else: message.ack() _logger.debug( - "Task '{routing_key}' processed and ack() sent".format(routing_key=self.routing_key) + "Task '{routing_key}' processed and ack() sent".format( + routing_key=self.routing_key + ) ) finally: @@ -412,15 +417,13 @@ def retry(self, body, message, reason=''): try: retry_count = self.retry_count(message) headers = message.headers.copy() - headers.update({ - settings.RETRY_HEADER: retry_count + 1 - }) + headers.update({settings.RETRY_HEADER: retry_count + 1}) self.retry_producer.publish( body, headers=headers, retry=True, declares=[self.retry_queue], - expiration=self.backoff_func(retry_count) + expiration=self.backoff_func(retry_count), ) except Exception as e: message.requeue() diff --git a/event_consumer/test_utils/handlers.py b/event_consumer/test_utils/handlers.py index 31e2d53..301d1cf 100644 --- a/event_consumer/test_utils/handlers.py +++ b/event_consumer/test_utils/handlers.py @@ -43,9 +43,10 @@ def py_integration_raise_permanent(body): if settings.TEST_ENABLED: # Add tasks for interactive testing (call decorators directly) - message_handler('py.integration.ok')( - IntegrationTestHandlers.py_integration_ok) + message_handler('py.integration.ok')(IntegrationTestHandlers.py_integration_ok) message_handler('py.integration.raise')( - IntegrationTestHandlers.py_integration_raise) + IntegrationTestHandlers.py_integration_raise + ) message_handler('py.integration.raise.permanent')( - IntegrationTestHandlers.py_integration_raise_permanent) + IntegrationTestHandlers.py_integration_raise_permanent + ) diff --git a/event_consumer/types.py b/event_consumer/types.py index fb18ed6..e751aa4 100644 --- a/event_consumer/types.py +++ b/event_consumer/types.py @@ -7,7 +7,7 @@ [ ('queue', str), ('exchange', str), - ] + ], ) @@ -17,5 +17,5 @@ ('routing_key', str), ('queue_arguments', Dict[str, str]), ('handler', Callable), - ] + ], ) diff --git a/setup.py b/setup.py index 63a0909..6cc4c98 100644 --- a/setup.py +++ 
b/setup.py @@ -16,20 +16,15 @@ setup( name='celery-message-consumer', - # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html version=about['__version__'], - description='Tool for using the bin/celery worker to consume vanilla AMQP messages (i.e. not Celery tasks)', long_description=long_description, - url='https://github.com/depop/celery-message-consumer', - author='Depop', author_email='dev@depop.com', - license='Apache 2.0', classifiers=[ 'Environment :: Web Environment', @@ -42,7 +37,6 @@ install_requires=[ 'six==1.16.0', ], - packages=[ 'event_consumer', 'event_consumer.conf', diff --git a/test_app/dj111/settings.py b/test_app/dj111/settings.py index 77d5dac..e5df9eb 100644 --- a/test_app/dj111/settings.py +++ b/test_app/dj111/settings.py @@ -10,6 +10,7 @@ # Build paths inside the project like this: os.path.join(BASE_DIR, ...) import os + BASE_DIR = os.path.dirname(__file__) MANAGE_PY_PATH = os.path.join(BASE_DIR, 'manage.py') @@ -62,7 +63,7 @@ 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), 'TEST': { 'NAME': os.path.join(BASE_DIR, 'test_db.sqlite3'), - } + }, } } diff --git a/test_app/dj111/urls.py b/test_app/dj111/urls.py index 7334adb..5efd04b 100644 --- a/test_app/dj111/urls.py +++ b/test_app/dj111/urls.py @@ -1,6 +1,7 @@ from django.conf.urls import patterns, include, url -urlpatterns = patterns('', +urlpatterns = patterns( + '', # Examples: # url(r'^$', 'dj17_testproject.views.home', name='home'), # url(r'^blog/', include('blog.urls')), diff --git a/test_app/dj22/settings.py b/test_app/dj22/settings.py index a35dd37..cb42ff8 100644 --- a/test_app/dj22/settings.py +++ b/test_app/dj22/settings.py @@ -10,6 +10,7 @@ # Build paths inside the project like this: os.path.join(BASE_DIR, ...) import os + BASE_DIR = os.path.dirname(__file__) MANAGE_PY_PATH = os.path.join(BASE_DIR, 'manage.py') @@ -62,7 +63,7 @@ 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), 'TEST': { 'NAME': os.path.join(BASE_DIR, 'test_db.sqlite3'), - } + }, } } diff --git a/test_app/dj22/urls.py b/test_app/dj22/urls.py index 7334adb..5efd04b 100644 --- a/test_app/dj22/urls.py +++ b/test_app/dj22/urls.py @@ -1,6 +1,7 @@ from django.conf.urls import patterns, include, url -urlpatterns = patterns('', +urlpatterns = patterns( + '', # Examples: # url(r'^$', 'dj17_testproject.views.home', name='home'), # url(r'^blog/', include('blog.urls')), diff --git a/test_app/factories.py b/test_app/factories.py index 1106c41..2e7bbd8 100644 --- a/test_app/factories.py +++ b/test_app/factories.py @@ -2,7 +2,6 @@ class UserFactory(factory.django.DjangoModelFactory): - class Meta: model = 'test_app.User' diff --git a/test_app/settings.py b/test_app/settings.py index 563d11f..65130a5 100644 --- a/test_app/settings.py +++ b/test_app/settings.py @@ -3,7 +3,9 @@ EVENT_CONSUMER_BACKOFF_FUNC = lambda count: 0.5 # noqa -EVENT_CONSUMER_BROKER_URL = 'amqp://{0}:5672'.format(os.getenv('BROKER_HOST', 'localhost')) +EVENT_CONSUMER_BROKER_URL = 'amqp://{0}:5672'.format( + os.getenv('BROKER_HOST', 'localhost') +) EVENT_CONSUMER_EXCHANGES = { 'custom': { diff --git a/tests/base.py b/tests/base.py index 2b2a4d2..79d8262 100644 --- a/tests/base.py +++ b/tests/base.py @@ -15,7 +15,6 @@ def random_body(): class BaseRetryHandlerIntegrationTest(unittest.TestCase): - # Must not collide with real queue names! 
routing_key = 'RetryHandlerIntegrationTest' exchange = 'default' # see settings.EXCHANGES @@ -58,7 +57,7 @@ def setUp(self): self.archive_consumer = kombu.Consumer( channel=self.channel, queues=[self.handler.archive_queue], - callbacks=[self.handle_archive] + callbacks=[self.handle_archive], ) for consumer in [self.handler.consumer, self.archive_consumer]: @@ -68,7 +67,7 @@ def setUp(self): self.channel, exchange=self.handler.exchanges[self.handler.exchange], routing_key=self.routing_key, - serializer='json' + serializer='json', ) self.archives = [] @@ -101,7 +100,6 @@ def handle_archive(self, body, message): class BaseConsumerIntegrationTest(unittest.TestCase): - exchange = 'default' # see settings.EXCHANGES body = staticmethod(random_body) @@ -170,12 +168,10 @@ def get_producer(self, handler, routing_key=None): handler.channel, exchange=handler.exchanges[handler.exchange], routing_key=handler.routing_key if routing_key is None else routing_key, - serializer='json' + serializer='json', ) def get_handlers_for_key(self, routing_key): return [ - handler - for handler in self.handlers - if handler.routing_key == routing_key + handler for handler in self.handlers if handler.routing_key == routing_key ] diff --git a/tests/test_consume_handler.py b/tests/test_consume_handler.py index 7824f1d..c10ee0f 100644 --- a/tests/test_consume_handler.py +++ b/tests/test_consume_handler.py @@ -11,14 +11,12 @@ class ConsumeMessageHandlerTest(BaseConsumerIntegrationTest): - @flaky(max_runs=5, min_passes=5) def test_consume_basic(self): """ Should run the wrapped function when a message arrives with its routing key. """ with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: - f1 = message_handler('my.routing.key1')( mock.MagicMock(__name__='mock_handler1') ) @@ -54,7 +52,9 @@ def test_consume_basic(self): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e = exc - self.assertIsNotNone(e, msg="e=None here means task was unexpectedly retried") + self.assertIsNotNone( + e, msg="e=None here means task was unexpectedly retried" + ) # no further calls f1.call_count = 1 f2.call_count = 1 @@ -67,22 +67,23 @@ def test_consume_custom_queue_name(self): appropriate handler will be called in each case. """ - with ( mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, - patch('event_consumer.handlers.settings.EXCHANGES', { + patch( + 'event_consumer.handlers.settings.EXCHANGES', + { 'custom': { 'name': 'custom', 'type': 'topic', - } } - ), - ): + }, + ), + ): # we have to use a named exchange to be able to bind a custom queue name - f1 = message_handler('my.routing.key1', queue='custom_queue', exchange='custom')( - mock.MagicMock(__name__='mock_handler1') - ) - + f1 = message_handler( + 'my.routing.key1', queue='custom_queue', exchange='custom' + )(mock.MagicMock(__name__='mock_handler1')) + assert len(reg) == 1 self.configure_handlers() @@ -105,7 +106,9 @@ def test_consume_custom_queue_name(self): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e = exc - self.assertIsNotNone(e, msg="e=None here means task was unexpectedly retried") + self.assertIsNotNone( + e, msg="e=None here means task was unexpectedly retried" + ) # no further calls f1.call_count = 1 @@ -116,17 +119,15 @@ def test_consume_wildcard_route(self): Test that we can connect multiple routing keys on the same queue and the appropriate handler will be called in each case. 
""" - with ( - mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, - patch('event_consumer.handlers.settings.EXCHANGES', { + with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, patch( + 'event_consumer.handlers.settings.EXCHANGES', + { 'custom': { 'name': 'custom', 'type': 'topic', - } } - ) + }, ): - f1 = message_handler('my.routing.*', exchange='custom')( mock.MagicMock(__name__='mock_handler1') ) @@ -157,7 +158,9 @@ def test_consume_wildcard_route(self): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e = exc - self.assertIsNotNone(e, msg="e=None here means task was unexpectedly retried") + self.assertIsNotNone( + e, msg="e=None here means task was unexpectedly retried" + ) # no further calls f1.call_count = 2 @@ -168,15 +171,14 @@ def test_consume_multiple_routes(self): Test that we can connect multiple routing keys on the same queue and the appropriate handler will be called in each case. """ - with ( - mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, - patch('event_consumer.handlers.settings.EXCHANGES', { + with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, patch( + 'event_consumer.handlers.settings.EXCHANGES', + { 'custom': { 'name': 'custom', 'type': 'topic', - } } - ) + }, ): decorator = message_handler( ['my.routing.key1', 'my.routing.key2'], @@ -211,6 +213,8 @@ def test_consume_multiple_routes(self): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e = exc - self.assertIsNotNone(e, msg="e=None here means task was unexpectedly retried") + self.assertIsNotNone( + e, msg="e=None here means task was unexpectedly retried" + ) # no further calls f1.call_count = 2 diff --git a/tests/test_consumer_step.py b/tests/test_consumer_step.py index 2d784da..9cda30d 100644 --- a/tests/test_consumer_step.py +++ b/tests/test_consumer_step.py @@ -8,7 +8,6 @@ from unittest.mock import patch - def test_get_handlers_with_defaults(): """ Should build handlers from tasks decorated with `@message_handler` @@ -56,19 +55,21 @@ def test_get_handlers_queue_prefix(*mocks): and use defaults for routing key and exchange if none provided """ with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: - # named exchange is required if using QUEUE_NAME_PREFIX with ( pytest.raises(InvalidQueueRegistration), patch('event_consumer.handlers.settings.QUEUE_NAME_PREFIX', 'myapp:'), - patch('event_consumer.handlers.settings.EXCHANGES', { - 'custom': { - 'name': 'custom', - 'type': 'topic', + patch( + 'event_consumer.handlers.settings.EXCHANGES', + { + 'custom': { + 'name': 'custom', + 'type': 'topic', } - } - ) + }, + ), ): + @message_handler('my.routing.key1') def bad(body): return None @@ -116,13 +117,13 @@ def test_get_handlers_with_queue_and_exchange(*mocks): Should build handlers from tasks decorated with `@message_handler` using the specified routing key, queue and exchange """ - with ( - mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, - patch('event_consumer.handlers.settings.EXCHANGES', {'my.exchange1': {}, 'my.exchange2': {}}) + with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, patch( + 'event_consumer.handlers.settings.EXCHANGES', + {'my.exchange1': {}, 'my.exchange2': {}}, ): - # named exchange is required if using custom queue name with pytest.raises(InvalidQueueRegistration): + @message_handler('my.routing.key1', 'my.queue1') def bad(body): return None @@ -142,23 +143,17 @@ def f3(body): assert len(reg) == 3 - handler_reg1 = reg[ - QueueKey(queue='my.queue1', exchange='my.exchange1') - ] + handler_reg1 = 
reg[QueueKey(queue='my.queue1', exchange='my.exchange1')] assert handler_reg1.handler is f1 assert handler_reg1.routing_key == 'my.routing.key1' assert handler_reg1.queue_arguments == {} - handler_reg2 = reg[ - QueueKey(queue='my.queue2', exchange='my.exchange1') - ] + handler_reg2 = reg[QueueKey(queue='my.queue2', exchange='my.exchange1')] assert handler_reg2.handler is f2 assert handler_reg2.routing_key == 'my.routing.key2' assert handler_reg2.queue_arguments == {} - handler_reg3 = reg[ - QueueKey(queue='my.queue2', exchange='my.exchange2') - ] + handler_reg3 = reg[QueueKey(queue='my.queue2', exchange='my.exchange2')] assert handler_reg3.handler is f3 assert handler_reg3.routing_key == 'my.routing.key2' assert handler_reg3.queue_arguments == {} @@ -226,30 +221,29 @@ def test_get_handlers_same_queue_name_and_exchange(): """ Attempt to attach handler with same queue name + exchange should fail. """ - with ( - mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, - patch('event_consumer.handlers.settings.EXCHANGES', { + with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg, patch( + 'event_consumer.handlers.settings.EXCHANGES', + { 'custom': { 'name': 'custom', 'type': 'topic', - } } - ) + }, ): + @message_handler('my.routing.key1', queue='custom_queue', exchange='custom') def f1(body): return None with pytest.raises(InvalidQueueRegistration): + @message_handler('my.routing.key2', queue='custom_queue', exchange='custom') def f2(body): return None assert len(reg) == 1 - handler_reg1 = reg[ - QueueKey(queue='custom_queue', exchange='custom') - ] + handler_reg1 = reg[QueueKey(queue='custom_queue', exchange='custom')] assert handler_reg1.handler is f1 assert handler_reg1.routing_key == 'my.routing.key1' assert handler_reg1.queue_arguments == {} @@ -273,13 +267,15 @@ def test_get_handlers_with_multiple_routes(*mocks): Can connect the handler to multiple routing keys, each having a queue. 
""" with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: - # custom queue name is not possible with multiple routes, even with named exchange - with ( - pytest.raises(InvalidQueueRegistration), - patch('event_consumer.handlers.settings.EXCHANGES', {'my.exchange1': {}, 'my.exchange2': {}}) + with pytest.raises(InvalidQueueRegistration), patch( + 'event_consumer.handlers.settings.EXCHANGES', + {'my.exchange1': {}, 'my.exchange2': {}}, ): - @message_handler(['my.routing.key1', 'my.routing.key2'], 'my.queue1', 'my.exchange1') + + @message_handler( + ['my.routing.key1', 'my.routing.key2'], 'my.queue1', 'my.exchange1' + ) def bad(body): return None @@ -289,16 +285,12 @@ def f1(body): assert len(reg) == 2 - handler_reg1 = reg[ - QueueKey(queue='my.routing.key1', exchange='default') - ] + handler_reg1 = reg[QueueKey(queue='my.routing.key1', exchange='default')] assert handler_reg1.handler is f1 assert handler_reg1.routing_key == 'my.routing.key1' assert handler_reg1.queue_arguments == {} - handler_reg2 = reg[ - QueueKey(queue='my.routing.key2', exchange='default') - ] + handler_reg2 = reg[QueueKey(queue='my.routing.key2', exchange='default')] assert handler_reg2.handler is f1 assert handler_reg2.routing_key == 'my.routing.key2' assert handler_reg2.queue_arguments == {} diff --git a/tests/test_retry_handler.py b/tests/test_retry_handler.py index 699dc61..fce3de9 100644 --- a/tests/test_retry_handler.py +++ b/tests/test_retry_handler.py @@ -9,7 +9,6 @@ class AMQPRetryHandlerIntegrationTest(BaseRetryHandlerIntegrationTest): - def test_wrapped_func(self): """Should run the wrapped function when a message arrives with its routing key""" with mock.patch.object(self.handler, 'func') as f: @@ -26,7 +25,9 @@ def test_wrapped_func(self): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e = exc - self.assertIsNotNone(e, msg="e=None here means task was unexpectedly retried") + self.assertIsNotNone( + e, msg="e=None here means task was unexpectedly retried" + ) f.call_count = 1 def test_wrapped_func_raises_exception(self): @@ -51,8 +52,7 @@ def test_wrapped_func_raises_exception(self): archived_body, archived_message = self.archives[0] self.assertEqual(body, archived_body) self.assertEqual( - archived_message.headers[settings.RETRY_HEADER], - settings.MAX_RETRIES + archived_message.headers[settings.RETRY_HEADER], settings.MAX_RETRIES ) self.assertEqual(f.call_count, expected_attempts) diff --git a/tests/test_retry_handler_django.py b/tests/test_retry_handler_django.py index 367e4ab..ed6fdfa 100644 --- a/tests/test_retry_handler_django.py +++ b/tests/test_retry_handler_django.py @@ -22,7 +22,6 @@ def cleanup(): @pytest.mark.django_db(transaction=True) class DjangoDBTransactionIntegrationTest(BaseRetryHandlerIntegrationTest): - def setUp(self): self.addCleanup(cleanup) super(DjangoDBTransactionIntegrationTest, self).setUp() @@ -60,7 +59,9 @@ def good(*args, **kwargs): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e1 = exc - self.assertIsNotNone(e1, msg="e1=None here means task was unexpectedly retried") + self.assertIsNotNone( + e1, msg="e1=None here means task was unexpectedly retried" + ) f.call_count = 1 with mock.patch.object(self.handler, 'func') as f: @@ -93,7 +94,9 @@ def good(*args, **kwargs): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e2 = exc - self.assertIsNotNone(e2, msg="e2=None here means task was unexpectedly retried") + self.assertIsNotNone( + e2, msg="e2=None here means task was unexpectedly retried" + ) 
f.call_count = 1 self.assertEqual(len(User.objects.all()), 3) @@ -132,7 +135,9 @@ def good(*args, **kwargs): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e1 = exc - self.assertIsNotNone(e1, msg="e1=None here means task was unexpectedly retried") + self.assertIsNotNone( + e1, msg="e1=None here means task was unexpectedly retried" + ) f.call_count = 1 with mock.patch.object(self.handler, 'func') as f: @@ -165,7 +170,9 @@ def good(*args, **kwargs): self.connection.drain_events(timeout=0.3) except socket.timeout as exc: e2 = exc - self.assertIsNotNone(e2, msg="e2=None here means task was unexpectedly retried") + self.assertIsNotNone( + e2, msg="e2=None here means task was unexpectedly retried" + ) f.call_count = 1 self.assertEqual(len(User.objects.all()), 3) From 623b35a035f91070c84078904bfaea44cbed66a9 Mon Sep 17 00:00:00 2001 From: rob-parker-what <53180582+rob-parker-what@users.noreply.github.com> Date: Wed, 29 Mar 2023 16:29:38 +0100 Subject: [PATCH 6/6] relax pinning --- requirements-base.txt | 2 +- requirements-test.txt | 12 ++++++------ requirements.txt | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/requirements-base.txt b/requirements-base.txt index e0a1dba..1388284 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -1 +1 @@ -six==1.16.0 \ No newline at end of file +six>=1.16.0,<2.0.0 \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt index c9d790c..da16e3a 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,8 +1,8 @@ -r requirements.txt -pytest==7.2.2 -mock==5.0.1 -flaky==3.7.0 -pytest-django==4.5.2 -factory-boy==3.2.1 -Django==4.1.7 +pytest +mock +flaky +pytest-django +factory-boy>=3.2.1,<4.0.0 +Django>=4.1.7,<5.0.0 diff --git a/requirements.txt b/requirements.txt index 86410c6..2a09ad0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -r requirements-base.txt -celery==5.2.7 -kombu==5.2.4 \ No newline at end of file +celery>=5.2.7,<6.0.0 +kombu>=5.2.4,<6.0.0 \ No newline at end of file
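
Configuration after this series: ``event_consumer/conf/settings.py`` (patch 1) resolves each option with ``getattr(django.conf.settings, "EVENT_CONSUMER_<NAME>", default)`` and falls back to its in-module defaults when Django is not importable, replacing the old flexisettings namespace lookup. Below is a minimal sketch of a downstream Django settings module; the project name and chosen values are illustrative only, while the option names are the ones actually read by the bundled settings module.

.. code:: python

    # myproject/settings.py -- hypothetical downstream Django settings (illustrative values).
    # Only the options you want to override need to be defined; anything omitted keeps
    # the library defaults from event_consumer/conf/settings.py.

    EVENT_CONSUMER_SERIALIZER = "json"            # Celery-style serializer name
    EVENT_CONSUMER_QUEUE_NAME_PREFIX = "myapp:"   # queues become "myapp:<routing.key>"
    EVENT_CONSUMER_MAX_RETRIES = 4                # attempts before a message is archived
    EVENT_CONSUMER_PREFETCH_COUNT = 1             # consumer prefetch limit
    EVENT_CONSUMER_USE_DJANGO = True              # enable Django DB connection handling for handlers

    # Named exchanges are required when using a queue prefix or a custom queue name;
    # the dict key is the reference name passed to @message_handler(..., exchange="custom").
    EVENT_CONSUMER_EXCHANGES = {
        "custom": {
            "name": "custom",   # actual exchange name in RabbitMQ
            "type": "topic",    # AMQP exchange type
        },
    }

The broker host, by contrast, is read from the ``BROKER_HOST`` environment variable (defaulting to ``localhost``) rather than from Django settings.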