From 64e88b94fd4975846d4ce4d2e94ff1a25c03bc14 Mon Sep 17 00:00:00 2001 From: Marko Date: Mon, 16 Nov 2020 11:55:31 +0100 Subject: [PATCH 01/14] Remove prestissimo --- docker/dev/php/Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docker/dev/php/Dockerfile b/docker/dev/php/Dockerfile index f950318..3865335 100644 --- a/docker/dev/php/Dockerfile +++ b/docker/dev/php/Dockerfile @@ -28,8 +28,7 @@ RUN echo "$HOST_USER:x:$HOST_USER_ID:82:Linux User,,,:/home/$HOST_USER:" >> /etc addgroup $HOST_USER www-data # COMPOSER: install binary and prestissimo -RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/bin --filename=composer && \ - composer global require hirak/prestissimo +RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/bin --filename=composer # PHP: Install php extensions RUN pecl channel-update pecl.php.net && \ From c85465d84d3345dcb37e1a5d9b24134f26f7536e Mon Sep 17 00:00:00 2001 From: Marko Date: Mon, 16 Nov 2020 13:02:38 +0100 Subject: [PATCH 02/14] avro-validator composer suggestion --- composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index cb5a665..17454c4 100644 --- a/composer.json +++ b/composer.json @@ -38,7 +38,8 @@ } }, "suggest": { - "flix-tech/avro-serde-php": "Is needed for Avro support" + "flix-tech/avro-serde-php": "Is needed for Avro support", + "jobcloud/avro-validator": "Useful for debug purposes" }, "extra": { "branch-alias": { From 1c2ef77ce8487e2a36f90a9896b3b340fe5308f9 Mon Sep 17 00:00:00 2001 From: Marko Date: Mon, 16 Nov 2020 17:02:18 +0100 Subject: [PATCH 03/14] Revert prestissimo --- docker/dev/php/Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docker/dev/php/Dockerfile b/docker/dev/php/Dockerfile index 3865335..f950318 100644 --- a/docker/dev/php/Dockerfile +++ b/docker/dev/php/Dockerfile @@ -28,7 +28,8 @@ RUN echo "$HOST_USER:x:$HOST_USER_ID:82:Linux User,,,:/home/$HOST_USER:" >> /etc addgroup $HOST_USER www-data # COMPOSER: install binary and prestissimo -RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/bin --filename=composer +RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/bin --filename=composer && \ + composer global require hirak/prestissimo # PHP: Install php extensions RUN pecl channel-update pecl.php.net && \ From 29aca1eca0586bc5e2b328599fc44cc780d4e4df Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 18 Nov 2020 16:20:36 +0100 Subject: [PATCH 04/14] Validate schema with avro validator --- src/Message/Encoder/AvroEncoder.php | 31 ++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/Message/Encoder/AvroEncoder.php b/src/Message/Encoder/AvroEncoder.php index bb86822..f53cdbe 100644 --- a/src/Message/Encoder/AvroEncoder.php +++ b/src/Message/Encoder/AvroEncoder.php @@ -45,7 +45,20 @@ public function __construct( */ public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface { - $producerMessage = $this->encodeBody($producerMessage); + try { + $producerMessage = $this->encodeBody($producerMessage); + } catch (\Exception $exception) { + if (class_exists(\Jobcloud\Avro\Validator\Validator::class)) { + $topicName = $producerMessage->getTopicName(); + $body = $producerMessage->getBody(); + + $avroSchema = $this->registry->getBodySchemaForTopic($topicName); + + $validationErrors = $this->validateSchema(json_encode($avroSchema->getDefinition()->to_avro()), $body, $topicName); + + var_dump($validationErrors); + } + } return $this->encodeKey($producerMessage); } @@ -131,4 +144,20 @@ public function getRegistry(): AvroSchemaRegistryInterface { return $this->registry; } + + /** + * @param string $avroSchema + * @param mixed $data + * @param string $topicName + * @return array + * @throws \Jobcloud\Avro\Validator\Exception\RecordRegistryException + * @throws \Jobcloud\Avro\Validator\Exception\ValidatorException + */ + private function validateSchema(string $avroSchema, $data, string $topicName): array + { + $recordRegistry = \Jobcloud\Avro\Validator\RecordRegistry::fromSchema((string) $avroSchema); + $validator = new \Jobcloud\Avro\Validator\Validator($recordRegistry); + + return $validator->validate(json_encode($data), $topicName); + } } From 2056086deb2829dfee4c743956579f4f02167bd2 Mon Sep 17 00:00:00 2001 From: Marko Date: Thu, 19 Nov 2020 11:35:07 +0100 Subject: [PATCH 05/14] Validate schema with avro validator --- composer.json | 2 +- src/Exception/AvroValidatorException.php | 10 +++ src/Message/Encoder/AvroEncoder.php | 85 +++++++++++++++--------- 3 files changed, 65 insertions(+), 32 deletions(-) create mode 100644 src/Exception/AvroValidatorException.php diff --git a/composer.json b/composer.json index 17454c4..c8846df 100644 --- a/composer.json +++ b/composer.json @@ -39,7 +39,7 @@ }, "suggest": { "flix-tech/avro-serde-php": "Is needed for Avro support", - "jobcloud/avro-validator": "Useful for debug purposes" + "jobcloud/avro-validator": "Useful for debug purposes in development, not recommended for production use" }, "extra": { "branch-alias": { diff --git a/src/Exception/AvroValidatorException.php b/src/Exception/AvroValidatorException.php new file mode 100644 index 0000000..b838517 --- /dev/null +++ b/src/Exception/AvroValidatorException.php @@ -0,0 +1,10 @@ +encodeBody($producerMessage); - } catch (\Exception $exception) { - if (class_exists(\Jobcloud\Avro\Validator\Validator::class)) { - $topicName = $producerMessage->getTopicName(); - $body = $producerMessage->getBody(); - - $avroSchema = $this->registry->getBodySchemaForTopic($topicName); - - $validationErrors = $this->validateSchema(json_encode($avroSchema->getDefinition()->to_avro()), $body, $topicName); - - var_dump($validationErrors); - } - } + $producerMessage = $this->encodeBody($producerMessage); return $this->encodeKey($producerMessage); } @@ -66,7 +59,10 @@ public function encode(KafkaProducerMessageInterface $producerMessage): KafkaPro /** * @param KafkaProducerMessageInterface $producerMessage * @return KafkaProducerMessageInterface + * @throws AvroValidatorException + * @throws RecordRegistryException * @throws SchemaRegistryException + * @throws ValidatorException */ private function encodeBody(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface { @@ -83,11 +79,23 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf $avroSchema = $this->registry->getBodySchemaForTopic($topicName); - $encodedBody = $this->recordSerializer->encodeRecord( - $avroSchema->getName(), - $this->getAvroSchemaDefinition($avroSchema), - $body - ); + try { + $encodedBody = $this->recordSerializer->encodeRecord( + $avroSchema->getName(), + $this->getAvroSchemaDefinition($avroSchema), + $body + ); + } catch (AvroEncodingException $exception) { + if (class_exists(Validator::class)) { + $validationErrors = $this->validateSchema( + $avroSchema->getDefinition()->to_avro(), + $body, + $topicName + ); + + throw new AvroValidatorException(json_encode($validationErrors)); + } + } return $producerMessage->withBody($encodedBody); } @@ -95,7 +103,10 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf /** * @param KafkaProducerMessageInterface $producerMessage * @return KafkaProducerMessageInterface + * @throws AvroValidatorException + * @throws RecordRegistryException * @throws SchemaRegistryException + * @throws ValidatorException */ private function encodeKey(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface { @@ -112,11 +123,23 @@ private function encodeKey(KafkaProducerMessageInterface $producerMessage): Kafk $avroSchema = $this->registry->getKeySchemaForTopic($topicName); - $encodedKey = $this->recordSerializer->encodeRecord( - $avroSchema->getName(), - $this->getAvroSchemaDefinition($avroSchema), - $key - ); + try { + $encodedKey = $this->recordSerializer->encodeRecord( + $avroSchema->getName(), + $this->getAvroSchemaDefinition($avroSchema), + $key + ); + } catch (AvroEncodingException $exception) { + if (class_exists(Validator::class)) { + $validationErrors = $this->validateSchema( + $avroSchema->getDefinition()->to_avro(), + $key, + $topicName + ); + + throw new AvroValidatorException(json_encode($validationErrors)); + } + } return $producerMessage->withKey($encodedKey); } @@ -146,17 +169,17 @@ public function getRegistry(): AvroSchemaRegistryInterface } /** - * @param string $avroSchema + * @param array $avroSchema * @param mixed $data * @param string $topicName * @return array - * @throws \Jobcloud\Avro\Validator\Exception\RecordRegistryException - * @throws \Jobcloud\Avro\Validator\Exception\ValidatorException + * @throws RecordRegistryException + * @throws ValidatorException */ - private function validateSchema(string $avroSchema, $data, string $topicName): array + private function validateSchema(array $avroSchema, $data, string $topicName): array { - $recordRegistry = \Jobcloud\Avro\Validator\RecordRegistry::fromSchema((string) $avroSchema); - $validator = new \Jobcloud\Avro\Validator\Validator($recordRegistry); + $recordRegistry = RecordRegistry::fromSchema(json_encode($avroSchema)); + $validator = new Validator($recordRegistry); return $validator->validate(json_encode($data), $topicName); } From 0cb4460b6a00753872aa5914b4b84bbd65b1e6ca Mon Sep 17 00:00:00 2001 From: Marko Date: Thu, 19 Nov 2020 16:04:30 +0100 Subject: [PATCH 06/14] AvroEncoder unit tests --- .../Unit/Message/Encoder/AvroEncoderTest.php | 110 +++++++++++++++++- 1 file changed, 108 insertions(+), 2 deletions(-) diff --git a/tests/Unit/Message/Encoder/AvroEncoderTest.php b/tests/Unit/Message/Encoder/AvroEncoderTest.php index 866e43a..8f34725 100644 --- a/tests/Unit/Message/Encoder/AvroEncoderTest.php +++ b/tests/Unit/Message/Encoder/AvroEncoderTest.php @@ -4,15 +4,14 @@ namespace Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder; +use FlixTech\AvroSerializer\Objects\Exceptions\AvroEncodingException; use FlixTech\AvroSerializer\Objects\RecordSerializer; use Jobcloud\Kafka\Exception\AvroEncoderException; -use Jobcloud\Kafka\Message\Encoder\AvroEncoderInterface; use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface; use Jobcloud\Kafka\Message\KafkaProducerMessageInterface; use Jobcloud\Kafka\Message\Encoder\AvroEncoder; use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface; use PHPStan\Testing\TestCase; -use \AvroSchema; /** * @covers \Jobcloud\Kafka\Message\Encoder\AvroEncoder @@ -169,4 +168,111 @@ public function testGetRegistry() self::assertSame($registry, $encoder->getRegistry()); } + + public function testAvroValidatorBodyException() + { + $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock(); + + $avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class); + $avroSchema->expects(self::exactly(2))->method('getName')->willReturn('schemaName'); + $avroSchema->expects(self::never())->method('getVersion'); + $avroSchema->expects(self::exactly(3))->method('getDefinition')->willReturn($schemaDefinition); + $schemaDefinition->method('to_avro')->willReturn([]); + + $registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class); + $registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true); + + $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); + $producerMessage->expects(self::once())->method('getTopicName')->willReturn('test'); + $producerMessage->expects(self::once())->method('getBody')->willReturn([]); + + $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock(); + $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); + $recordSerializer + ->expects(self::once()) + ->method('encodeRecord') + ->with($avroSchema->getName(), $avroSchema->getDefinition(), []) + ->willReturnOnConsecutiveCalls('encodedValue') + ->willThrowException($avroEncodingException); + + $encoder = new AvroEncoder($registry, $recordSerializer); + + $validator = $this->getMockBuilder('Jobcloud\Avro\Validator\Validator') + ->setMockClassName('Validator') + ->getMock(); + + $recordRegistry = $this->getMockBuilder('Jobcloud\Avro\Validator\RecordRegistry') + ->setMockClassName('RecordRegistry') + ->getMock(); + + $recordRegistry->expects($this->once()) + ->method('fromSchema') + ->with(json_encode([])) + ->willReturn($recordRegistry); + + $validator->expects($this->once()) + ->method('validate') + ->with(json_encode([]), '') + ->willReturn([]); + + self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); + } + + public function testAvroValidatorKeyException() + { + $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock(); + + $avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class); + $avroSchema->expects(self::exactly(4))->method('getName')->willReturn('schemaName'); + $avroSchema->expects(self::never())->method('getVersion'); + $avroSchema->expects(self::exactly(5))->method('getDefinition')->willReturn($schemaDefinition); + $schemaDefinition->method('to_avro')->willReturn([]); + + $registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class); + $registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true); + $registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true); + + $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); + $producerMessage->expects(self::exactly(2))->method('getTopicName')->willReturn('test'); + $producerMessage->expects(self::once())->method('getBody')->willReturn([]); + $producerMessage->expects(self::once())->method('getKey')->willReturn('test-key'); + $producerMessage->expects(self::once())->method('withBody')->with('encodedValue')->willReturn($producerMessage); + + $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock(); + + $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); + $recordSerializer + ->expects(self::exactly(2)) + ->method('encodeRecord') + ->withConsecutive( + [$avroSchema->getName(), $avroSchema->getDefinition(), []], + [$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key'] + ) + ->willReturnOnConsecutiveCalls('encodedValue', $this->throwException($avroEncodingException)); + + $encoder = new AvroEncoder($registry, $recordSerializer); + + $validator = $this->getMockBuilder('Jobcloud\Avro\Validator\Validator') + ->setMockClassName('Validator') + ->getMock(); + + $recordRegistry = $this->getMockBuilder('Jobcloud\Avro\Validator\RecordRegistry') + ->setMockClassName('RecordRegistry') + ->getMock(); + + $recordRegistry->expects($this->once()) + ->method('fromSchema') + ->with(json_encode([])) + ->willReturn($recordRegistry); + + $validator->expects($this->once()) + ->method('validate') + ->with(json_encode([]), '') + ->willReturn([]); + + self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); + } } From 861665a58e4f270535d2acc60b75c78664272873 Mon Sep 17 00:00:00 2001 From: Marko Date: Mon, 23 Nov 2020 14:40:10 +0100 Subject: [PATCH 07/14] AvroEncoder unit tests --- .../Unit/Message/Encoder/AvroEncoderTest.php | 107 +++++++++++------- 1 file changed, 64 insertions(+), 43 deletions(-) diff --git a/tests/Unit/Message/Encoder/AvroEncoderTest.php b/tests/Unit/Message/Encoder/AvroEncoderTest.php index 8f34725..3adefbc 100644 --- a/tests/Unit/Message/Encoder/AvroEncoderTest.php +++ b/tests/Unit/Message/Encoder/AvroEncoderTest.php @@ -7,6 +7,7 @@ use FlixTech\AvroSerializer\Objects\Exceptions\AvroEncodingException; use FlixTech\AvroSerializer\Objects\RecordSerializer; use Jobcloud\Kafka\Exception\AvroEncoderException; +use Jobcloud\Kafka\Exception\AvroValidatorException; use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface; use Jobcloud\Kafka\Message\KafkaProducerMessageInterface; use Jobcloud\Kafka\Message\Encoder\AvroEncoder; @@ -18,6 +19,36 @@ */ class AvroEncoderTest extends TestCase { + private $avroValidatorClass = "./src/Message/Encoder/AvroEncoder.php"; + + private $originalNamespaces = [ + "Jobcloud\Avro\Validator\RecordRegistry", + "Jobcloud\Avro\Validator\Validator" + ]; + + private $replacedNamespaces = [ + "Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\RecordRegistry", + "Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\Validator" + ]; + + protected function setUp(): void + { + $avroEncoderContent = file_get_contents($this->avroValidatorClass); + + $avroEncoderContent = str_replace($this->originalNamespaces, $this->replacedNamespaces, $avroEncoderContent); + + file_put_contents($this->avroValidatorClass, $avroEncoderContent); + } + + protected function tearDown(): void + { + $avroEncoderContent = file_get_contents($this->avroValidatorClass); + + $avroEncoderContent = str_replace($this->replacedNamespaces, $this->originalNamespaces, $avroEncoderContent); + + file_put_contents($this->avroValidatorClass, $avroEncoderContent); + } + public function testEncodeTombstone() { $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); @@ -174,10 +205,9 @@ public function testAvroValidatorBodyException() $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock(); $avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class); - $avroSchema->expects(self::exactly(2))->method('getName')->willReturn('schemaName'); - $avroSchema->expects(self::never())->method('getVersion'); - $avroSchema->expects(self::exactly(3))->method('getDefinition')->willReturn($schemaDefinition); - $schemaDefinition->method('to_avro')->willReturn([]); + $avroSchema->expects(self::once())->method('getName')->willReturn('schemaName'); + $avroSchema->expects(self::exactly(2))->method('getDefinition')->willReturn($schemaDefinition); + $schemaDefinition->method('to_avro')->willReturn(['type' => 'record']); $registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class); $registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema); @@ -185,37 +215,24 @@ public function testAvroValidatorBodyException() $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); $producerMessage->expects(self::once())->method('getTopicName')->willReturn('test'); - $producerMessage->expects(self::once())->method('getBody')->willReturn([]); + $producerMessage->expects(self::once())->method('getBody')->willReturn(['id' => 123]); - $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock(); - $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); + $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class) + ->disableOriginalConstructor() + ->getMock(); + $recordSerializer = $this->getMockBuilder(RecordSerializer::class) + ->disableOriginalConstructor() + ->getMock(); $recordSerializer ->expects(self::once()) ->method('encodeRecord') - ->with($avroSchema->getName(), $avroSchema->getDefinition(), []) ->willReturnOnConsecutiveCalls('encodedValue') - ->willThrowException($avroEncodingException); + ->willThrowException($avroEncodingException); $encoder = new AvroEncoder($registry, $recordSerializer); - $validator = $this->getMockBuilder('Jobcloud\Avro\Validator\Validator') - ->setMockClassName('Validator') - ->getMock(); - - $recordRegistry = $this->getMockBuilder('Jobcloud\Avro\Validator\RecordRegistry') - ->setMockClassName('RecordRegistry') - ->getMock(); - - $recordRegistry->expects($this->once()) - ->method('fromSchema') - ->with(json_encode([])) - ->willReturn($recordRegistry); - - $validator->expects($this->once()) - ->method('validate') - ->with(json_encode([]), '') - ->willReturn([]); - + self::expectException(AvroValidatorException::class); + self::expectExceptionMessage(json_encode(['test' => 'test'])); self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); } @@ -255,24 +272,28 @@ public function testAvroValidatorKeyException() $encoder = new AvroEncoder($registry, $recordSerializer); - $validator = $this->getMockBuilder('Jobcloud\Avro\Validator\Validator') - ->setMockClassName('Validator') - ->getMock(); + self::expectException(AvroValidatorException::class); + self::expectExceptionMessage(json_encode(['test' => 'test'])); + self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); + } +} - $recordRegistry = $this->getMockBuilder('Jobcloud\Avro\Validator\RecordRegistry') - ->setMockClassName('RecordRegistry') - ->getMock(); +class RecordRegistry { + public function fromSchema(string $schema): string + { + return $schema; + } +} - $recordRegistry->expects($this->once()) - ->method('fromSchema') - ->with(json_encode([])) - ->willReturn($recordRegistry); +class Validator { + public function validate(): array + { + return [ + 'test' => 'test', + ]; + } +} - $validator->expects($this->once()) - ->method('validate') - ->with(json_encode([]), '') - ->willReturn([]); +class AvroValidationException { - self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); - } } From 48b800edda0f022b2f5b64ee251f8b529d7b5503 Mon Sep 17 00:00:00 2001 From: Marko Date: Mon, 23 Nov 2020 17:20:50 +0100 Subject: [PATCH 08/14] Fix phpstan errors --- src/Message/Encoder/AvroEncoder.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Message/Encoder/AvroEncoder.php b/src/Message/Encoder/AvroEncoder.php index fcab6bf..41e6079 100644 --- a/src/Message/Encoder/AvroEncoder.php +++ b/src/Message/Encoder/AvroEncoder.php @@ -46,8 +46,10 @@ public function __construct( /** * @param KafkaProducerMessageInterface $producerMessage * @return KafkaProducerMessageInterface - * @throws SchemaRegistryException * @throws AvroValidatorException + * @throws RecordRegistryException + * @throws SchemaRegistryException + * @throws ValidatorException */ public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface { @@ -79,6 +81,7 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf $avroSchema = $this->registry->getBodySchemaForTopic($topicName); + $encodedBody = null; try { $encodedBody = $this->recordSerializer->encodeRecord( $avroSchema->getName(), @@ -123,6 +126,7 @@ private function encodeKey(KafkaProducerMessageInterface $producerMessage): Kafk $avroSchema = $this->registry->getKeySchemaForTopic($topicName); + $encodedKey = null; try { $encodedKey = $this->recordSerializer->encodeRecord( $avroSchema->getName(), @@ -169,7 +173,7 @@ public function getRegistry(): AvroSchemaRegistryInterface } /** - * @param array $avroSchema + * @param array $avroSchema * @param mixed $data * @param string $topicName * @return array From 1667f9cd52c8f258c76d3f273685c9b4ad28bf53 Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 25 Nov 2020 12:04:47 +0100 Subject: [PATCH 09/14] Ignore phpstan errors --- phpstan.neon | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/phpstan.neon b/phpstan.neon index c7b1758..d15f039 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,3 +1,12 @@ parameters: level: 8 paths: [ src ] + ignoreErrors: + - '#Comparison operation ">" between int and RdKafka\\TopicPartition results in an error.#' + - '#Call to method validate\(\) on an unknown class Jobcloud\\Avro\\Validator\\Validator#' + - '#Call to static method fromSchema\(\) on an unknown class Jobcloud\\Avro\\Validator\\RecordRegistry#' + - '#PHPDoc tag @throws with type [a-zA-Z0-9\\_]+#' + - '#Parameter \#[0-9] [a-z\$]+ of method [a-zA-Z0-9\\_]+::setOffset\(\) expects string, int given.#' + - '#Parameter \#[0-9] [a-z\$]+ of class Jobcloud\\Kafka\\Exception\\AvroValidatorException constructor expects string, string\|false given.#' + - '#Cannot call method to_avro\(\) on AvroSchema\|null.#' + - '#Instantiated class Jobcloud\\Avro\\Validator\\Validator not found.#' \ No newline at end of file From e8b6ae9f92b41432f92ddb46c52863cff8cb1b55 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 25 Nov 2020 14:59:49 +0100 Subject: [PATCH 10/14] Update phpstan.neon --- phpstan.neon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phpstan.neon b/phpstan.neon index d15f039..e9c6515 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -9,4 +9,4 @@ parameters: - '#Parameter \#[0-9] [a-z\$]+ of method [a-zA-Z0-9\\_]+::setOffset\(\) expects string, int given.#' - '#Parameter \#[0-9] [a-z\$]+ of class Jobcloud\\Kafka\\Exception\\AvroValidatorException constructor expects string, string\|false given.#' - '#Cannot call method to_avro\(\) on AvroSchema\|null.#' - - '#Instantiated class Jobcloud\\Avro\\Validator\\Validator not found.#' \ No newline at end of file + - '#Instantiated class Jobcloud\\Avro\\Validator\\Validator not found.#' From 4a7393d1654e2119b9f4c498edce16a8dc611f94 Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 25 Nov 2020 15:06:44 +0100 Subject: [PATCH 11/14] Ignore phpstan errors --- phpstan.neon | 2 -- 1 file changed, 2 deletions(-) diff --git a/phpstan.neon b/phpstan.neon index e9c6515..fe18403 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -3,10 +3,8 @@ parameters: paths: [ src ] ignoreErrors: - '#Comparison operation ">" between int and RdKafka\\TopicPartition results in an error.#' - - '#Call to method validate\(\) on an unknown class Jobcloud\\Avro\\Validator\\Validator#' - '#Call to static method fromSchema\(\) on an unknown class Jobcloud\\Avro\\Validator\\RecordRegistry#' - '#PHPDoc tag @throws with type [a-zA-Z0-9\\_]+#' - '#Parameter \#[0-9] [a-z\$]+ of method [a-zA-Z0-9\\_]+::setOffset\(\) expects string, int given.#' - '#Parameter \#[0-9] [a-z\$]+ of class Jobcloud\\Kafka\\Exception\\AvroValidatorException constructor expects string, string\|false given.#' - '#Cannot call method to_avro\(\) on AvroSchema\|null.#' - - '#Instantiated class Jobcloud\\Avro\\Validator\\Validator not found.#' From 030054c54092910fc75136ea4342faf58f04c1be Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 25 Nov 2020 15:07:24 +0100 Subject: [PATCH 12/14] AvroEncoder changes --- src/Message/Encoder/AvroEncoder.php | 68 +++++++++++------------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/src/Message/Encoder/AvroEncoder.php b/src/Message/Encoder/AvroEncoder.php index 41e6079..1793a43 100644 --- a/src/Message/Encoder/AvroEncoder.php +++ b/src/Message/Encoder/AvroEncoder.php @@ -81,24 +81,7 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf $avroSchema = $this->registry->getBodySchemaForTopic($topicName); - $encodedBody = null; - try { - $encodedBody = $this->recordSerializer->encodeRecord( - $avroSchema->getName(), - $this->getAvroSchemaDefinition($avroSchema), - $body - ); - } catch (AvroEncodingException $exception) { - if (class_exists(Validator::class)) { - $validationErrors = $this->validateSchema( - $avroSchema->getDefinition()->to_avro(), - $body, - $topicName - ); - - throw new AvroValidatorException(json_encode($validationErrors)); - } - } + $encodedBody = $this->encodeRecord($avroSchema, $body, $topicName); return $producerMessage->withBody($encodedBody); } @@ -126,24 +109,7 @@ private function encodeKey(KafkaProducerMessageInterface $producerMessage): Kafk $avroSchema = $this->registry->getKeySchemaForTopic($topicName); - $encodedKey = null; - try { - $encodedKey = $this->recordSerializer->encodeRecord( - $avroSchema->getName(), - $this->getAvroSchemaDefinition($avroSchema), - $key - ); - } catch (AvroEncodingException $exception) { - if (class_exists(Validator::class)) { - $validationErrors = $this->validateSchema( - $avroSchema->getDefinition()->to_avro(), - $key, - $topicName - ); - - throw new AvroValidatorException(json_encode($validationErrors)); - } - } + $encodedKey = $this->encodeRecord($avroSchema, $key, $topicName); return $producerMessage->withKey($encodedKey); } @@ -173,18 +139,36 @@ public function getRegistry(): AvroSchemaRegistryInterface } /** - * @param array $avroSchema + * @param KafkaAvroSchemaInterface $avroSchema * @param mixed $data * @param string $topicName - * @return array + * @return string * @throws RecordRegistryException + * @throws SchemaRegistryException * @throws ValidatorException + * @throws AvroValidatorException */ - private function validateSchema(array $avroSchema, $data, string $topicName): array + private function encodeRecord(KafkaAvroSchemaInterface $avroSchema, $data, string $topicName): string { - $recordRegistry = RecordRegistry::fromSchema(json_encode($avroSchema)); - $validator = new Validator($recordRegistry); + try { + $encodedData = $this->recordSerializer->encodeRecord( + $avroSchema->getName(), + $this->getAvroSchemaDefinition($avroSchema), + $data + ); + } catch (AvroEncodingException $exception) { + if (class_exists(Validator::class)) { + $recordRegistry = RecordRegistry::fromSchema(json_encode($avroSchema->getDefinition()->to_avro())); + $validator = new Validator($recordRegistry); + + $validationErrors = $validator->validate(json_encode($data), $topicName); + + throw new AvroValidatorException(json_encode($validationErrors)); + } + + throw $exception; + } - return $validator->validate(json_encode($data), $topicName); + return $encodedData; } } From ed2fb9cbec34909cfa77627c374ac5ac039ecb70 Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 25 Nov 2020 16:49:55 +0100 Subject: [PATCH 13/14] Fix phpstan discussions --- phpstan.neon | 3 --- src/Message/Encoder/AvroEncoder.php | 14 ++++---------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/phpstan.neon b/phpstan.neon index fe18403..f1f6a07 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -4,7 +4,4 @@ parameters: ignoreErrors: - '#Comparison operation ">" between int and RdKafka\\TopicPartition results in an error.#' - '#Call to static method fromSchema\(\) on an unknown class Jobcloud\\Avro\\Validator\\RecordRegistry#' - - '#PHPDoc tag @throws with type [a-zA-Z0-9\\_]+#' - '#Parameter \#[0-9] [a-z\$]+ of method [a-zA-Z0-9\\_]+::setOffset\(\) expects string, int given.#' - - '#Parameter \#[0-9] [a-z\$]+ of class Jobcloud\\Kafka\\Exception\\AvroValidatorException constructor expects string, string\|false given.#' - - '#Cannot call method to_avro\(\) on AvroSchema\|null.#' diff --git a/src/Message/Encoder/AvroEncoder.php b/src/Message/Encoder/AvroEncoder.php index 1793a43..17be042 100644 --- a/src/Message/Encoder/AvroEncoder.php +++ b/src/Message/Encoder/AvroEncoder.php @@ -47,9 +47,7 @@ public function __construct( * @param KafkaProducerMessageInterface $producerMessage * @return KafkaProducerMessageInterface * @throws AvroValidatorException - * @throws RecordRegistryException * @throws SchemaRegistryException - * @throws ValidatorException */ public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface { @@ -62,9 +60,7 @@ public function encode(KafkaProducerMessageInterface $producerMessage): KafkaPro * @param KafkaProducerMessageInterface $producerMessage * @return KafkaProducerMessageInterface * @throws AvroValidatorException - * @throws RecordRegistryException * @throws SchemaRegistryException - * @throws ValidatorException */ private function encodeBody(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface { @@ -90,9 +86,7 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf * @param KafkaProducerMessageInterface $producerMessage * @return KafkaProducerMessageInterface * @throws AvroValidatorException - * @throws RecordRegistryException * @throws SchemaRegistryException - * @throws ValidatorException */ private function encodeKey(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface { @@ -143,9 +137,7 @@ public function getRegistry(): AvroSchemaRegistryInterface * @param mixed $data * @param string $topicName * @return string - * @throws RecordRegistryException * @throws SchemaRegistryException - * @throws ValidatorException * @throws AvroValidatorException */ private function encodeRecord(KafkaAvroSchemaInterface $avroSchema, $data, string $topicName): string @@ -158,12 +150,14 @@ private function encodeRecord(KafkaAvroSchemaInterface $avroSchema, $data, strin ); } catch (AvroEncodingException $exception) { if (class_exists(Validator::class)) { - $recordRegistry = RecordRegistry::fromSchema(json_encode($avroSchema->getDefinition()->to_avro())); + /** @var AvroSchema $schemaDefinition */ + $schemaDefinition = $avroSchema->getDefinition(); + $recordRegistry = RecordRegistry::fromSchema(json_encode($schemaDefinition->to_avro())); $validator = new Validator($recordRegistry); $validationErrors = $validator->validate(json_encode($data), $topicName); - throw new AvroValidatorException(json_encode($validationErrors)); + throw new AvroValidatorException((string) json_encode($validationErrors)); } throw $exception; From 67d0cb4c419ed7d506a0eecadec8ba748dd9362c Mon Sep 17 00:00:00 2001 From: Marko Date: Thu, 26 Nov 2020 11:21:15 +0100 Subject: [PATCH 14/14] AvroEncoder unit test --- .../Unit/Message/Encoder/AvroEncoderTest.php | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/Unit/Message/Encoder/AvroEncoderTest.php b/tests/Unit/Message/Encoder/AvroEncoderTest.php index 3adefbc..c844930 100644 --- a/tests/Unit/Message/Encoder/AvroEncoderTest.php +++ b/tests/Unit/Message/Encoder/AvroEncoderTest.php @@ -200,6 +200,45 @@ public function testGetRegistry() self::assertSame($registry, $encoder->getRegistry()); } + public function testAvroEncodingException() + { + $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock(); + + $avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class); + $avroSchema->expects(self::exactly(4))->method('getName')->willReturn('schemaName'); + $avroSchema->expects(self::never())->method('getVersion'); + $avroSchema->expects(self::exactly(4))->method('getDefinition')->willReturn($schemaDefinition); + + $registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class); + $registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true); + $registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true); + + $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); + $producerMessage->expects(self::exactly(2))->method('getTopicName')->willReturn('test'); + $producerMessage->expects(self::once())->method('getBody')->willReturn([]); + $producerMessage->expects(self::once())->method('getKey')->willReturn('test-key'); + $producerMessage->expects(self::once())->method('withBody')->with('encodedValue')->willReturn($producerMessage); + + $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock(); + + $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); + $recordSerializer + ->expects(self::exactly(2)) + ->method('encodeRecord') + ->withConsecutive( + [$avroSchema->getName(), $avroSchema->getDefinition(), []], + [$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key'] + ) + ->willReturnOnConsecutiveCalls('encodedValue', $this->throwException($avroEncodingException)); + + $encoder = new AvroEncoder($registry, $recordSerializer); + + self::expectException(AvroEncodingException::class); + self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); + } + public function testAvroValidatorBodyException() { $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock();