diff --git a/.travis.yml b/.travis.yml index 62be8d60..c02d4e34 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,55 +5,54 @@ sudo: false cache: directories: - $HOME/.composer/cache + - $HOME/phpcs-cache php: - - 5.6 - - 7.0 - 7.1 - 7.2 + - 7.3 + - 7.4 -services: - - rabbitmq +addons: + apt: + packages: + - rabbitmq-server env: matrix: - - NETTE=nette-2.4-dev - - NETTE=nette-2.4 + - RUN_TESTS=1 # dev + - RUN_TESTS=1 COMPOSER_EXTRA_ARGS="--prefer-stable" + - RUN_TESTS=1 COMPOSER_EXTRA_ARGS="--prefer-lowest --prefer-stable" matrix: + fast_finish: true include: - - php: 5.6 - env: NETTE=nette-2.4 COMPOSER_EXTRA_ARGS="--prefer-lowest --prefer-stable" - - php: 7.0 - env: NETTE=nette-2.4 COVERAGE="--coverage ./coverage.xml --coverage-src ./src" TESTER_RUNTIME="phpdbg" + - php: 7.3 + env: COMPOSER_EXTRA_ARGS="" COVERAGE="--coverage ./coverage.xml --coverage-src ./src" TESTER_RUNTIME="phpdbg" + - php: 7.3 + env: COMPOSER_EXTRA_ARGS="" PHPSTAN=1 RUN_TESTS=0 + - php: 7.3 + env: COMPOSER_EXTRA_ARGS="" CODING_STANDARD=1 RUN_TESTS=0 exclude: - - php: 7.1 - env: NETTE=nette-2.4-dev - - php: 7.1 - env: NETTE=nette-2.4 - - php: 7.2 - env: NETTE=nette-2.4-dev - - php: 7.2 - env: NETTE=nette-2.4 + - php: 7.3 + env: COMPOSER_EXTRA_ARGS="" + allow_failures: + - env: RUN_TESTS=1 before_install: - travis_retry composer self-update - - wget -O /tmp/composer-nette https://raw.githubusercontent.com/Kdyby/TesterExtras/master/bin/composer-nette.php - - php /tmp/composer-nette install: - - travis_retry composer update --no-interaction --prefer-dist $COMPOSER_EXTRA_ARGS - - travis_retry composer create-project --no-interaction jakub-onderka/php-parallel-lint /tmp/php-parallel-lint - - travis_retry composer create-project --no-interaction kdyby/code-checker /tmp/code-checker - - travis_retry wget -O /tmp/coveralls.phar https://github.com/satooshi/php-coveralls/releases/download/v1.0.1/coveralls.phar + - travis_retry composer update --no-interaction --no-suggest --no-progress --prefer-dist $COMPOSER_EXTRA_ARGS # update because we may need --prefer-lowest option script: - - vendor/bin/tester $COVERAGE -s -p ${TESTER_RUNTIME:-php} -c ./tests/php.ini-unix ./tests/KdybyTests/ - - php /tmp/php-parallel-lint/parallel-lint.php -e php,phpt --exclude vendor . - - php /tmp/code-checker/src/code-checker.php --short-arrays + - if [ "$PHPSTAN" = "1" ]; then vendor/bin/phpstan analyse -l 2 -c phpstan.neon src tests/KdybyTests; fi + - if [ "$CODING_STANDARD" = "1" ]; then php vendor/bin/phpcs --standard=ruleset.xml --cache=$HOME/phpcs-cache/.phpcs-cache --encoding=utf-8 -sp src tests/KdybyTests; fi + - if [ "$CODING_STANDARD" = "1" ]; then vendor/bin/parallel-lint -e php,phpt --exclude vendor .; fi + - if [ "$RUN_TESTS" = "1" ]; then vendor/bin/tester $COVERAGE -s -p ${TESTER_RUNTIME:-php} -c ./tests/php.ini-unix ./tests/KdybyTests/; fi after_script: - - if [ "$COVERAGE" != "" ]; then php /tmp/coveralls.phar --verbose --config tests/.coveralls.yml || true; fi + - if [ "$COVERAGE" != "" ]; then vendor/bin/php-coveralls --verbose --config tests/.coveralls.yml || true; fi after_failure: - 'for i in $(find ./tests -name \*.actual); do echo "--- $i"; cat $i; echo; echo; done' diff --git a/composer.json b/composer.json index 8634d68b..b19a98ba 100644 --- a/composer.json +++ b/composer.json @@ -16,54 +16,50 @@ } ], "require": { - "php": ">=5.4", - "nette/di": "~2.4@dev", - "nette/utils": "~2.4@dev", + "php": "^7.1", + "nette/di": "~2.4.10 || ^3.0", + "nette/utils": "^3.0", "php-amqplib/php-amqplib": "~2.6.2" }, "require-dev": { - "nette/application": "~2.4@dev", - "nette/bootstrap": "~2.4@dev", - "nette/caching": "~2.4@dev", - "nette/component-model": "~2.3@dev", - "nette/database": "~2.4@dev", - "nette/deprecated": "~2.3@dev", - "nette/di": "~2.4@dev", - "nette/finder": "~2.4@dev", - "nette/forms": "~2.4@dev", - "nette/http": "~2.4@dev", - "nette/mail": "~2.4@dev", - "nette/neon": "~2.4@dev", - "nette/php-generator": "~2.4@dev", - "nette/reflection": "~2.4@dev", - "nette/robot-loader": "~2.4@dev", - "nette/safe-stream": "~2.3@dev", - "nette/security": "~2.4@dev", - "nette/tokenizer": "~2.3@dev", - "nette/utils": "~2.4@dev", - "latte/latte": "~2.4@dev", - "tracy/tracy": "~2.4@dev", + "kdyby/console": "~2.8", - "nette/tester": "~1.3@rc", - "mockery/mockery": "~0.9.1", - "kdyby/console": "~2.5@dev" + "nette/bootstrap": "~2.4 || ~3.0", + "latte/latte": "~2.4", + "tracy/tracy": "~2.4", + "phpstan/phpstan-shim": "^0.11.4", + "kdyby/coding-standard": "dev-master", + "php-coveralls/php-coveralls": "^2.1", + "nette/tester": "^2.3.2", + "mockery/mockery": "~1.3.0", + "jakub-onderka/php-parallel-lint": "^1.0", + "typo3/class-alias-loader": "^1.0", + "nette/caching": "^3.0" }, "support": { "email": "filip@prochazka.su", "issues": "https://github.com/Kdyby/RabbitMq/issues" }, "autoload": { - "psr-0": { - "Kdyby\\RabbitMq\\": "src/" - }, - "classmap": [ - "src/Kdyby/RabbitMq/exceptions.php" - ] + "psr-4": { + "Kdyby\\RabbitMq\\": "src/Kdyby/RabbitMq" + } }, + "autoload-dev": { + "psr-4": { + "KdybyTests\\RabbitMq\\": "tests/KdybyTests/RabbitMq" + } + }, + "minimum-stability": "dev", "extra": { "branch-alias": { - "dev-master": "1.0-dev" + "dev-master": "1.3-dev" + }, + "typo3/class-alias-loader": { + "class-alias-maps": [ + "src/Kdyby/RabbitMq/DI/ClassAliasMap.php" + ] } } } diff --git a/phpstan.neon b/phpstan.neon new file mode 100644 index 00000000..6f792ebb --- /dev/null +++ b/phpstan.neon @@ -0,0 +1,8 @@ +parameters: + ignoreErrors: + - '#Parameter \$callback of method KdybyTests\\RabbitMq\\Mock\\ChannelMock\:\:basic_consume\(\) has invalid typehint type PhpAmqpLib\\Channel\\callback.#' + + excludes_analyse: + - *src/Kdyby/RabbitMq/DI/ClassAliasMap.php + + diff --git a/ruleset.xml b/ruleset.xml new file mode 100644 index 00000000..1ae3907c --- /dev/null +++ b/ruleset.xml @@ -0,0 +1,4 @@ + + + + diff --git a/src/Kdyby/RabbitMq/AmqpMember.php b/src/Kdyby/RabbitMq/AmqpMember.php index 3065891a..7dd5be55 100644 --- a/src/Kdyby/RabbitMq/AmqpMember.php +++ b/src/Kdyby/RabbitMq/AmqpMember.php @@ -1,32 +1,28 @@ - * @author Filip Procházka - * * @property array $exchangeOptions * @property array $queueOptions */ abstract class AmqpMember { - use Nette\SmartObject; + use \Nette\SmartObject; /** - * @var Connection + * @var \Kdyby\RabbitMq\Connection */ protected $conn; /** - * @var Channel + * @var \Kdyby\RabbitMq\Channel */ protected $ch; @@ -43,14 +39,14 @@ abstract class AmqpMember /** * @var bool */ - protected $autoSetupFabric = true; + protected $autoSetupFabric = TRUE; /** * @var array */ protected $basicProperties = [ 'content_type' => 'text/plain', - 'delivery_mode' => 2 + 'delivery_mode' => 2, ]; /** @@ -58,59 +54,51 @@ abstract class AmqpMember */ protected $exchangeOptions = [ 'name' => NULL, - 'passive' => false, - 'durable' => true, - 'autoDelete' => false, - 'internal' => false, - 'nowait' => false, - 'arguments' => null, - 'ticket' => null, - 'declare' => true, + 'passive' => FALSE, + 'durable' => TRUE, + 'autoDelete' => FALSE, + 'internal' => FALSE, + 'nowait' => FALSE, + 'arguments' => NULL, + 'ticket' => NULL, + 'declare' => TRUE, ]; /** * @var bool */ - protected $exchangeDeclared = false; + protected $exchangeDeclared = FALSE; /** * @var array */ protected $queueOptions = [ 'name' => '', - 'passive' => false, - 'durable' => true, - 'exclusive' => false, - 'autoDelete' => false, - 'nowait' => false, - 'arguments' => null, - 'ticket' => null, + 'passive' => FALSE, + 'durable' => TRUE, + 'exclusive' => FALSE, + 'autoDelete' => FALSE, + 'nowait' => FALSE, + 'arguments' => NULL, + 'ticket' => NULL, 'routing_keys' => [], ]; /** * @var bool */ - protected $queueDeclared = false; + protected $queueDeclared = FALSE; - - - /** - * @param Connection $conn - * @param string $consumerTag - */ - public function __construct(Connection $conn, $consumerTag = null) + public function __construct(Connection $conn, ?string $consumerTag = NULL) { $this->conn = $conn; - $this->consumerTag = empty($consumerTag) ? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid()) : $consumerTag; + $this->consumerTag = empty($consumerTag) ? \sprintf('PHPPROCESS_%s_%s', \gethostname(), \getmypid()) : $consumerTag; if (!($conn instanceof AMQPLazyConnection)) { $this->getChannel(); } } - - public function __destruct() { if ($this->ch) { @@ -122,12 +110,7 @@ public function __destruct() } } - - - /** - * @return AMQPChannel - */ - public function getChannel() + public function getChannel(): AMQPChannel { if (empty($this->ch)) { $this->ch = $this->conn->channel(); @@ -136,24 +119,17 @@ public function getChannel() return $this->ch; } - - - /** - * @param AMQPChannel $ch - */ - public function setChannel(AMQPChannel $ch) + public function setChannel(AMQPChannel $ch): void { $this->ch = $ch; } - - /** * @throws \InvalidArgumentException - * @param array $options + * @param array $options * @return void */ - public function setExchangeOptions(array $options = []) + public function setExchangeOptions(array $options = []): void { if (!isset($options['name'])) { throw new \InvalidArgumentException('You must provide an exchange name'); @@ -166,51 +142,37 @@ public function setExchangeOptions(array $options = []) $this->exchangeOptions = $options + $this->exchangeOptions; } - - /** - * @return array + * @return array */ - public function getExchangeOptions() + public function getExchangeOptions(): array { return $this->exchangeOptions; } - - /** - * @param array $options + * @param array $options * @return void */ - public function setQueueOptions(array $options = []) + public function setQueueOptions(array $options = []): void { $this->queueOptions = $options + $this->queueOptions; } - - /** - * @return array + * @return array */ - public function getQueueOptions() + public function getQueueOptions(): array { return $this->queueOptions; } - - - /** - * @param string $routingKey - * @return void - */ - public function setRoutingKey($routingKey) + public function setRoutingKey(string $routingKey): void { $this->routingKey = $routingKey; } - - - protected function exchangeDeclare() + protected function exchangeDeclare(): void { if (empty($this->exchangeOptions['declare']) || empty($this->exchangeOptions['name'])) { return; @@ -225,28 +187,29 @@ protected function exchangeDeclare() $this->exchangeOptions['internal'], $this->exchangeOptions['nowait'], $this->exchangeOptions['arguments'], - $this->exchangeOptions['ticket']); + $this->exchangeOptions['ticket'] + ); - $this->exchangeDeclared = true; + $this->exchangeDeclared = TRUE; } - - - protected function queueDeclare() + protected function queueDeclare(): void { if (empty($this->queueOptions['name'])) { return; } $this->doQueueDeclare($this->queueOptions['name'], $this->queueOptions); - $this->queueDeclared = true; + $this->queueDeclared = TRUE; } - - - protected function doQueueDeclare($name, array $options) + /** + * @param string $name + * @param array $options + */ + protected function doQueueDeclare(string $name, array $options): void { - list($queueName, ,) = $this->getChannel()->queue_declare( + [$queueName] = $this->getChannel()->queue_declare( $name, $options['passive'], $options['durable'], @@ -269,9 +232,7 @@ protected function doQueueDeclare($name, array $options) } } - - - public function setupFabric() + public function setupFabric(): void { if (!$this->exchangeDeclared) { $this->exchangeDeclare(); @@ -282,13 +243,12 @@ public function setupFabric() } } - - /** * disables the automatic SetupFabric when using a consumer or producer */ - public function disableAutoSetupFabric() + public function disableAutoSetupFabric(): void { - $this->autoSetupFabric = false; + $this->autoSetupFabric = FALSE; } + } diff --git a/src/Kdyby/RabbitMq/AnonymousConsumer.php b/src/Kdyby/RabbitMq/AnonymousConsumer.php index c4af5e91..2bbec597 100644 --- a/src/Kdyby/RabbitMq/AnonymousConsumer.php +++ b/src/Kdyby/RabbitMq/AnonymousConsumer.php @@ -1,15 +1,10 @@ - * @author Filip Procházka - */ -class AnonymousConsumer extends Consumer +class AnonymousConsumer extends \Kdyby\RabbitMq\Consumer { public function __construct(Connection $conn) @@ -18,13 +13,13 @@ public function __construct(Connection $conn) $this->setQueueOptions([ 'name' => '', - 'passive' => false, - 'durable' => false, - 'exclusive' => true, - 'autoDelete' => true, - 'nowait' => false, - 'arguments' => null, - 'ticket' => null + 'passive' => FALSE, + 'durable' => FALSE, + 'exclusive' => TRUE, + 'autoDelete' => TRUE, + 'nowait' => FALSE, + 'arguments' => NULL, + 'ticket' => NULL, ]); } diff --git a/src/Kdyby/RabbitMq/BaseConsumer.php b/src/Kdyby/RabbitMq/BaseConsumer.php index 9f966cdf..3895ae83 100644 --- a/src/Kdyby/RabbitMq/BaseConsumer.php +++ b/src/Kdyby/RabbitMq/BaseConsumer.php @@ -1,18 +1,15 @@ - * @author Filip Procházka - * - * @method onStop(BaseConsumer $self) + * @method onStop(\Kdyby\RabbitMq\BaseConsumer $self) */ -abstract class BaseConsumer extends AmqpMember +abstract class BaseConsumer extends \Kdyby\RabbitMq\AmqpMember { /** @@ -51,7 +48,7 @@ abstract class BaseConsumer extends AmqpMember protected $qosOptions = [ 'prefetchSize' => 0, 'prefetchCount' => 0, - 'global' => FALSE + 'global' => FALSE, ]; /** @@ -59,25 +56,19 @@ abstract class BaseConsumer extends AmqpMember */ protected $qosDeclared = FALSE; - - - public function setCallback($callback) + public function setCallback(callable $callback): void { Callback::check($callback); $this->callback = $callback; } - - - public function stopConsuming() + public function stopConsuming(): void { $this->getChannel()->basic_cancel($this->getConsumerTag()); $this->onStop($this); } - - - protected function setupConsumer() + protected function setupConsumer(): void { if ($this->autoSetupFabric) { $this->setupFabric(); @@ -98,19 +89,17 @@ protected function setupConsumer() ); } - - - protected function maybeStopConsumer() + protected function maybeStopConsumer(): void { - if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) { - if (!function_exists('pcntl_signal_dispatch')) { + if (\extension_loaded('pcntl') && (\defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : TRUE)) { + if (!\function_exists('pcntl_signal_dispatch')) { throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called."); } - pcntl_signal_dispatch(); + \pcntl_signal_dispatch(); } - if ($this->forceStop || ($this->consumed == $this->target && $this->target > 0)) { + if ($this->forceStop || ($this->consumed === $this->target && $this->target > 0)) { $this->stopConsuming(); } else { @@ -118,29 +107,21 @@ protected function maybeStopConsumer() } } - - - public function setConsumerTag($tag) + public function setConsumerTag(string $tag): void { $this->consumerTag = $tag; } - - - public function getConsumerTag() + public function getConsumerTag(): string { return $this->consumerTag; } - - - public function forceStopConsumer() + public function forceStopConsumer(): void { $this->forceStop = TRUE; } - - /** * Sets the qos settings for the current channel * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0 @@ -149,7 +130,7 @@ public function forceStopConsumer() * @param int $prefetchCount * @param bool $global */ - public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = FALSE) + public function setQosOptions(int $prefetchSize = 0, int $prefetchCount = 0, bool $global = FALSE): void { $this->qosOptions = [ 'prefetchSize' => $prefetchSize, @@ -158,11 +139,9 @@ public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = F ]; } - - - protected function qosDeclare() + protected function qosDeclare(): void { - if (!array_filter($this->qosOptions)) { + if (!\array_filter($this->qosOptions)) { return; } @@ -175,16 +154,12 @@ protected function qosDeclare() $this->qosDeclared = TRUE; } - - - public function setIdleTimeout($seconds) + public function setIdleTimeout(int $seconds): void { $this->idleTimeout = $seconds; } - - - public function getIdleTimeout() + public function getIdleTimeout(): int { return $this->idleTimeout; } diff --git a/src/Kdyby/RabbitMq/Channel.php b/src/Kdyby/RabbitMq/Channel.php index d4a4a8f6..b4342d59 100644 --- a/src/Kdyby/RabbitMq/Channel.php +++ b/src/Kdyby/RabbitMq/Channel.php @@ -1,36 +1,27 @@ - * @author Filip Procházka - */ -class Channel extends PhpAmqpLib\Channel\AMQPChannel +class Channel extends \PhpAmqpLib\Channel\AMQPChannel { /** - * @var Diagnostics\Panel + * @var \Kdyby\RabbitMq\Diagnostics\Panel */ private $panel; - - - public function injectPanel(Diagnostics\Panel $panel) + public function injectPanel(Diagnostics\Panel $panel): void { $this->panel = $panel; } - - - public function basic_publish($msg, $exchange = '', $routingKey = '', $mandatory = false, $immediate = false, $ticket = NULL) + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName.NotCamelCaps + public function basic_publish($msg, $exchange = '', $routingKey = '', $mandatory = FALSE, $immediate = FALSE, $ticket = NULL) { if ($this->panel) { - $this->panel->published(get_defined_vars()); // all args + $this->panel->published(\get_defined_vars()); // all args } parent::basic_publish($msg, $exchange, $routingKey, $mandatory, $immediate, $ticket); diff --git a/src/Kdyby/RabbitMq/Command/AnonConsumerCommand.php b/src/Kdyby/RabbitMq/Command/AnonConsumerCommand.php index 59ea5678..1ef4fc0c 100644 --- a/src/Kdyby/RabbitMq/Command/AnonConsumerCommand.php +++ b/src/Kdyby/RabbitMq/Command/AnonConsumerCommand.php @@ -1,22 +1,17 @@ - * @author Filip Procházka - */ -class AnonConsumerCommand extends BaseConsumerCommand +class AnonConsumerCommand extends \Kdyby\RabbitMq\Command\BaseConsumerCommand { - protected function configure() + protected function configure(): void { parent::configure(); @@ -27,16 +22,14 @@ protected function configure() $this->getDefinition()->getOption('route')->setDefault('#'); } - - - protected function initialize(InputInterface $input, OutputInterface $output) + protected function initialize(InputInterface $input, OutputInterface $output): void { parent::initialize($input, $output); if (!$this->consumer instanceof AnonymousConsumer) { - throw new InvalidArgumentException( + throw new \Kdyby\RabbitMq\Exception\InvalidArgumentException( 'Expected instance of Kdyby\RabbitMq\AnonymousConsumer, ' . - 'but consumer ' . $input->getArgument('name'). ' is ' . get_class($this->consumer) + 'but consumer ' . $input->getArgument('name') . ' is ' . \get_class($this->consumer) ); } } diff --git a/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php b/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php index b63978fd..aca6d1d9 100644 --- a/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php +++ b/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php @@ -1,21 +1,15 @@ - * @author Filip Procházka - */ -abstract class BaseConsumerCommand extends Command +abstract class BaseConsumerCommand extends \Symfony\Component\Console\Command\Command { /** @@ -25,7 +19,7 @@ abstract class BaseConsumerCommand extends Command public $connection; /** - * @var Consumer|\Kdyby\RabbitMq\Consumer + * @var \Kdyby\RabbitMq\Consumer */ protected $consumer; @@ -34,113 +28,96 @@ abstract class BaseConsumerCommand extends Command */ protected $amount; - - - protected function configure() + protected function configure(): void { $this ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0) ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '') - ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null) + ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', NULL) ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging') ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals'); } - - /** - * @param InputInterface $input An InputInterface instance - * @param OutputInterface $output An OutputInterface instance - * * @throws \InvalidArgumentException When the number of messages to consume is less than 0 * @throws \BadFunctionCallException When the pcntl is not installed and option -s is true */ - protected function initialize(InputInterface $input, OutputInterface $output) + protected function initialize(InputInterface $input, OutputInterface $output): void { parent::initialize($input, $output); - if (defined('AMQP_WITHOUT_SIGNALS') === false) { - define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals')); + if (\defined('AMQP_WITHOUT_SIGNALS') === FALSE) { + \define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals')); } - if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) { - if (!function_exists('pcntl_signal')) { + if (!AMQP_WITHOUT_SIGNALS && \extension_loaded('pcntl')) { + if (!\function_exists('pcntl_signal')) { throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called."); } - pcntl_signal(SIGTERM, [$this, 'signalTerm']); - pcntl_signal(SIGINT, [$this, 'signalInt']); - pcntl_signal(SIGHUP, [$this, 'signalHup']); + \pcntl_signal(SIGTERM, [$this, 'signalTerm']); + \pcntl_signal(SIGINT, [$this, 'signalInt']); + \pcntl_signal(SIGHUP, [$this, 'signalHup']); } - if (defined('AMQP_DEBUG') === false) { - define('AMQP_DEBUG', (bool) $input->getOption('debug')); + if (\defined('AMQP_DEBUG') === FALSE) { + \define('AMQP_DEBUG', (bool) $input->getOption('debug')); } - if (($this->amount = $input->getOption('messages')) < 0) { - throw new \InvalidArgumentException("The -m option should be null or greater than 0"); + $this->amount = (int) $input->getOption('messages'); + if ($this->amount < 0) { + throw new \InvalidArgumentException('The -m option should be null or greater than 0'); } $this->consumer = $this->connection->getConsumer($input->getArgument('name')); - if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) { - $this->consumer->setMemoryLimit($input->getOption('memory-limit')); + /** @var int|null $memoryLimit */ + $memoryLimit = $input->getOption('memory-limit'); + if ($memoryLimit !== NULL && \ctype_digit((string) $memoryLimit) && $memoryLimit > 0) { + $this->consumer->setMemoryLimit($memoryLimit); } - if ($routingKey = $input->getOption('route')) { + $routingKey = $input->getOption('route'); + if ($routingKey) { $this->consumer->setRoutingKey($routingKey); } } - - - /** - * @param InputInterface $input An InputInterface instance - * @param OutputInterface $output An OutputInterface instance - * - * @return integer 0 if everything went fine, or an error code - */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): void { $this->consumer->consume($this->amount); } - - /** * @internal for pcntl only */ - public function signalTerm() + public function signalTerm(): void { if ($this->consumer) { - pcntl_signal(SIGTERM, SIG_DFL); + \pcntl_signal(SIGTERM, SIG_DFL); $this->consumer->forceStopConsumer(); } } - - /** * @internal for pcntl only */ - public function signalInt() + public function signalInt(): void { if ($this->consumer) { - pcntl_signal(SIGINT, SIG_DFL); + \pcntl_signal(SIGINT, SIG_DFL); $this->consumer->forceStopConsumer(); } } - - /** * @internal for pcntl only */ - public function signalHup() + public function signalHup(): void { if ($this->consumer) { - pcntl_signal(SIGHUP, SIG_DFL); + \pcntl_signal(SIGHUP, SIG_DFL); $this->consumer->forceStopConsumer(); } // TODO: Implement restarting of consumer diff --git a/src/Kdyby/RabbitMq/Command/ConsumerCommand.php b/src/Kdyby/RabbitMq/Command/ConsumerCommand.php index da9fbb3e..cf2a3410 100644 --- a/src/Kdyby/RabbitMq/Command/ConsumerCommand.php +++ b/src/Kdyby/RabbitMq/Command/ConsumerCommand.php @@ -1,17 +1,13 @@ - * @author Filip Procházka - */ -class ConsumerCommand extends BaseConsumerCommand +class ConsumerCommand extends \Kdyby\RabbitMq\Command\BaseConsumerCommand { - protected function configure() + protected function configure(): void { parent::configure(); diff --git a/src/Kdyby/RabbitMq/Command/PurgeConsumerCommand.php b/src/Kdyby/RabbitMq/Command/PurgeConsumerCommand.php index fbd403b1..06a164df 100644 --- a/src/Kdyby/RabbitMq/Command/PurgeConsumerCommand.php +++ b/src/Kdyby/RabbitMq/Command/PurgeConsumerCommand.php @@ -1,21 +1,15 @@ - * @author Filip Procházka - */ -class PurgeConsumerCommand extends Command +class PurgeConsumerCommand extends \Symfony\Component\Console\Command\Command { /** @@ -24,31 +18,21 @@ class PurgeConsumerCommand extends Command */ public $connection; - - - protected function configure() + protected function configure(): void { $this ->setName('rabbitmq:purge') ->setDescription('Purges all messages in queue associated with given consumer') ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') - ->addOption('no-confirmation', null, InputOption::VALUE_NONE, 'Whether it must be confirmed before purging'); + ->addOption('no-confirmation', NULL, InputOption::VALUE_NONE, 'Whether it must be confirmed before purging'); } - - - /** - * @param InputInterface $input - * @param OutputInterface $output - * - * @return int - */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { $noConfirmation = (bool) $input->getOption('no-confirmation'); if (!$noConfirmation && $input->isInteractive()) { - $confirmation = $this->getHelper('dialog')->askConfirmation($output, sprintf('Are you sure you wish to purge "%s" queue? (y/n)', $input->getArgument('name')), false); + $confirmation = $this->getHelper('dialog')->askConfirmation($output, \sprintf('Are you sure you wish to purge "%s" queue? (y/n)', $input->getArgument('name')), FALSE); if (!$confirmation) { $output->writeln('Purging cancelled!'); @@ -56,7 +40,7 @@ protected function execute(InputInterface $input, OutputInterface $output) } } - /** @var Consumer $consumer */ + /** @var \Kdyby\RabbitMq\Consumer $consumer */ $consumer = $this->connection->getConsumer($input->getArgument('name')); $consumer->purge(); diff --git a/src/Kdyby/RabbitMq/Command/RpcServerCommand.php b/src/Kdyby/RabbitMq/Command/RpcServerCommand.php index 4e734025..097d8b57 100644 --- a/src/Kdyby/RabbitMq/Command/RpcServerCommand.php +++ b/src/Kdyby/RabbitMq/Command/RpcServerCommand.php @@ -1,20 +1,15 @@ - * @author Filip Procházka - */ -class RpcServerCommand extends Command +class RpcServerCommand extends \Symfony\Component\Console\Command\Command { /** @@ -23,36 +18,29 @@ class RpcServerCommand extends Command */ public $connection; - - - protected function configure() + protected function configure(): void { $this ->setName('rabbitmq:rpc-server') - ->setDescription("Starts a configured RPC server") + ->setDescription('Starts a configured RPC server') ->addArgument('name', InputArgument::REQUIRED, 'Server Name') ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0) - ->addOption('debug', 'd', InputOption::VALUE_OPTIONAL, 'Debug mode', false); + ->addOption('debug', 'd', InputOption::VALUE_OPTIONAL, 'Debug mode', FALSE); } - - /** * Executes the current command. * - * @param InputInterface $input An InputInterface instance - * @param OutputInterface $output An OutputInterface instance - * - * @return integer 0 if everything went fine, or an error code - * * @throws \InvalidArgumentException When the number of messages to consume is less than 0 */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): void { - define('AMQP_DEBUG', (bool) $input->getOption('debug')); + \define('AMQP_DEBUG', (bool) $input->getOption('debug')); - if (($amount = $input->getOption('messages')) < 0) { - throw new \InvalidArgumentException("The -m option should be null or greater than 0"); + /** @var int $amount */ + $amount = $input->getOption('messages'); + if ($amount < 0) { + throw new \InvalidArgumentException('The -m option should be null or greater than 0'); } $rpcServer = $this->connection->getRpcServer($input->getArgument('name')); diff --git a/src/Kdyby/RabbitMq/Command/SetupFabricCommand.php b/src/Kdyby/RabbitMq/Command/SetupFabricCommand.php index 78a802ca..36db7875 100644 --- a/src/Kdyby/RabbitMq/Command/SetupFabricCommand.php +++ b/src/Kdyby/RabbitMq/Command/SetupFabricCommand.php @@ -1,21 +1,15 @@ - * @author Filip Procházka - */ -class SetupFabricCommand extends Command +class SetupFabricCommand extends \Symfony\Component\Console\Command\Command { /** @@ -24,9 +18,7 @@ class SetupFabricCommand extends Command */ public $container; - - - protected function configure() + protected function configure(): void { $this ->setName('rabbitmq:setup-fabric') @@ -34,12 +26,10 @@ protected function configure() ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging'); } - - - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): void { - if (defined('AMQP_DEBUG') === false) { - define('AMQP_DEBUG', (bool) $input->getOption('debug')); + if (\defined('AMQP_DEBUG') === FALSE) { + \define('AMQP_DEBUG', (bool) $input->getOption('debug')); } $output->writeln('Setting up the Rabbit MQ fabric'); @@ -48,10 +38,10 @@ protected function execute(InputInterface $input, OutputInterface $output) RabbitMqExtension::TAG_PRODUCER, RabbitMqExtension::TAG_CONSUMER, RabbitMqExtension::TAG_RPC_CLIENT, - RabbitMqExtension::TAG_RPC_SERVER + RabbitMqExtension::TAG_RPC_SERVER, ] as $tag) { - foreach ($this->container->findByTag($tag) as $serviceId => $meta) { - /** @var AmqpMember $service */ + foreach (\array_keys($this->container->findByTag($tag)) as $serviceId) { + /** @var \Kdyby\RabbitMq\AmqpMember $service */ $service = $this->container->getService($serviceId); $service->setupFabric(); } diff --git a/src/Kdyby/RabbitMq/Command/StdInProducerCommand.php b/src/Kdyby/RabbitMq/Command/StdInProducerCommand.php index 35ac1695..1cccef55 100644 --- a/src/Kdyby/RabbitMq/Command/StdInProducerCommand.php +++ b/src/Kdyby/RabbitMq/Command/StdInProducerCommand.php @@ -1,20 +1,15 @@ - * @author Filip Procházka - */ -class StdInProducerCommand extends Command +class StdInProducerCommand extends \Symfony\Component\Console\Command\Command { /** @@ -23,39 +18,30 @@ class StdInProducerCommand extends Command */ public $connection; - - - protected function configure() + protected function configure(): void { $this ->setName('rabbitmq:stdin-producer') ->setDescription('Creates message from given STDIN and passes it to configured producer') ->addArgument('name', InputArgument::REQUIRED, 'Producer Name') - ->addOption('debug', 'd', InputOption::VALUE_OPTIONAL, 'Enable Debugging', false); + ->addOption('debug', 'd', InputOption::VALUE_OPTIONAL, 'Enable Debugging', FALSE); } - - /** * Executes the current command. - * - * @param InputInterface $input An InputInterface instance - * @param OutputInterface $output An OutputInterface instance - * - * @return integer 0 if everything went fine, or an error code */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): void { - define('AMQP_DEBUG', (bool) $input->getOption('debug')); + \define('AMQP_DEBUG', (bool) $input->getOption('debug')); $producer = $this->connection->getProducer($input->getArgument('name')); $data = ''; - while (!feof(STDIN)) { - $data .= fread(STDIN, 8192); + while (!\feof(STDIN)) { + $data .= \fread(STDIN, 8192); } - $producer->publish(serialize($data)); + $producer->publish(\serialize($data)); } } diff --git a/src/Kdyby/RabbitMq/Connection.php b/src/Kdyby/RabbitMq/Connection.php index 6fbe36b5..bdd44eba 100644 --- a/src/Kdyby/RabbitMq/Connection.php +++ b/src/Kdyby/RabbitMq/Connection.php @@ -1,26 +1,21 @@ - */ -class Connection extends PhpAmqpLib\Connection\AMQPLazyConnection implements IConnection +class Connection extends \PhpAmqpLib\Connection\AMQPLazyConnection implements \Kdyby\RabbitMq\IConnection { /** - * @var Nette\DI\Container + * @var \Nette\DI\Container */ private $serviceLocator; /** - * @var Diagnostics\Panel + * @var \Kdyby\RabbitMq\Diagnostics\Panel */ private $panel; @@ -29,72 +24,63 @@ class Connection extends PhpAmqpLib\Connection\AMQPLazyConnection implements ICo */ private $serviceMap = []; - - - /** - * @param string $name - * @return BaseConsumer - */ - public function getConsumer($name) + public function getConsumer(string $name): \Kdyby\RabbitMq\Consumer { if (!isset($this->serviceMap['consumer'][$name])) { - throw new InvalidArgumentException("Unknown consumer {$name}"); + throw new \Kdyby\RabbitMq\Exception\InvalidArgumentException( + \sprintf('Unknown consumer %s', $name) + ); } return $this->serviceLocator->getService($this->serviceMap['consumer'][$name]); } - - - /** - * @param $name - * @return Producer - */ - public function getProducer($name) + public function getProducer(string $name): Producer { if (!isset($this->serviceMap['producer'][$name])) { - throw new InvalidArgumentException("Unknown producer {$name}"); + throw new \Kdyby\RabbitMq\Exception\InvalidArgumentException( + \sprintf('Unknown producer %s', $name) + ); } return $this->serviceLocator->getService($this->serviceMap['producer'][$name]); } - - - /** - * @param $name - * @return RpcClient - */ - public function getRpcClient($name) + public function getRpcClient(string $name): RpcClient { if (!isset($this->serviceMap['rpcClient'][$name])) { - throw new InvalidArgumentException("Unknown RPC client {$name}"); + throw new \Kdyby\RabbitMq\Exception\InvalidArgumentException( + \sprintf('Unknown RPC client %s', $name) + ); } return $this->serviceLocator->getService($this->serviceMap['rpcClient'][$name]); } - - - /** - * @param $name - * @return RpcServer - */ - public function getRpcServer($name) + public function getRpcServer(string $name): RpcServer { if (!isset($this->serviceMap['rpcServer'][$name])) { - throw new InvalidArgumentException("Unknown RPC server {$name}"); + throw new \Kdyby\RabbitMq\Exception\InvalidArgumentException( + \sprintf('Unknown RPC server %s', $name) + ); } return $this->serviceLocator->getService($this->serviceMap['rpcServer'][$name]); } - - /** * @internal + * @param array<\Kdyby\RabbitMq\IProducer> $producers + * @param array<\Kdyby\RabbitMq\IConsumer> $consumers + * @param array<\Kdyby\RabbitMq\RpcClient> $rpcClients + * @param array<\Kdyby\RabbitMq\RpcServer> $rpcServers */ - public function injectServiceMap(array $producers, array $consumers, array $rpcClients, array $rpcServers) + public function injectServiceMap( + array $producers, + array $consumers, + array $rpcClients, + array $rpcServers + ): void { $this->serviceMap = [ 'consumer' => $consumers, @@ -104,56 +90,46 @@ public function injectServiceMap(array $producers, array $consumers, array $rpcC ]; } - - /** * @internal - * @param Nette\DI\Container $sl + * @param \Nette\DI\Container $sl */ - public function injectServiceLocator(Nette\DI\Container $sl) + public function injectServiceLocator(Nette\DI\Container $sl): void { $this->serviceLocator = $sl; } - - /** * @internal - * @param Diagnostics\Panel $panel + * @param \Kdyby\RabbitMq\Diagnostics\Panel $panel */ - public function injectPanel(Diagnostics\Panel $panel) + public function injectPanel(Diagnostics\Panel $panel): void { $this->panel = $panel->register($this); } - - /** * Fetch a Channel object identified by the numeric channel_id, or * create that object if it doesn't already exist. * * @param string $id - * @return Channel + * @return \Kdyby\RabbitMq\Channel + * @throws \Exception */ - public function channel($id = null) + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint + public function channel($id = NULL): Channel { if (isset($this->channels[$id])) { return $this->channels[$id]; } $this->connect(); - $id = $id ? $id : $this->get_free_channel_id(); + $id = $id ?: $this->get_free_channel_id(); - return $this->channels[$id] = $this->doCreateChannel($id); + return $this->channels[$id] = $this->doCreateChannel((string) $id); } - - - /** - * @param string $id - * @return Channel - */ - protected function doCreateChannel($id) + protected function doCreateChannel(string $id): Channel { $channel = new Channel($this->connection, $id); diff --git a/src/Kdyby/RabbitMq/Consumer.php b/src/Kdyby/RabbitMq/Consumer.php index 8fe5aa94..59578792 100644 --- a/src/Kdyby/RabbitMq/Consumer.php +++ b/src/Kdyby/RabbitMq/Consumer.php @@ -1,26 +1,21 @@ - * @author Filip Procházka - * - * @method onStart(Consumer $self) - * @method onConsume(Consumer $self, AMQPMessage $msg) - * @method onReject(Consumer $self, AMQPMessage $msg, $processFlag) - * @method onAck(Consumer $self, AMQPMessage $msg) - * @method onError(Consumer $self, AMQPExceptionInterface $e) - * @method onTimeout(Consumer $self) + * @method onStart(\Kdyby\RabbitMq\Consumer $self) + * @method onConsume(\Kdyby\RabbitMq\Consumer $self, \PhpAmqpLib\Message\AMQPMessage $msg) + * @method onReject(\Kdyby\RabbitMq\Consumer $self, \PhpAmqpLib\Message\AMQPMessage $msg, $processFlag) + * @method onAck(\Kdyby\RabbitMq\Consumer $self, \PhpAmqpLib\Message\AMQPMessage $msg) + * @method onError(\Kdyby\RabbitMq\Consumer $self, \PhpAmqpLib\Exception\AMQPExceptionInterface $e) + * @method onTimeout(\Kdyby\RabbitMq\Consumer $self) */ -class Consumer extends BaseConsumer +class Consumer extends \Kdyby\RabbitMq\BaseConsumer { /** @@ -63,62 +58,54 @@ class Consumer extends BaseConsumer */ protected $memoryLimit; - - /** * Set the memory limit * * @param int $memoryLimit */ - public function setMemoryLimit($memoryLimit) + public function setMemoryLimit(int $memoryLimit): void { $this->memoryLimit = $memoryLimit; } - - /** * Get the memory limit - * - * @return int */ - public function getMemoryLimit() + public function getMemoryLimit(): ?int { return $this->memoryLimit; } - - - public function consume($msgAmount) + public function consume(int $msgAmount): void { $this->target = $msgAmount; $this->setupConsumer(); $this->onStart($this); - $previousErrorHandler = set_error_handler(function ($severity, $message, $file, $line, $context) use (&$previousErrorHandler) { - if (!preg_match('~stream_select\\(\\)~i', $message)) { - $args = func_get_args(); - return call_user_func_array($previousErrorHandler, $args); + $previousErrorHandler = \set_error_handler(static function ($severity, $message, $file, $line, $context) use (&$previousErrorHandler) { + if (!\preg_match('~stream_select\\(\\)~i', $message)) { + $args = \func_get_args(); + return \call_user_func_array($previousErrorHandler, $args); } - throw new AMQPRuntimeException($message . ' in ' . $file . ':' . $line, (int) $severity); + throw new \PhpAmqpLib\Exception\AMQPRuntimeException($message . ' in ' . $file . ':' . $line, (int) $severity); }); try { - while (count($this->getChannel()->callbacks)) { + while (\count($this->getChannel()->callbacks)) { $this->maybeStopConsumer(); try { $this->getChannel()->wait(NULL, FALSE, $this->getIdleTimeout()); - } catch (AMQPTimeoutException $e) { + } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) { $this->onTimeout($this); // nothing bad happened, right? // intentionally not throwing the exception } } - } catch (AMQPRuntimeException $e) { - restore_error_handler(); + } catch (\PhpAmqpLib\Exception\AMQPRuntimeException $e) { + \restore_error_handler(); // sending kill signal to the consumer causes the stream_select to return false // the reader doesn't like the false value, so it throws AMQPRuntimeException @@ -129,63 +116,61 @@ public function consume($msgAmount) } } catch (AMQPExceptionInterface $e) { - restore_error_handler(); + \restore_error_handler(); $this->onError($this, $e); throw $e; - } catch (TerminateException $e) { + } catch (\Kdyby\RabbitMq\Exception\TerminateException $e) { $this->stopConsuming(); } } - - /** * Purge the queue */ - public function purge() + public function purge(): void { - $this->getChannel()->queue_purge($this->queueOptions['name'], true); + $this->getChannel()->queue_purge($this->queueOptions['name'], TRUE); } - - - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { $this->onConsume($this, $msg); try { - $processFlag = call_user_func($this->callback, $msg); + $processFlag = \call_user_func($this->callback, $msg); $this->handleProcessMessage($msg, $processFlag); - } catch (TerminateException $e) { + } catch (\Kdyby\RabbitMq\Exception\TerminateException $e) { $this->handleProcessMessage($msg, $e->getResponse()); throw $e; - } catch (\Exception $e) { + } catch (\Throwable $e) { $this->onReject($this, $msg, IConsumer::MSG_REJECT_REQUEUE); throw $e; } } - - - protected function handleProcessMessage(AMQPMessage $msg, $processFlag) + /** + * @param \PhpAmqpLib\Message\AMQPMessage $msg + * @param int|bool $processFlag + */ + protected function handleProcessMessage(AMQPMessage $msg, $processFlag): void { - if ($processFlag === IConsumer::MSG_REJECT_REQUEUE || false === $processFlag) { + if ($processFlag === IConsumer::MSG_REJECT_REQUEUE || $processFlag === FALSE) { // Reject and requeue message to RabbitMQ - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true); + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], TRUE); $this->onReject($this, $msg, $processFlag); } elseif ($processFlag === IConsumer::MSG_SINGLE_NACK_REQUEUE) { // NACK and requeue message to RabbitMQ - $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true); + $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], FALSE, TRUE); $this->onReject($this, $msg, $processFlag); } else { if ($processFlag === IConsumer::MSG_REJECT) { // Reject and drop - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], FALSE); $this->onReject($this, $msg, $processFlag); } else { @@ -203,20 +188,18 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag) } } - - /** * Checks if memory in use is greater or equal than memory allowed for this process * - * @return boolean + * @return bool */ - protected function isRamAlmostOverloaded() + protected function isRamAlmostOverloaded(): bool { if ($this->getMemoryLimit() === NULL) { return FALSE; } - return memory_get_usage(true) >= ($this->getMemoryLimit() * 1024 * 1024); + return \memory_get_usage(TRUE) >= ($this->getMemoryLimit() * 1024 * 1024); } } diff --git a/src/Kdyby/RabbitMq/DI/ClassAliasMap.php b/src/Kdyby/RabbitMq/DI/ClassAliasMap.php new file mode 100644 index 00000000..d881ec49 --- /dev/null +++ b/src/Kdyby/RabbitMq/DI/ClassAliasMap.php @@ -0,0 +1,10 @@ + \Kdyby\RabbitMq\Exception\Exception::class, + \Kdyby\RabbitMq\InvalidArgumentException::class => \Kdyby\RabbitMq\Exception\InvalidArgumentException::class, + \Kdyby\RabbitMq\QueueNotFoundException::class => \Kdyby\RabbitMq\Exception\QueueNotFoundException::class, + \Kdyby\RabbitMq\TerminateException::class => \Kdyby\RabbitMq\Exception\TerminateException::class, +]; diff --git a/src/Kdyby/RabbitMq/DI/IConsumersProvider.php b/src/Kdyby/RabbitMq/DI/IConsumersProvider.php index ea20654a..a988f72b 100644 --- a/src/Kdyby/RabbitMq/DI/IConsumersProvider.php +++ b/src/Kdyby/RabbitMq/DI/IConsumersProvider.php @@ -1,5 +1,7 @@ - */ interface IConsumersProvider { /** * Returns array of name => array config. * - * @return array + * @return array> */ - function getRabbitConsumers(); + public function getRabbitConsumers(): array; + } diff --git a/src/Kdyby/RabbitMq/DI/IProducersProvider.php b/src/Kdyby/RabbitMq/DI/IProducersProvider.php index 2bdc8172..c850becd 100644 --- a/src/Kdyby/RabbitMq/DI/IProducersProvider.php +++ b/src/Kdyby/RabbitMq/DI/IProducersProvider.php @@ -1,5 +1,7 @@ - */ interface IProducersProvider { /** * Returns array of name => array config. * - * @return array + * @return array> */ - function getRabbitProducers(); + public function getRabbitProducers(): array; + } diff --git a/src/Kdyby/RabbitMq/DI/IRpcClientsProvider.php b/src/Kdyby/RabbitMq/DI/IRpcClientsProvider.php index 933c6dff..98e6ecf9 100644 --- a/src/Kdyby/RabbitMq/DI/IRpcClientsProvider.php +++ b/src/Kdyby/RabbitMq/DI/IRpcClientsProvider.php @@ -1,5 +1,7 @@ - */ interface IRpcClientsProvider { /** * Returns array of name => array config. * - * @return array + * @return array> */ - function getRabbitRpcClients(); + public function getRabbitRpcClients(): array; + } diff --git a/src/Kdyby/RabbitMq/DI/IRpcServersProvider.php b/src/Kdyby/RabbitMq/DI/IRpcServersProvider.php index 0a175965..1d63b907 100644 --- a/src/Kdyby/RabbitMq/DI/IRpcServersProvider.php +++ b/src/Kdyby/RabbitMq/DI/IRpcServersProvider.php @@ -1,5 +1,7 @@ - */ interface IRpcServersProvider { /** * Returns array of name => array config. * - * @return array + * @return array> */ - function getRabbitRpcServers(); + public function getRabbitRpcServers(): array; + } diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index 2a6cf5eb..0229227d 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -1,5 +1,7 @@ - * @author Filip Procházka - */ -class RabbitMqExtension extends Nette\DI\CompilerExtension +class RabbitMqExtension extends \Nette\DI\CompilerExtension { - const TAG_PRODUCER = 'kdyby.rabbitmq.producer'; - const TAG_CONSUMER = 'kdyby.rabbitmq.consumer'; - const TAG_RPC_CLIENT = 'kdyby.rabbitmq.rpc.client'; - const TAG_RPC_SERVER = 'kdyby.rabbitmq.rpc.server'; + public const TAG_COMMAND_KDYBY = 'kdyby.console.command'; + public const TAG_COMMAND = 'console.command'; + public const TAG_PRODUCER = 'kdyby.rabbitmq.producer'; + public const TAG_CONSUMER = 'kdyby.rabbitmq.consumer'; + public const TAG_RPC_CLIENT = 'kdyby.rabbitmq.rpc.client'; + public const TAG_RPC_SERVER = 'kdyby.rabbitmq.rpc.server'; + public const EXTENDS_KEY = '_extends'; /** * @var array @@ -60,7 +56,7 @@ class RabbitMqExtension extends Nette\DI\CompilerExtension */ public $producersDefaults = [ 'connection' => 'default', - 'class' => 'Kdyby\RabbitMq\Producer', + 'class' => \Kdyby\RabbitMq\Producer::class, 'exchange' => [], 'queue' => [], 'contentType' => 'text/plain', @@ -150,38 +146,46 @@ class RabbitMqExtension extends Nette\DI\CompilerExtension */ private $producersConfig = []; - - - public function loadConfiguration() + /** + * @throws \Nette\Utils\AssertionException + */ + public function loadConfiguration(): void { $builder = $this->getContainerBuilder(); - $config = $this->getConfig($this->defaults); + $config = \Nette\DI\Config\Helpers::merge($this->getConfig(), $this->defaults); foreach ($this->compiler->getExtensions() as $extension) { if ($extension instanceof IProducersProvider) { $producers = $extension->getRabbitProducers(); Validators::assert($producers, 'array:1..'); - $config['producers'] = array_merge($config['producers'], $producers); + $config['producers'] = \array_merge($config['producers'], $producers); } if ($extension instanceof IConsumersProvider) { $consumers = $extension->getRabbitConsumers(); Validators::assert($consumers, 'array:1..'); - $config['consumers'] = array_merge($config['consumers'], $consumers); + $config['consumers'] = \array_merge($config['consumers'], $consumers); } if ($extension instanceof IRpcClientsProvider) { $rpcClients = $extension->getRabbitRpcClients(); Validators::assert($rpcClients, 'array:1..'); - $config['rpcClients'] = array_merge($config['rpcClients'], $rpcClients); + $config['rpcClients'] = \array_merge($config['rpcClients'], $rpcClients); } if ($extension instanceof IRpcServersProvider) { $rpcServers = $extension->getRabbitRpcServers(); Validators::assert($rpcServers, 'array:1..'); - $config['rpcServers'] = array_merge($config['rpcServers'], $rpcServers); + $config['rpcServers'] = \array_merge($config['rpcServers'], $rpcServers); } } - if ($unexpected = array_diff(array_keys($config), array_keys($this->defaults))) { - throw new Nette\Utils\AssertionException("Unexpected key '" . implode("', '", $unexpected) . "' in configuration of {$this->name}."); + $unexpected = \array_diff(\array_keys($config), \array_keys($this->defaults)); + if ($unexpected) { + throw new \Nette\Utils\AssertionException( + \sprintf( + 'Unexpected key \'%s\' in configuration of %s.', + \implode("', '", $unexpected), + $this->name + ) + ); } $builder->parameters[$this->name] = $config; @@ -192,17 +196,18 @@ public function loadConfiguration() $this->loadRpcClients($config['rpcClients']); $this->loadRpcServers($config['rpcServers']); - foreach ($this->connectionsMeta as $name => $meta) { + foreach ($this->connectionsMeta as $meta) { + /** @var \Nette\DI\Definitions\ServiceDefinition $connection */ $connection = $builder->getDefinition($meta['serviceId']); if ($config['debugger']) { $builder->addDefinition($panelService = $meta['serviceId'] . '.panel') - ->setClass('Kdyby\RabbitMq\Diagnostics\Panel') + ->setType(\Kdyby\RabbitMq\Diagnostics\Panel::class) ->addSetup('injectServiceMap', [ $meta['consumers'], $meta['rpcServers'], ]) - ->setInject(FALSE) + ->addTag(\Nette\DI\Extensions\InjectExtension::TAG_INJECT, FALSE) ->setAutowired(FALSE); $connection->addSetup('injectPanel', ['@' . $panelService]); @@ -220,16 +225,16 @@ public function loadConfiguration() $this->loadConsole(); } - - - public function beforeCompile() + public function beforeCompile(): void { unset($this->getContainerBuilder()->parameters[$this->name]); } - - - protected function loadConnections($connections) + /** + * @param array $connections + * @throws \Nette\Utils\AssertionException + */ + protected function loadConnections(array $connections): void { $this->connectionsMeta = []; // reset @@ -241,17 +246,35 @@ protected function loadConnections($connections) foreach ($connections as $name => $config) { $config = $this->mergeConfig($config, $this->connectionDefaults); - Nette\Utils\Validators::assertField($config, 'user', 'string:3..', "The config item '%' of connection {$this->name}.{$name}"); - Nette\Utils\Validators::assertField($config, 'password', 'string:3..', "The config item '%' of connection {$this->name}.{$name}"); + Nette\Utils\Validators::assertField( + $config, + 'user', + 'string:3..', + \sprintf( + 'The config item \'%%\' of connection %s.%s', + $this->name, + $name + ) + ); + Nette\Utils\Validators::assertField( + $config, + 'password', + 'string:3..', + \sprintf( + 'The config item \'%%\' of connection %s.%s', + $this->name, + $name + ) + ); $connection = $builder->addDefinition($serviceName = $this->prefix($name . '.connection')) - ->setClass('Kdyby\RabbitMq\Connection') + ->setType(\Kdyby\RabbitMq\Connection::class) ->setArguments([ $config['host'], $config['port'], $config['user'], $config['password'], - $config['vhost'] + $config['vhost'], ]); $this->connectionsMeta[$name] = [ @@ -263,15 +286,17 @@ protected function loadConnections($connections) ]; // only the first connection is autowired - if (count($this->connectionsMeta) > 1) { + if (\count($this->connectionsMeta) > 1) { $connection->setAutowired(FALSE); } } } - - - protected function loadProducers($producers) + /** + * @param array $producers + * @throws \Nette\Utils\AssertionException + */ + protected function loadProducers(array $producers): void { $builder = $this->getContainerBuilder(); @@ -279,12 +304,19 @@ protected function loadProducers($producers) $config = $this->mergeConfig($config, ['autoSetupFabric' => $builder->parameters[$this->name]['autoSetupFabric']] + $this->producersDefaults); if (!isset($this->connectionsMeta[$config['connection']])) { - throw new Nette\Utils\AssertionException("Connection {$config['connection']} required in producer {$this->name}.{$name} was not defined."); + throw new \Nette\Utils\AssertionException( + \sprintf( + 'Connection %s required in producer %s.%s was not defined.', + $config['connection'], + $this->name, + $name + ) + ); } $producer = $builder->addDefinition($serviceName = $this->prefix('producer.' . $name)) ->setFactory($config['class'], ['@' . $this->connectionsMeta[$config['connection']]['serviceId']]) - ->setClass('Kdyby\RabbitMq\IProducer') + ->setType(\Kdyby\RabbitMq\IProducer::class) ->addSetup('setContentType', [$config['contentType']]) ->addSetup('setDeliveryMode', [$config['deliveryMode']]) ->addSetup('setRoutingKey', [$config['routingKey']]) @@ -292,8 +324,26 @@ protected function loadProducers($producers) if (!empty($config['exchange'])) { $config['exchange'] = $this->mergeConfig($config['exchange'], $this->exchangeDefaults); - Nette\Utils\Validators::assertField($config['exchange'], 'name', 'string:3..', "The config item 'exchange.%' of producer {$this->name}.{$name}"); - Nette\Utils\Validators::assertField($config['exchange'], 'type', 'string:3..', "The config item 'exchange.%' of producer {$this->name}.{$name}"); + Nette\Utils\Validators::assertField( + $config['exchange'], + 'name', + 'string:3..', + \sprintf( + 'The config item \'exchange.%%\' of producer %s.%s', + $this->name, + $name + ) + ); + Nette\Utils\Validators::assertField( + $config['exchange'], + 'type', + 'string:3..', + \sprintf( + "The config item 'exchange.%%' of producer %s.%s", + $this->name, + $name + ) + ); $producer->addSetup('setExchangeOptions', [$config['exchange']]); } @@ -309,9 +359,11 @@ protected function loadProducers($producers) } } - - - protected function loadConsumers($consumers) + /** + * @param array $consumers + * @throws \Nette\Utils\AssertionException + */ + protected function loadConsumers(array $consumers): void { $builder = $this->getContainerBuilder(); @@ -320,7 +372,14 @@ protected function loadConsumers($consumers) $config = $this->extendConsumerFromProducer($name, $config); if (!isset($this->connectionsMeta[$config['connection']])) { - throw new Nette\Utils\AssertionException("Connection {$config['connection']} required in consumer {$this->name}.{$name} was not defined."); + throw new \Nette\Utils\AssertionException( + \sprintf( + 'Connection %s required in consumer %s.%s was not defined.', + $config['connection'], + $this->name, + $name + ) + ); } $consumer = $builder->addDefinition($serviceName = $this->prefix('consumer.' . $name)) @@ -328,8 +387,22 @@ protected function loadConsumers($consumers) ->setAutowired(FALSE); if (!empty($config['exchange'])) { - Nette\Utils\Validators::assertField($config['exchange'], 'name', 'string:3..', "The config item 'exchange.%' of consumer {$this->name}.{$name}"); - Nette\Utils\Validators::assertField($config['exchange'], 'type', 'string:3..', "The config item 'exchange.%' of consumer {$this->name}.{$name}"); + Nette\Utils\Validators::assertField( + $config['exchange'], + 'name', + 'string:3..', + \sprintf( + 'The config item \'exchange.%%\' of consumer %s.%s', + $this->name, + $name + ) + ); + Nette\Utils\Validators::assertField( + $config['exchange'], + 'type', + 'string:3..', + \sprintf('The config item \'exchange.%%\' of consumer %s.%s', $this->name, $name) + ); $consumer->addSetup('setExchangeOptions', [$this->mergeConfig($config['exchange'], $this->exchangeDefaults)]); } @@ -344,24 +417,24 @@ protected function loadConsumers($consumers) } $consumer - ->setClass('Kdyby\RabbitMq\MultipleConsumer') + ->setType(\Kdyby\RabbitMq\MultipleConsumer::class) ->addSetup('setQueues', [$config['queues']]); } elseif (empty($config['queues']) && !empty($config['queue'])) { $consumer - ->setClass('Kdyby\RabbitMq\Consumer') + ->setType(\Kdyby\RabbitMq\Consumer::class) ->addSetup('setQueueOptions', [$this->mergeConfig($config['queue'], $this->queueDefaults)]) ->addSetup('setCallback', [self::fixCallback($config['callback'])]); } else { $consumer - ->setClass('Kdyby\RabbitMq\AnonymousConsumer') + ->setType(\Kdyby\RabbitMq\AnonymousConsumer::class) ->addSetup('setCallback', [self::fixCallback($config['callback'])]); } $consumer->setArguments(['@' . $this->connectionsMeta[$config['connection']]['serviceId']]); - if (array_filter($config['qos'])) { // has values + if (\array_filter($config['qos'])) { // has values $config['qos'] = $this->mergeConfig($config['qos'], $this->qosDefaults); $consumer->addSetup('setQosOptions', [ $config['qos']['prefetchSize'], @@ -382,14 +455,22 @@ protected function loadConsumers($consumers) } } - - - private function extendConsumerFromProducer(&$consumerName, $config) + /** + * @param string $consumerName + * @param array $config + * @return array + * @throws \Nette\Utils\AssertionException + */ + private function extendConsumerFromProducer(string &$consumerName, array $config): array { - if (isset($config[Config\Helpers::EXTENDS_KEY])) { - $producerName = $config[Config\Helpers::EXTENDS_KEY]; - - } elseif ($m = Nette\Utils\Strings::match($consumerName, '~^(?P[^>\s]+)\s*\<\s*(?P[^>\s]+)\z~')) { + $m = Nette\Utils\Strings::match( + $consumerName, + '~^(?P[^>\s]+)\s*\<\s*(?P[^>\s]+)\z~' + ); + if (isset($config[self::EXTENDS_KEY])) { + $producerName = $config[self::EXTENDS_KEY]; + + } elseif ($m) { $consumerName = $m['consumerName']; $producerName = $m['producerName']; @@ -398,7 +479,15 @@ private function extendConsumerFromProducer(&$consumerName, $config) } if ( ! isset($this->producersConfig[$producerName])) { - throw new Nette\Utils\AssertionException("Consumer {$this->name}.{$consumerName} cannot extend unknown producer {$this->name}.{$producerName}."); + throw new \Nette\Utils\AssertionException( + \sprintf( + 'Consumer %s.%s cannot extend unknown producer %s.%s.', + $this->name, + $consumerName, + $this->name, + $producerName + ) + ); } $producerConfig = $this->producersConfig[$producerName]; @@ -413,9 +502,11 @@ private function extendConsumerFromProducer(&$consumerName, $config) return $config; } - - - protected function loadRpcClients($clients) + /** + * @param array $clients + * @throws \Nette\Utils\AssertionException + */ + protected function loadRpcClients(array $clients): void { $builder = $this->getContainerBuilder(); @@ -423,11 +514,21 @@ protected function loadRpcClients($clients) $config = $this->mergeConfig($config, $this->rpcClientDefaults); if (!isset($this->connectionsMeta[$config['connection']])) { - throw new Nette\Utils\AssertionException("Connection {$config['connection']} required in rpc client {$this->name}.{$name} was not defined."); + throw new \Nette\Utils\AssertionException( + \sprintf( + 'Connection %s required in rpc client %s.%s was not defined.', + $config['connection'], + $this->name, + $name + ) + ); } $builder->addDefinition($serviceName = $this->prefix('rpcClient.' . $name)) - ->setClass('Kdyby\RabbitMq\RpcClient', ['@' . $this->connectionsMeta[$config['connection']]['serviceId']]) + ->setType(\Kdyby\RabbitMq\RpcClient::class) + ->setArguments([ + '@' . $this->connectionsMeta[$config['connection']]['serviceId'], + ]) ->addSetup('initClient', [$config['expectSerializedResponse']]) ->addTag(self::TAG_RPC_CLIENT) ->setAutowired(FALSE); @@ -436,9 +537,11 @@ protected function loadRpcClients($clients) } } - - - protected function loadRpcServers($servers) + /** + * @param array $servers + * @throws \Nette\Utils\AssertionException + */ + protected function loadRpcServers(array $servers): void { $builder = $this->getContainerBuilder(); @@ -446,17 +549,25 @@ protected function loadRpcServers($servers) $config = $this->mergeConfig($config, $this->rpcServerDefaults); if (!isset($this->connectionsMeta[$config['connection']])) { - throw new Nette\Utils\AssertionException("Connection {$config['connection']} required in rpc server {$this->name}.{$name} was not defined."); + throw new \Nette\Utils\AssertionException( + \sprintf( + 'Connection %s required in rpc server %s.%s was not defined.', + $config['connection'], + $this->name, + $name + ) + ); } $rpcServer = $builder->addDefinition($serviceName = $this->prefix('rpcServer.' . $name)) - ->setClass('Kdyby\RabbitMq\RpcServer', ['@' . $this->connectionsMeta[$config['connection']]['serviceId']]) + ->setType(\Kdyby\RabbitMq\RpcServer::class) + ->setArguments(['@' . $this->connectionsMeta[$config['connection']]['serviceId']]) ->addSetup('initServer', [$name]) ->addSetup('setCallback', [self::fixCallback($config['callback'])]) ->addTag(self::TAG_RPC_SERVER) ->setAutowired(FALSE); - if (array_filter($config['qos'])) { // has values + if (\array_filter($config['qos'])) { // has values $config['qos'] = $this->mergeConfig($config['qos'], $this->qosDefaults); $rpcServer->addSetup('setQosOptions', [ $config['qos']['prefetchSize'], @@ -469,66 +580,64 @@ protected function loadRpcServers($servers) } } - - - private function loadConsole() + private function loadConsole(): void { - if (!class_exists('Kdyby\Console\DI\ConsoleExtension') || PHP_SAPI !== 'cli') { + if (!\class_exists(\Symfony\Component\Console\Command\Command::class) || PHP_SAPI !== 'cli') { return; } $builder = $this->getContainerBuilder(); foreach ([ - 'Kdyby\RabbitMq\Command\ConsumerCommand', - 'Kdyby\RabbitMq\Command\PurgeConsumerCommand', - 'Kdyby\RabbitMq\Command\RpcServerCommand', - 'Kdyby\RabbitMq\Command\SetupFabricCommand', - 'Kdyby\RabbitMq\Command\StdInProducerCommand', + \Kdyby\RabbitMq\Command\ConsumerCommand::class, + \Kdyby\RabbitMq\Command\PurgeConsumerCommand::class, + \Kdyby\RabbitMq\Command\RpcServerCommand::class, + \Kdyby\RabbitMq\Command\SetupFabricCommand::class, + \Kdyby\RabbitMq\Command\StdInProducerCommand::class, ] as $i => $class) { $builder->addDefinition($this->prefix('console.' . $i)) - ->setClass($class) - ->addTag(Kdyby\Console\DI\ConsoleExtension::COMMAND_TAG); + ->setType($class) + ->addTag(self::TAG_COMMAND_KDYBY) + ->addTag(self::TAG_COMMAND); } } - - + /** + * @param array|string $config + * @param array|string $defaults + * @return array|string + */ protected function mergeConfig($config, $defaults) { - return Config\Helpers::merge($config, $this->compiler->getContainerBuilder()->expand($defaults)); + return Config\Helpers::merge( + $config, + \Nette\DI\Helpers::expand($defaults, $this->compiler->getContainerBuilder()->parameters) + ); } - - + /** + * @param string|mixed $callback + * @return string|mixed + */ protected static function fixCallback($callback) { - list($callback) = self::filterArgs($callback); - if ($callback instanceof Nette\DI\Statement && substr_count($callback->entity, '::') && empty($callback->arguments)) { - $callback = explode('::', $callback->entity, 2); + [$callback] = self::filterArgs($callback); + if ($callback instanceof Nette\DI\Statement && \substr_count($callback->entity, '::') && empty($callback->arguments)) { + $callback = \explode('::', $callback->entity, 2); } return $callback; } - - /** - * @param string|\stdClass $statement - * @return Nette\DI\Statement[] + * @param string|mixed $statement + * @return \Nette\DI\Statement[] */ - protected static function filterArgs($statement) - { - return Nette\DI\Compiler::filterArguments([is_string($statement) ? new Nette\DI\Statement($statement) : $statement]); - } - - - - public static function register(Nette\Configurator $configurator) + protected static function filterArgs($statement): array { - $configurator->onCompile[] = function ($config, Compiler $compiler) { - $compiler->addExtension('rabbitmq', new RabbitMqExtension()); - }; + return Nette\DI\Helpers::filterArguments([ + \is_string($statement) ? new Nette\DI\Statement($statement) : $statement, + ]); } } diff --git a/src/Kdyby/RabbitMq/Diagnostics/Panel.php b/src/Kdyby/RabbitMq/Diagnostics/Panel.php index d7a1ee8a..2b344217 100644 --- a/src/Kdyby/RabbitMq/Diagnostics/Panel.php +++ b/src/Kdyby/RabbitMq/Diagnostics/Panel.php @@ -1,5 +1,7 @@ - * * @property callable $begin * @property callable $failure * @property callable $success */ -class Panel implements IBarPanel +class Panel implements \Tracy\IBarPanel { - use Nette\SmartObject; + use \Nette\SmartObject; /** * @var array @@ -40,9 +36,11 @@ class Panel implements IBarPanel */ private $serviceMap = []; - - - public function injectServiceMap(array $consumers, array $rpcServers) + /** + * @param array $consumers + * @param array $rpcServers + */ + public function injectServiceMap(array $consumers, array $rpcServers): void { $this->serviceMap = [ 'consumer' => $consumers, @@ -50,49 +48,40 @@ public function injectServiceMap(array $consumers, array $rpcServers) ]; } - - - /** - * @return string - */ - public function getTab() + public function getTab(): string { - $img = Html::el('')->addHtml(file_get_contents(__DIR__ . '/rabbitmq-logo.svg')); - $tab = Html::el('span')->title('RabbitMq')->addHtml($img); - $title = Html::el('span')->class('tracy-label'); + $img = Html::el('')->addHtml(\file_get_contents(__DIR__ . '/rabbitmq-logo.svg')); + $tab = Html::el('span')->setAttribute('title', 'RabbitMq')->addHtml($img); + $title = Html::el('span')->setAttribute('class', 'tracy-label'); if ($this->messages) { - $title->setText(count($this->messages) . ' message' . (count($this->messages) > 1 ? 's' : '')); + $title->setText(\count($this->messages) . ' message' . (\count($this->messages) > 1 ? 's' : '')); } return (string) $tab->addHtml($title); } - - - /** - * @return string - */ - public function getPanel() + public function getPanel(): string { - $isRunning = function ($type, $name) { - if (strncasecmp(PHP_OS, 'WIN', 3) == 0) { + $isRunning = static function ($type, $name) { + if (\strncasecmp(PHP_OS, 'WIN', 3) === 0) { return FALSE; // sry, I don't know how to do this } - $command = sprintf('ps aux |grep %s |grep %s', + $command = \sprintf( + 'ps aux |grep %s |grep %s', ($type === 'consumer' ? 'rabbitmq:consumer' : 'rabbitmq:rpc-server'), - escapeshellarg($name) + \escapeshellarg($name) ); - if (!@exec($command, $output)) { + if (!@\exec($command, $output)) { return FALSE; } $instances = 0; foreach ($output as $line) { - if (stripos($line, '|grep') === FALSE) { - $instances += 1; + if (\stripos($line, '|grep') === FALSE) { + $instances++; } } @@ -102,43 +91,40 @@ public function getPanel() $workers = []; $runningWorkers = $configuredWorkers = 0; foreach ($this->serviceMap as $type => $services) { - foreach ($services as $name => $serviceId) { + foreach (\array_keys($services) as $name) { $workers[$key = $type . '/' . $name] = $isRunning($type, $name); + // phpcs:disable SlevomatCodingStandard.Variables.UnusedVariable.UnusedVariable $runningWorkers += (int) $workers[$key]; + // phpcs:disable SlevomatCodingStandard.Variables.UnusedVariable.UnusedVariable $configuredWorkers++; } } - ob_start(); - $esc = class_exists('Nette\Templating\Helpers') + \ob_start(); + // phpcs:disable SlevomatCodingStandard.Variables.UnusedVariable.UnusedVariable + $esc = \class_exists('Nette\Templating\Helpers') ? ['Nette\Templating\Helpers', 'escapeHtml'] : ['Latte\Runtime\Filters', 'escapeHtml']; - $click = class_exists('\Tracy\Dumper') - ? function ($o, $c = FALSE) { return \Tracy\Dumper::toHtml($o, ['collapse' => $c]); } + // phpcs:disable SlevomatCodingStandard.Variables.UnusedVariable.UnusedVariable + $click = \class_exists('\Tracy\Dumper') + ? static function ($o, $c = FALSE) { + return \Tracy\Dumper::toHtml($o, ['collapse' => $c]); + } : ['Tracy\Helpers', 'clickableDump']; require __DIR__ . '/panel.phtml'; - return ob_get_clean(); + return \ob_get_clean(); } - - /** - * @param $message - * @return object + * @param array $message */ - public function published($message) + public function published(array $message): void { $this->messages[] = $message; } - - - /** - * @param Connection $connection - * @return Panel - */ - public function register(Connection $connection) + public function register(Connection $connection): Panel { Debugger::getBar()->addPanel($this); return $this; diff --git a/src/Kdyby/RabbitMq/Exception/Exception.php b/src/Kdyby/RabbitMq/Exception/Exception.php new file mode 100644 index 00000000..a2a9c01a --- /dev/null +++ b/src/Kdyby/RabbitMq/Exception/Exception.php @@ -0,0 +1,13 @@ +response = $response; + + return $e; + } + + public function getResponse(): int + { + return $this->response; + } + +} diff --git a/src/Kdyby/RabbitMq/IConnection.php b/src/Kdyby/RabbitMq/IConnection.php index 5e8f4fe6..3ef3ee2e 100644 --- a/src/Kdyby/RabbitMq/IConnection.php +++ b/src/Kdyby/RabbitMq/IConnection.php @@ -1,46 +1,18 @@ - */ interface IConnection { - /** - * @param string $name - * @return BaseConsumer - */ - function getConsumer($name); - - - - /** - * @param $name - * @return Producer - */ - function getProducer($name); - - - - /** - * @param $name - * @return RpcClient - */ - function getRpcClient($name); + public function getConsumer(string $name): \Kdyby\RabbitMq\Consumer; + public function getProducer(string $name): \Kdyby\RabbitMq\Producer; + public function getRpcClient(string $name): \Kdyby\RabbitMq\RpcClient; - /** - * @param $name - * @return RpcServer - */ - function getRpcServer($name); + public function getRpcServer(string $name): \Kdyby\RabbitMq\RpcServer; } diff --git a/src/Kdyby/RabbitMq/IConsumer.php b/src/Kdyby/RabbitMq/IConsumer.php index 5efd7198..4d2cf284 100644 --- a/src/Kdyby/RabbitMq/IConsumer.php +++ b/src/Kdyby/RabbitMq/IConsumer.php @@ -1,14 +1,11 @@ - * @author Filip Procházka */ interface IConsumer { @@ -16,21 +13,21 @@ interface IConsumer /** * Flag for message ack */ - const MSG_ACK = 1; + public const MSG_ACK = 1; /** * Flag single for message nack and requeue */ - const MSG_SINGLE_NACK_REQUEUE = 2; + public const MSG_SINGLE_NACK_REQUEUE = 2; /** * Flag for reject and requeue */ - const MSG_REJECT_REQUEUE = 0; + public const MSG_REJECT_REQUEUE = 0; /** * Flag for reject and drop */ - const MSG_REJECT = -1; + public const MSG_REJECT = -1; } diff --git a/src/Kdyby/RabbitMq/IProducer.php b/src/Kdyby/RabbitMq/IProducer.php index 42036424..d1e67e50 100644 --- a/src/Kdyby/RabbitMq/IProducer.php +++ b/src/Kdyby/RabbitMq/IProducer.php @@ -1,28 +1,33 @@ - */ interface IProducer { - function setExchangeOptions(array $options = []); + /** + * @param array $options + */ + public function setExchangeOptions(array $options = []): void; - function setQueueOptions(array $options = []); + /** + * @param array $options + */ + public function setQueueOptions(array $options = []): void; - function setRoutingKey($routingKey); + public function setRoutingKey(string $routingKey): void; - function setContentType($contentType); + public function setContentType(string $contentType): IProducer; - function setDeliveryMode($deliveryMode); + public function setDeliveryMode(int $deliveryMode): IProducer; - function publish($msgBody, $routingKey = '', $additionalProperties = []); + /** + * @param string $msgBody + * @param string|NULL $routingKey + * @param array $additionalProperties + */ + public function publish(string $msgBody, ?string $routingKey = '', array $additionalProperties = []): void; } diff --git a/src/Kdyby/RabbitMq/MultipleConsumer.php b/src/Kdyby/RabbitMq/MultipleConsumer.php index ba0f6481..c81cdfba 100644 --- a/src/Kdyby/RabbitMq/MultipleConsumer.php +++ b/src/Kdyby/RabbitMq/MultipleConsumer.php @@ -1,44 +1,36 @@ - * @author Filip Procházka - */ -class MultipleConsumer extends Consumer +class MultipleConsumer extends \Kdyby\RabbitMq\Consumer { - /** - * @var array - */ - public $onConsume = []; - /** * @var array[]|callable[][] */ protected $queues = []; - - - public function getQueueConsumerTag($queue) + public function getQueueConsumerTag(string $queue): string { - return sprintf('%s-%s', $this->getConsumerTag(), $queue); + return \sprintf('%s-%s', $this->getConsumerTag(), $queue); } - - - public function setQueues(array $queues) + /** + * @param array $queues + */ + public function setQueues(array $queues): void { $this->queues = []; foreach ($queues as $name => $queue) { if (!isset($queue['callback'])) { - throw new InvalidArgumentException("The queue '$name' is missing a callback."); + throw new \Kdyby\RabbitMq\Exception\InvalidArgumentException( + \sprintf("The queue '%s' is missing a callback.", $name) + ); } Callback::check($queue['callback']); @@ -46,19 +38,15 @@ public function setQueues(array $queues) } } - - /** - * @return \array[]|\callable[][] + * @return array */ - public function getQueues() + public function getQueues(): array { return $this->queues; } - - - protected function setupConsumer() + protected function setupConsumer(): void { if ($this->autoSetupFabric) { $this->setupFabric(); @@ -68,45 +56,42 @@ protected function setupConsumer() $this->qosDeclare(); } - foreach ($this->queues as $name => $options) { + foreach (\array_keys($this->queues) as $name) { $self = $this; - $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function (AMQPMessage $msg) use ($self, $name) { + $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), FALSE, FALSE, FALSE, FALSE, static function (AMQPMessage $msg) use ($self, $name): void { $self->processQueueMessage($name, $msg); }); } } - - - protected function queueDeclare() + protected function queueDeclare(): void { foreach ($this->queues as $name => $options) { $this->doQueueDeclare($name, $options); } - $this->queueDeclared = true; + $this->queueDeclared = TRUE; } - - - public function processQueueMessage($queueName, AMQPMessage $msg) + public function processQueueMessage(string $queueName, AMQPMessage $msg): void { if (!isset($this->queues[$queueName])) { - throw new QueueNotFoundException(); + throw new \Kdyby\RabbitMq\Exception\QueueNotFoundException(); } $this->onConsume($this, $msg); try { - $processFlag = call_user_func($this->queues[$queueName]['callback'], $msg); + $processFlag = \call_user_func($this->queues[$queueName]['callback'], $msg); $this->handleProcessMessage($msg, $processFlag); - } catch (TerminateException $e) { + } catch (\Kdyby\RabbitMq\Exception\TerminateException $e) { $this->handleProcessMessage($msg, $e->getResponse()); throw $e; - } catch (\Exception $e) { + } catch (\Throwable $e) { $this->onReject($this, $msg, IConsumer::MSG_REJECT_REQUEUE); throw $e; } } + } diff --git a/src/Kdyby/RabbitMq/Producer.php b/src/Kdyby/RabbitMq/Producer.php index ed7ee654..b6ab6c85 100644 --- a/src/Kdyby/RabbitMq/Producer.php +++ b/src/Kdyby/RabbitMq/Producer.php @@ -1,16 +1,12 @@ - * @author Filip Procházka - */ -class Producer extends AmqpMember implements IProducer +class Producer extends \Kdyby\RabbitMq\AmqpMember implements \Kdyby\RabbitMq\IProducer { /** @@ -19,45 +15,43 @@ class Producer extends AmqpMember implements IProducer protected $contentType = 'text/plain'; /** - * @var string + * @var int */ protected $deliveryMode = 2; - - - public function setContentType($contentType) + public function setContentType(string $contentType): IProducer { $this->contentType = $contentType; return $this; } - - - public function setDeliveryMode($deliveryMode) + public function setDeliveryMode(int $deliveryMode): IProducer { $this->deliveryMode = $deliveryMode; return $this; } - - - protected function getBasicProperties() + /** + * @return array + */ + protected function getBasicProperties(): array { - return ['content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode]; + return [ + 'content_type' => $this->contentType, + 'delivery_mode' => $this->deliveryMode, + ]; } - - /** * Publishes the message and merges additional properties with basic properties * * @param string $msgBody - * @param string $routingKey If not provided or set to null, used default routingKey from configuration of this producer - * @param array $additionalProperties + * @param string|NULL $routingKey If not provided or set to null, used default routingKey from configuration of this producer + * @param array $additionalProperties */ - public function publish($msgBody, $routingKey = '', $additionalProperties = []) + public function publish(string $msgBody, ?string $routingKey = '', array $additionalProperties = []): void { if ($this->autoSetupFabric) { $this->setupFabric(); @@ -67,7 +61,8 @@ public function publish($msgBody, $routingKey = '', $additionalProperties = []) $routingKey = $this->routingKey; } - $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties)); + $msg = new AMQPMessage($msgBody, \array_merge($this->getBasicProperties(), $additionalProperties)); $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey); } + } diff --git a/src/Kdyby/RabbitMq/RpcClient.php b/src/Kdyby/RabbitMq/RpcClient.php index 88ff5edb..63c55b54 100644 --- a/src/Kdyby/RabbitMq/RpcClient.php +++ b/src/Kdyby/RabbitMq/RpcClient.php @@ -1,16 +1,12 @@ - * @author Filip Procházka - */ -class RpcClient extends AmqpMember +class RpcClient extends \Kdyby\RabbitMq\AmqpMember { /** @@ -38,24 +34,27 @@ class RpcClient extends AmqpMember */ protected $timeout = 0; - - - public function initClient($expectSerializedResponse = true) + public function initClient(bool $expectSerializedResponse = TRUE): void { - list($this->queueName,,) = $this->getChannel()->queue_declare( - "", - $passive = false, - $durable = false, - $exclusive = true, - $autoDelete = true + [$this->queueName] = $this->getChannel()->queue_declare( + '', + $passive = FALSE, + $durable = FALSE, + $exclusive = TRUE, + $autoDelete = TRUE ); $this->expectSerializedResponse = $expectSerializedResponse; } - - - public function addRequest($msgBody, $server, $requestId = null, $routingKey = '', $expiration = 0) + /** + * @param string $msgBody + * @param string $server + * @param mixed $requestId + * @param string $routingKey + * @param int $expiration + */ + public function addRequest(string $msgBody, string $server, $requestId = NULL, string $routingKey = '', int $expiration = 0): void { if (empty($requestId)) { throw new \InvalidArgumentException('You must provide a $requestId'); @@ -66,7 +65,7 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = ' 'reply_to' => $this->queueName, 'delivery_mode' => 1, // non durable 'expiration' => $expiration * 1000, - 'correlation_id' => $requestId + 'correlation_id' => $requestId, ]); $this->getChannel()->basic_publish($msg, $server, $routingKey); @@ -78,15 +77,16 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = ' } } - - - public function getReplies() + /** + * @return array + */ + public function getReplies(): array { $this->replies = []; - $this->getChannel()->basic_consume($this->queueName, '', false, true, false, false, [$this, 'processMessage']); + $this->getChannel()->basic_consume($this->queueName, '', FALSE, TRUE, FALSE, FALSE, [$this, 'processMessage']); - while (count($this->replies) < $this->requests) { - $this->getChannel()->wait(null, false, $this->timeout); + while (\count($this->replies) < $this->requests) { + $this->getChannel()->wait(NULL, FALSE, $this->timeout); } $this->getChannel()->basic_cancel($this->queueName); @@ -96,13 +96,11 @@ public function getReplies() return $this->replies; } - - - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { $messageBody = $msg->body; if ($this->expectSerializedResponse) { - $messageBody = unserialize($messageBody); + $messageBody = \unserialize($messageBody); } $this->replies[$msg->get('correlation_id')] = $messageBody; diff --git a/src/Kdyby/RabbitMq/RpcServer.php b/src/Kdyby/RabbitMq/RpcServer.php index 91f2ba34..13be11a8 100644 --- a/src/Kdyby/RabbitMq/RpcServer.php +++ b/src/Kdyby/RabbitMq/RpcServer.php @@ -1,24 +1,19 @@ - * @author Filip Procházka - * - * @method onStart(RpcServer $self) - * @method onConsume(RpcServer $self, AMQPMessage $msg) - * @method onReply(RpcServer $self, $result) - * @method onError(RpcServer $self, AMQPExceptionInterface $e) + * @method onStart(\Kdyby\RabbitMq\RpcServer $self) + * @method onConsume(\Kdyby\RabbitMq\RpcServer $self, \PhpAmqpLib\Message\AMQPMessage $msg) + * @method onReply(\Kdyby\RabbitMq\RpcServer $self, $result) + * @method onError(\Kdyby\RabbitMq\RpcServer $self, \PhpAmqpLib\Exception\AMQPExceptionInterface $e) */ -class RpcServer extends BaseConsumer +class RpcServer extends \Kdyby\RabbitMq\BaseConsumer { /** @@ -46,34 +41,30 @@ class RpcServer extends BaseConsumer */ public $onError = []; - - - public function initServer($name) + public function initServer(string $name): void { $this->setExchangeOptions(['name' => $name, 'type' => 'direct']); $this->setQueueOptions(['name' => $name . '-queue']); } - - - public function start($msgAmount = 0) + public function start(int $msgAmount = 0): void { $this->target = $msgAmount; $this->setupConsumer(); $this->onStart($this); try { - while (count($this->getChannel()->callbacks)) { + while (\count($this->getChannel()->callbacks)) { $this->maybeStopConsumer(); try { $this->getChannel()->wait(NULL, FALSE, $this->getIdleTimeout()); - } catch (AMQPTimeoutException $e) { + } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) { // nothing bad happened, right? } } - } catch (AMQPRuntimeException $e) { + } catch (\PhpAmqpLib\Exception\AMQPRuntimeException $e) { // sending kill signal to the consumer causes the stream_select to return false // the reader doesn't like the false value, so it throws AMQPRuntimeException $this->maybeStopConsumer(); @@ -88,34 +79,35 @@ public function start($msgAmount = 0) } } - - - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { try { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); $this->onConsume($this, $msg); - $result = call_user_func($this->callback, $msg); + $result = \call_user_func($this->callback, $msg); $this->onReply($this, $result); - $this->sendReply(serialize($result), $msg->get('reply_to'), $msg->get('correlation_id')); + $this->sendReply(\serialize($result), $msg->get('reply_to'), $msg->get('correlation_id')); $this->consumed++; $this->maybeStopConsumer(); - } catch (\Exception $e) { + } catch (\Throwable $e) { $this->sendReply('error: ' . $e->getMessage(), $msg->get('reply_to'), $msg->get('correlation_id')); } } - - - protected function sendReply($result, $client, $correlationId) + /** + * @param string $result + * @param string $client + * @param mixed $correlationId + */ + protected function sendReply(string $result, string $client, $correlationId): void { $this->getChannel()->basic_publish( new AMQPMessage($result, [ 'content_type' => 'text/plain', - 'correlation_id' => $correlationId + 'correlation_id' => $correlationId, ]), $exchange = '', $client diff --git a/src/Kdyby/RabbitMq/exceptions.php b/src/Kdyby/RabbitMq/exceptions.php deleted file mode 100644 index b450fc5a..00000000 --- a/src/Kdyby/RabbitMq/exceptions.php +++ /dev/null @@ -1,66 +0,0 @@ - - */ -interface Exception -{ - -} - - - -/** - * @author Filip Procházka - */ -class InvalidArgumentException extends \InvalidArgumentException implements Exception -{ - -} - - - -/** - * @author Alvaro Videla - */ -class QueueNotFoundException extends \RuntimeException implements Exception -{ - -} - - - -class TerminateException extends \RuntimeException implements Exception -{ - - private $response = IConsumer::MSG_REJECT_REQUEUE; - - - - /** - * @param int $response - * @return TerminateException - */ - public static function withResponse($response) - { - $e = new self(); - $e->response = $response; - return $e; - } - - - - /** - * @return int - */ - public function getResponse() - { - return $this->response; - } - -} diff --git a/tests/KdybyTests/RabbitMq/BaseAmqpTest.php b/tests/KdybyTests/RabbitMq/BaseAmqpTest.phpt similarity index 50% rename from tests/KdybyTests/RabbitMq/BaseAmqpTest.php rename to tests/KdybyTests/RabbitMq/BaseAmqpTest.phpt index 649de47f..946b56e7 100644 --- a/tests/KdybyTests/RabbitMq/BaseAmqpTest.php +++ b/tests/KdybyTests/RabbitMq/BaseAmqpTest.phpt @@ -1,39 +1,35 @@ getChannel(); - }, 'PhpAmqpLib\Exception\AMQPRuntimeException', 'Error Connecting to server(111): Connection refused'); + }, \ErrorException::class, 'stream_socket_client(): unable to connect to tcp://localhost:123 (Connection refused)'); } } -\run(new BaseAmqpTest()); +(new BaseAmqpTest())->run(); diff --git a/tests/KdybyTests/RabbitMq/Consumer.phpt b/tests/KdybyTests/RabbitMq/Consumer.phpt deleted file mode 100644 index f95bcde0..00000000 --- a/tests/KdybyTests/RabbitMq/Consumer.phpt +++ /dev/null @@ -1,84 +0,0 @@ -getMockery('Kdyby\RabbitMq\Connection', ['127.0.0.1', 5672, 'guest', 'guest']) - ->makePartial(); - - /** @var Kdyby\RabbitMq\Channel|Mock $amqpChannel */ - $amqpChannel = $this->getMockery('Kdyby\RabbitMq\Channel', [$amqpConnection]) - ->makePartial(); - - $consumer = new Consumer($amqpConnection); - $consumer->setChannel($amqpChannel); - - $callbackFunction = function() use ($processFlag) { return $processFlag; }; // Create a callback function with a return value set by the data provider. - $consumer->setCallback($callbackFunction); - - // Create a default message - $amqpMessage = new AMQPMessage('foo body'); - $amqpMessage->delivery_info['channel'] = $amqpChannel; - $amqpMessage->delivery_info['delivery_tag'] = 0; - - $amqpChannel->shouldReceive('basic_reject') - ->andReturnUsing(function($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) { - Assert::same($expectedMethod, 'basic_reject'); // Check if this function should be called. - Assert::same($requeue, $expectedRequeue); // Check if the message should be requeued. - }); - - $amqpChannel->shouldReceive('basic_ack') - ->andReturnUsing(function($delivery_tag) use ($expectedMethod) { - Assert::same($expectedMethod, 'basic_ack'); // Check if this function should be called. - }); - - $consumer->processMessage($amqpMessage); - } - - - - public function processMessageProvider() - { - return [ - [null, 'basic_ack'], // Remove message from queue only if callback return not false - [true, 'basic_ack'], // Remove message from queue only if callback return not false - [false, 'basic_reject', true], // Reject and requeue message to RabbitMQ - [IConsumer::MSG_ACK, 'basic_ack'], // Remove message from queue only if callback return not false - [IConsumer::MSG_REJECT_REQUEUE, 'basic_reject', true], // Reject and requeue message to RabbitMQ - [IConsumer::MSG_REJECT, 'basic_reject', false], // Reject and drop - ]; - } - -} - -\run(new ConsumerTest()); diff --git a/tests/KdybyTests/RabbitMq/ConsumerTest.phpt b/tests/KdybyTests/RabbitMq/ConsumerTest.phpt new file mode 100644 index 00000000..32b0a601 --- /dev/null +++ b/tests/KdybyTests/RabbitMq/ConsumerTest.phpt @@ -0,0 +1,101 @@ +getMockery(\Kdyby\RabbitMq\Connection::class, ['127.0.0.1', 5672, 'guest', 'guest']) + ->makePartial(); + + /** @var \Kdyby\RabbitMq\Channel|\Mockery\Mock $amqpChannel */ + $amqpChannel = $this->getMockery(\Kdyby\RabbitMq\Channel::class, [$amqpConnection]) + ->makePartial(); + + $consumer = new Consumer($amqpConnection); + $consumer->setChannel($amqpChannel); + + $callbackFunction = static function () use ( + $processFlag + ) { + return $processFlag; + }; // Create a callback function with a return value set by the data provider. + $consumer->setCallback($callbackFunction); + + // Create a default message + $amqpMessage = new AMQPMessage('foo body'); + $amqpMessage->delivery_info['channel'] = $amqpChannel; + $amqpMessage->delivery_info['delivery_tag'] = 0; + + $amqpChannel->shouldReceive('basic_reject') + ->andReturnUsing( + static function ( + $deliveryTag, + $requeue + ) use ( + $expectedMethod, + $expectedRequeue + ): void { + Assert::same($expectedMethod, 'basic_reject'); // Check if this function should be called. + Assert::same($requeue, $expectedRequeue); // Check if the message should be requeued. + } + ); + + $amqpChannel->shouldReceive('basic_ack') + ->andReturnUsing( + static function () use ( + $expectedMethod + ): void { + Assert::same($expectedMethod, 'basic_ack'); // Check if this function should be called. + } + ); + + $consumer->processMessage($amqpMessage); + }); + } + + /** + * @return array + */ + public function processMessageProvider(): array + { + return [ + [NULL, 'basic_ack'], // Remove message from queue only if callback return not false + [TRUE, 'basic_ack'], // Remove message from queue only if callback return not false + [FALSE, 'basic_reject', TRUE], // Reject and requeue message to RabbitMQ + [IConsumer::MSG_ACK, 'basic_ack'], // Remove message from queue only if callback return not false + [IConsumer::MSG_REJECT_REQUEUE, 'basic_reject', TRUE], // Reject and requeue message to RabbitMQ + [IConsumer::MSG_REJECT, 'basic_reject', FALSE], // Reject and drop + ]; + } + +} + +(new ConsumerTest())->run(); diff --git a/tests/KdybyTests/RabbitMq/Extension.phpt b/tests/KdybyTests/RabbitMq/ExtensionTest.phpt similarity index 71% rename from tests/KdybyTests/RabbitMq/Extension.phpt rename to tests/KdybyTests/RabbitMq/ExtensionTest.phpt index 803909c4..4b367c03 100644 --- a/tests/KdybyTests/RabbitMq/Extension.phpt +++ b/tests/KdybyTests/RabbitMq/ExtensionTest.phpt @@ -1,59 +1,57 @@ - * @package Kdyby\RabbitMq */ namespace KdybyTests\RabbitMq; use Kdyby; -use KdybyTests; use Nette; use PhpAmqpLib\Connection\AMQPStreamConnection; -use Tester; use Tester\Assert; -require_once __DIR__ . '/TestCase.php'; +require_once __DIR__ . '/../bootstrap.php'; -/** - * @author Filip Procházka - */ -class ExtensionTest extends TestCase +class ExtensionTest extends \KdybyTests\RabbitMq\TestCase { - /** - * @return \SystemContainer|\Nette\DI\Container - */ - protected function createContainer() + protected function createContainer(): \Nette\DI\Container { $config = new Nette\Configurator(); $config->setTempDirectory(TEMP_DIR); - Kdyby\RabbitMq\DI\RabbitMqExtension::register($config); + $config->onCompile[] = static function ($config, Nette\DI\Compiler $compiler): void { + $compiler->addExtension('rabbitmq', new Kdyby\RabbitMq\DI\RabbitMqExtension()); + }; $config->addConfig(__DIR__ . '/files/nette-reset.neon'); $config->addConfig(__DIR__ . '/files/default.neon'); return $config->createContainer(); } - - - public function testFunctional() + public function testFunctional(): void { $dic = $this->createContainer(); // foo was defined first in config - Assert::true($dic->getByType('Kdyby\RabbitMq\Connection') instanceof AMQPStreamConnection); - Assert::same($dic->getByType('Kdyby\RabbitMq\Connection'), $dic->getService('rabbitmq.foo_connection.connection')); + Assert::true($dic->getByType(\Kdyby\RabbitMq\Connection::class) instanceof AMQPStreamConnection); + Assert::same( + $dic->getByType(\Kdyby\RabbitMq\Connection::class), + $dic->getService('rabbitmq.foo_connection.connection') + ); // only the first defined connection is autowired Assert::true($dic->getService('rabbitmq.default.connection') instanceof AMQPStreamConnection); - Assert::notSame($dic->getByType('Kdyby\RabbitMq\Connection'), $dic->getService('rabbitmq.default.connection')); + Assert::notSame( + $dic->getByType(\Kdyby\RabbitMq\Connection::class), + $dic->getService('rabbitmq.default.connection') + ); Assert::true($dic->getService('rabbitmq.producer.foo_producer') instanceof Kdyby\RabbitMq\Producer); Assert::true($dic->getService('rabbitmq.producer.default_producer') instanceof Kdyby\RabbitMq\Producer); @@ -74,4 +72,4 @@ class ExtensionTest extends TestCase } -\run(new ExtensionTest()); +(new ExtensionTest())->run(); diff --git a/tests/KdybyTests/RabbitMq/Mock/Callback.php b/tests/KdybyTests/RabbitMq/Mock/Callback.php new file mode 100644 index 00000000..e5c8a24f --- /dev/null +++ b/tests/KdybyTests/RabbitMq/Mock/Callback.php @@ -0,0 +1,39 @@ + + */ + public static $accepted = []; + + /** + * @param mixed $message + */ + public function __invoke($message): void + { + self::$accepted[] = \func_get_args(); + } + + /** + * @param mixed $message + */ + public function process($message): void + { + self::$accepted[] = \func_get_args(); + } + + /** + * @param mixed $message + */ + public static function staticProcess($message): void + { + self::$accepted[] = \func_get_args(); + } + +} diff --git a/tests/KdybyTests/RabbitMq/Mock/ChannelMock.php b/tests/KdybyTests/RabbitMq/Mock/ChannelMock.php new file mode 100644 index 00000000..757aadc8 --- /dev/null +++ b/tests/KdybyTests/RabbitMq/Mock/ChannelMock.php @@ -0,0 +1,376 @@ + + */ + public $calls = []; + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + protected function channel_alert($args) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::channel_alert($args); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + protected function channel_close($args) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::channel_close($args); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + protected function channel_flow($args) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::channel_flow($args); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function exchange_declare( + $exchange, + $type, + $passive = FALSE, + $durable = FALSE, + $autoDelete = TRUE, + $internal = FALSE, + $nowait = FALSE, + $arguments = NULL, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::exchange_declare( + $exchange, + $type, + $passive, + $durable, + $autoDelete, + $internal, + $nowait, + $arguments, + $ticket + ); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function exchange_delete( + $exchange, + $ifUnused = FALSE, + $nowait = FALSE, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::exchange_delete($exchange, $ifUnused, $nowait, $ticket); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function exchange_bind( + $destination, + $source, + $routingKey = '', + $nowait = FALSE, + $arguments = NULL, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::exchange_bind($destination, $source, $routingKey, $nowait, $arguments, $ticket); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function exchange_unbind( + $destination, + $source, + $routingKey = '', + $arguments = NULL, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::exchange_unbind($destination, $source, $routingKey, $arguments, $ticket); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function queue_bind( + $queue, + $exchange, + $routingKey = '', + $nowait = FALSE, + $arguments = NULL, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::queue_bind($queue, $exchange, $routingKey, $nowait, $arguments, $ticket); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function queue_unbind( + $queue, + $exchange, + $routingKey = '', + $arguments = NULL, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::queue_unbind($queue, $exchange, $routingKey, $arguments, $ticket); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function queue_declare( + $queue = '', + $passive = FALSE, + $durable = FALSE, + $exclusive = FALSE, + $autoDelete = TRUE, + $nowait = FALSE, + $arguments = NULL, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::queue_declare( + $queue, + $passive, + $durable, + $exclusive, + $autoDelete, + $nowait, + $arguments, + $ticket + ); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function queue_delete( + $queue = '', + $ifUnused = FALSE, + $ifEmpty = FALSE, + $nowait = FALSE, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::queue_delete($queue, $ifUnused, $ifEmpty, $nowait, $ticket); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function queue_purge( + $queue = '', + $nowait = FALSE, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::queue_purge($queue, $nowait, $ticket); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_ack( + $deliveryTag, + $multiple = FALSE + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::basic_ack($deliveryTag, $multiple); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_nack( + $deliveryTag, + $multiple = FALSE, + $requeue = FALSE + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::basic_nack($deliveryTag, $multiple, $requeue); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_cancel( + $consumerTag, + $nowait = FALSE, + $noreturn = FALSE + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::basic_cancel($consumerTag, $nowait); + } + + /** + * Starts a queue consumer + * + * @param string $queue + * @param string $consumerTag + * @param bool $noLocal + * @param bool $noAck + * @param bool $exclusive + * @param bool $nowait + * @param callable|null $callback + * @param int|null $ticket + * @param array $arguments + * @return mixed|string + */ + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_consume( + $queue = '', + $consumerTag = '', + $noLocal = FALSE, + $noAck = FALSE, + $exclusive = FALSE, + $nowait = FALSE, + $callback = NULL, + $ticket = NULL, + $arguments = [] + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::basic_consume( + $queue, + $consumerTag, + $noLocal, + $noAck, + $exclusive, + $nowait, + $callback, + $ticket, + $arguments + ); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_get( + $queue = '', + $noAck = FALSE, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::basic_get( + $queue, + $noAck, + $ticket + ); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_qos( + $prefetchSize, + $prefetchCount, + $AGlobal + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::basic_qos($prefetchSize, $prefetchCount, $AGlobal); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_recover($requeue = FALSE) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::basic_recover($requeue); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_reject( + $deliveryTag, + $requeue + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::basic_reject($deliveryTag, $requeue); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + protected function basic_return( + $args, + $msg + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::basic_return($args, $msg); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function tx_commit() + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::tx_commit(); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function tx_rollback() + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::tx_rollback(); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function confirm_select($nowait = FALSE) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::confirm_select($nowait); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function tx_select() + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::tx_select(); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint + public function dispatch( + $methodSig, + $args, + $content + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + + return parent::dispatch($methodSig, $args, $content); + } + + // phpcs:disable SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingReturnTypeHint,SlevomatCodingStandard.TypeHints.TypeHintDeclaration.MissingParameterTypeHint,PSR1.Methods.CamelCapsMethodName + public function basic_publish( + $msg, + $exchange = '', + $routingKey = '', + $mandatory = FALSE, + $immediate = FALSE, + $ticket = NULL + ) + { + $this->calls[] = [__FUNCTION__] + \get_defined_vars(); + parent::basic_publish($msg, $exchange, $routingKey, $mandatory, $immediate, $ticket); + } + +} diff --git a/tests/KdybyTests/RabbitMq/Mock/ConnectionMock.php b/tests/KdybyTests/RabbitMq/Mock/ConnectionMock.php new file mode 100644 index 00000000..0cb205d4 --- /dev/null +++ b/tests/KdybyTests/RabbitMq/Mock/ConnectionMock.php @@ -0,0 +1,23 @@ +getMockery('Kdyby\RabbitMq\Connection', ['127.0.0.1', 5672, 'guest', 'guest']) - ->makePartial(); - - /** @var Kdyby\RabbitMq\Channel|Mock $amqpChannel */ - $amqpChannel = $this->getMockery('Kdyby\RabbitMq\Channel', [$amqpConnection]) - ->makePartial(); - - $consumer = new MultipleConsumer($amqpConnection); - $consumer->setChannel($amqpChannel); - - $callback = function($msg) use (&$lastQueue, $processFlag) { return $processFlag; }; - $consumer->setQueues(['test-1' => ['callback' => $callback], 'test-2' => ['callback' => $callback]]); - - // Create a default message - $amqpMessage = new AMQPMessage('foo body'); - $amqpMessage->delivery_info['channel'] = $amqpChannel; - $amqpMessage->delivery_info['delivery_tag'] = 0; - - $amqpChannel->shouldReceive('basic_reject') - ->andReturnUsing(function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) { - Assert::same($expectedMethod, 'basic_reject'); // Check if this function should be called. - Assert::same($requeue, $expectedRequeue); // Check if the message should be requeued. - }); - - $amqpChannel->shouldReceive('basic_ack') - ->andReturnUsing(function ($delivery_tag) use ($expectedMethod) { - Assert::same($expectedMethod, 'basic_ack'); // Check if this function should be called. - }); - - $consumer->processQueueMessage('test-1', $amqpMessage); - $consumer->processQueueMessage('test-2', $amqpMessage); - } - - - - public function processMessageProvider() - { - return [ - [null, 'basic_ack'], // Remove message from queue only if callback return not false - [true, 'basic_ack'], // Remove message from queue only if callback return not false - [false, 'basic_reject', true], // Reject and requeue message to RabbitMQ - [IConsumer::MSG_ACK, 'basic_ack'], // Remove message from queue only if callback return not false - [IConsumer::MSG_REJECT_REQUEUE, 'basic_reject', true], // Reject and requeue message to RabbitMQ - [IConsumer::MSG_REJECT, 'basic_reject', false], // Reject and drop - ]; - } - -} - -\run(new MultipleConsumerTest()); diff --git a/tests/KdybyTests/RabbitMq/MultipleConsumerTest.phpt b/tests/KdybyTests/RabbitMq/MultipleConsumerTest.phpt new file mode 100644 index 00000000..27b3c6cd --- /dev/null +++ b/tests/KdybyTests/RabbitMq/MultipleConsumerTest.phpt @@ -0,0 +1,101 @@ +getMockery(\Kdyby\RabbitMq\Connection::class, ['127.0.0.1', 5672, 'guest', 'guest']) + ->makePartial(); + + /** @var \Kdyby\RabbitMq\Channel|\Mockery\Mock $amqpChannel */ + $amqpChannel = $this->getMockery(\Kdyby\RabbitMq\Channel::class, [$amqpConnection]) + ->makePartial(); + $consumer = new MultipleConsumer($amqpConnection); + $consumer->setChannel($amqpChannel); + + $callback = static function ($msg) use ($processFlag) { + return $processFlag; + }; + $consumer->setQueues(['test-1' => ['callback' => $callback], 'test-2' => ['callback' => $callback]]); + + // Create a default message + $amqpMessage = new AMQPMessage('foo body'); + $amqpMessage->delivery_info['channel'] = $amqpChannel; + $amqpMessage->delivery_info['delivery_tag'] = 0; + + $amqpChannel->shouldReceive('basic_reject') + ->andReturnUsing( + static function ( + $deliveryTag, + $requeue + ) use ( + $expectedMethod, + $expectedRequeue + ): void { + Assert::same($expectedMethod, 'basic_reject'); // Check if this function should be called. + Assert::same($requeue, $expectedRequeue); // Check if the message should be requeued. + } + ); + + $amqpChannel->shouldReceive('basic_ack') + ->andReturnUsing( + static function ($deliveryTag) use ($expectedMethod): void { + Assert::same($expectedMethod, 'basic_ack'); // Check if this function should be called. + } + ); + + $consumer->processQueueMessage('test-1', $amqpMessage); + $consumer->processQueueMessage('test-2', $amqpMessage); + }); + } + + /** + * @return array + */ + public function processMessageProvider(): array + { + return [ + [NULL, 'basic_ack'], // Remove message from queue only if callback return not false + [TRUE, 'basic_ack'], // Remove message from queue only if callback return not false + [FALSE, 'basic_reject', TRUE], // Reject and requeue message to RabbitMQ + [IConsumer::MSG_ACK, 'basic_ack'], // Remove message from queue only if callback return not false + [IConsumer::MSG_REJECT_REQUEUE, 'basic_reject', TRUE], // Reject and requeue message to RabbitMQ + [IConsumer::MSG_REJECT, 'basic_reject', FALSE], // Reject and drop + ]; + } + +} + +(new MultipleConsumerTest())->run(); diff --git a/tests/KdybyTests/RabbitMq/TestCase.php b/tests/KdybyTests/RabbitMq/TestCase.php index e2c06d45..ebb91fad 100644 --- a/tests/KdybyTests/RabbitMq/TestCase.php +++ b/tests/KdybyTests/RabbitMq/TestCase.php @@ -1,5 +1,7 @@ - */ -abstract class TestCase extends Tester\TestCase +abstract class TestCase extends \Tester\TestCase { /** - * @var Mockery\Container + * @var \Mockery\Container */ private $mockery; - - /** * @param string $class * @throws \Mockery\Exception\RuntimeException - * @return Mockery\Container|Mockery\Mock + * @return \Mockery\Container|\Mockery\Mock */ - protected function getMockery($class = NULL) + protected function getMockery(?string $class = NULL) { if (!$this->mockery) { $this->mockery = new Mockery\Container(Mockery::getDefaultGenerator(), Mockery::getDefaultLoader()); @@ -51,9 +40,7 @@ protected function getMockery($class = NULL) return $this->mockery; } - - - protected function tearDown() + protected function tearDown(): void { if ($this->mockery) { $this->mockery->mockery_close(); diff --git a/tests/KdybyTests/RabbitMq/files/default.neon b/tests/KdybyTests/RabbitMq/files/default.neon index 1211379e..8d807397 100644 --- a/tests/KdybyTests/RabbitMq/files/default.neon +++ b/tests/KdybyTests/RabbitMq/files/default.neon @@ -59,7 +59,7 @@ rabbitmq: routing_keys: - 'android.#.upload' - 'iphone.upload' - callback: KdybyTests\RabbitMq\Callback() + callback: KdybyTests\RabbitMq\Mock\Callback() default_consumer: exchange: @@ -67,7 +67,7 @@ rabbitmq: type: direct queue: name: default_queue - callback: [KdybyTests\RabbitMq\Callback, staticProcess] + callback: [KdybyTests\RabbitMq\Mock\Callback, staticProcess] qos_test_consumer: connection: foo_connection @@ -90,7 +90,7 @@ rabbitmq: queues: multi-test-1: name: multi_test_1 - callback: [@KdybyTests\RabbitMq\Callback, process] + callback: [@KdybyTests\RabbitMq\Mock\Callback, process] multi-test-2: name: foo_bar_2 passive: true @@ -103,7 +103,7 @@ rabbitmq: routing_keys: - 'android.upload' - 'iphone.upload' - callback: KdybyTests\RabbitMq\Callback() + callback: KdybyTests\RabbitMq\Mock\Callback() foo_anon_consumer: connection: foo_connection @@ -117,13 +117,13 @@ rabbitmq: nowait: true arguments: null ticket: null - callback: KdybyTests\RabbitMq\Callback() + callback: KdybyTests\RabbitMq\Mock\Callback() default_anon_consumer: exchange: name: default_anon_exchange type: direct - callback: KdybyTests\RabbitMq\Callback() + callback: KdybyTests\RabbitMq\Mock\Callback() rpcClients: @@ -136,11 +136,11 @@ rabbitmq: rpcServers: foo_server: connection: foo_connection - callback: KdybyTests\RabbitMq\Callback() + callback: KdybyTests\RabbitMq\Mock\Callback() default_server: - callback: KdybyTests\RabbitMq\Callback() + callback: KdybyTests\RabbitMq\Mock\Callback() services: - collecting_callback: KdybyTests\RabbitMq\Callback + collecting_callback: KdybyTests\RabbitMq\Mock\Callback diff --git a/tests/KdybyTests/RabbitMq/mocks.php b/tests/KdybyTests/RabbitMq/mocks.php deleted file mode 100644 index e03c4699..00000000 --- a/tests/KdybyTests/RabbitMq/mocks.php +++ /dev/null @@ -1,335 +0,0 @@ -calls[] = [__FUNCTION__] + get_defined_vars(); - parent::channel_alert($args); - } - - - - protected function channel_close($args) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - parent::channel_close($args); - } - - - - protected function channel_flow($args) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - parent::channel_flow($args); - } - - - - public function exchange_declare( - $exchange, - $type, - $passive = FALSE, - $durable = FALSE, - $auto_delete = TRUE, - $internal = FALSE, - $nowait = FALSE, - $arguments = NULL, - $ticket = NULL - ) { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::exchange_declare($exchange, $type, $passive, $durable, $auto_delete, $internal, $nowait, $arguments, $ticket); - } - - - - public function exchange_delete( - $exchange, - $if_unused = FALSE, - $nowait = FALSE, - $ticket = NULL - ) { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::exchange_delete($exchange, $if_unused, $nowait, $ticket); - } - - - - public function exchange_bind( - $destination, - $source, - $routing_key = "", - $nowait = FALSE, - $arguments = NULL, - $ticket = NULL - ) { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::exchange_bind($destination, $source, $routing_key, $nowait, $arguments, $ticket); - } - - - - public function exchange_unbind($destination, $source, $routing_key = "", $arguments = NULL, $ticket = NULL) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::exchange_unbind($destination, $source, $routing_key, $arguments, $ticket); - } - - - - public function queue_bind($queue, $exchange, $routing_key = "", $nowait = FALSE, $arguments = NULL, $ticket = NULL) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::queue_bind($queue, $exchange, $routing_key, $nowait, $arguments, $ticket); - } - - - - public function queue_unbind($queue, $exchange, $routing_key = "", $arguments = NULL, $ticket = NULL) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::queue_unbind($queue, $exchange, $routing_key, $arguments, $ticket); - } - - - - public function queue_declare( - $queue = "", - $passive = FALSE, - $durable = FALSE, - $exclusive = FALSE, - $auto_delete = TRUE, - $nowait = FALSE, - $arguments = NULL, - $ticket = NULL - ) { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::queue_declare($queue, $passive, $durable, $exclusive, $auto_delete, $nowait, $arguments, $ticket); - } - - - - public function queue_delete($queue = "", $if_unused = FALSE, $if_empty = FALSE, $nowait = FALSE, $ticket = NULL) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::queue_delete($queue, $if_unused, $if_empty, $nowait, $ticket); - } - - - - public function queue_purge($queue = "", $nowait = FALSE, $ticket = NULL) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::queue_purge($queue, $nowait, $ticket); - } - - - - public function basic_ack($delivery_tag, $multiple = FALSE) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - parent::basic_ack($delivery_tag, $multiple); - } - - - - public function basic_nack($delivery_tag, $multiple = FALSE, $requeue = FALSE) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - parent::basic_nack($delivery_tag, $multiple, $requeue); - } - - - - public function basic_cancel($consumer_tag, $nowait = FALSE, $noreturn = false) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::basic_cancel($consumer_tag, $nowait); - } - - - - public function basic_consume( - $queue = "", - $consumer_tag = "", - $no_local = FALSE, - $no_ack = FALSE, - $exclusive = FALSE, - $nowait = FALSE, - $callback = NULL, - $ticket = NULL, - $arguments = [] - ) { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback, $ticket, $arguments); - } - - - - public function basic_get($queue = "", $no_ack = FALSE, $ticket = NULL) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::basic_get($queue, $no_ack, $ticket); - } - - - - public function basic_qos($prefetch_size, $prefetch_count, $a_global) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::basic_qos($prefetch_size, $prefetch_count, $a_global); - } - - - - public function basic_recover($requeue = FALSE) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::basic_recover($requeue); - } - - - - public function basic_reject($delivery_tag, $requeue) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - parent::basic_reject($delivery_tag, $requeue); - } - - - - protected function basic_return($args, $msg) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - parent::basic_return($args, $msg); - } - - - - public function tx_commit() - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::tx_commit(); - } - - - - public function tx_rollback() - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::tx_rollback(); - } - - - - public function confirm_select($nowait = FALSE) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::confirm_select($nowait); - } - - - - public function tx_select() - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::tx_select(); - } - - - - public function dispatch($method_sig, $args, $content) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - - return parent::dispatch($method_sig, $args, $content); - } - - - - public function basic_publish($msg, $exchange = '', $routingKey = '', $mandatory = FALSE, $immediate = FALSE, $ticket = NULL) - { - $this->calls[] = [__FUNCTION__] + get_defined_vars(); - parent::basic_publish($msg, $exchange, $routingKey, $mandatory, $immediate, $ticket); - } - -} - - - -class Callback -{ - - public static $accepted = []; - - - - public function __invoke($message) - { - self::$accepted[] = func_get_args(); - } - - - - public function process($message) - { - self::$accepted[] = func_get_args(); - } - - - - public static function staticProcess($message) - { - self::$accepted[] = func_get_args(); - } - -} diff --git a/tests/KdybyTests/bootstrap.php b/tests/KdybyTests/bootstrap.php index b2c4aa81..82045134 100755 --- a/tests/KdybyTests/bootstrap.php +++ b/tests/KdybyTests/bootstrap.php @@ -1,5 +1,7 @@ run(isset($_SERVER['argv'][1]) ? $_SERVER['argv'][1] : NULL); -} +return $loader; diff --git a/tests/RabbitMq/MultipleConsumerTest.php b/tests/RabbitMq/MultipleConsumerTest.php index 25e851e8..5ca41b92 100644 --- a/tests/RabbitMq/MultipleConsumerTest.php +++ b/tests/RabbitMq/MultipleConsumerTest.php @@ -1,5 +1,7 @@ getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection') ->disableOriginalConstructor() @@ -24,11 +27,11 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque ->getMock(); $consumer = new MultipleConsumer($amqpConnection, $amqpChannel); - $callback = function($msg) use (&$lastQueue, $processFlag) { + $callback = static function ($msg) use ($processFlag) { return $processFlag; }; - $consumer->setQueues(['test-1' => ['callback' => $callback], 'test-2' => ['callback' => $callback]]); + $consumer->setQueues(['test-1' => ['callback' => $callback], 'test-2' => ['callback' => $callback]]); // Create a default message $amqpMessage = new AMQPMessage('foo body'); @@ -36,14 +39,14 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque $amqpMessage->delivery_info['delivery_tag'] = 0; $amqpChannel->expects($this->any()) ->method('basic_reject') - ->will($this->returnCallback(function($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) { + ->will($this->returnCallback(static function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue): void { \PHPUnit_Framework_Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called. \PHPUnit_Framework_Assert::assertSame($requeue, $expectedRequeue); // Check if the message should be requeued. })); $amqpChannel->expects($this->any()) ->method('basic_ack') - ->will($this->returnCallback(function($delivery_tag) use ($expectedMethod) { + ->will($this->returnCallback(static function ($delivery_tag) use ($expectedMethod): void { \PHPUnit_Framework_Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called. })); @@ -54,12 +57,13 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque public function processMessageProvider() { return [ - [null, 'basic_ack'], // Remove message from queue only if callback return not false - [true, 'basic_ack'], // Remove message from queue only if callback return not false - [false, 'basic_reject', true], // Reject and requeue message to RabbitMQ + [NULL, 'basic_ack'], // Remove message from queue only if callback return not false + [TRUE, 'basic_ack'], // Remove message from queue only if callback return not false + [FALSE, 'basic_reject', TRUE], // Reject and requeue message to RabbitMQ [ConsumerInterface::MSG_ACK, 'basic_ack'], // Remove message from queue only if callback return not false - [ConsumerInterface::MSG_REJECT_REQUEUE, 'basic_reject', true], // Reject and requeue message to RabbitMQ - [ConsumerInterface::MSG_REJECT, 'basic_reject', false], // Reject and drop + [ConsumerInterface::MSG_REJECT_REQUEUE, 'basic_reject', TRUE], // Reject and requeue message to RabbitMQ + [ConsumerInterface::MSG_REJECT, 'basic_reject', FALSE], // Reject and drop ]; } + }