diff --git a/composer.json b/composer.json index d53e9bb..44982a6 100644 --- a/composer.json +++ b/composer.json @@ -38,7 +38,8 @@ } }, "suggest": { - "flix-tech/avro-serde-php": "Is needed for Avro support" + "flix-tech/avro-serde-php": "Is needed for Avro support", + "jobcloud/avro-validator": "Useful for debug purposes in development, not recommended for production use" }, "extra": { "branch-alias": { diff --git a/phpstan.neon b/phpstan.neon index c7b1758..f1f6a07 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,3 +1,7 @@ parameters: level: 8 paths: [ src ] + ignoreErrors: + - '#Comparison operation ">" between int and RdKafka\\TopicPartition results in an error.#' + - '#Call to static method fromSchema\(\) on an unknown class Jobcloud\\Avro\\Validator\\RecordRegistry#' + - '#Parameter \#[0-9] [a-z\$]+ of method [a-zA-Z0-9\\_]+::setOffset\(\) expects string, int given.#' diff --git a/src/Exception/AvroValidatorException.php b/src/Exception/AvroValidatorException.php new file mode 100644 index 0000000..b838517 --- /dev/null +++ b/src/Exception/AvroValidatorException.php @@ -0,0 +1,10 @@ +registry->getBodySchemaForTopic($topicName); - $encodedBody = $this->recordSerializer->encodeRecord( - $avroSchema->getName(), - $this->getAvroSchemaDefinition($avroSchema), - $body - ); + $encodedBody = $this->encodeRecord($avroSchema, $body, $topicName); return $producerMessage->withBody($encodedBody); } @@ -82,6 +85,7 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf /** * @param KafkaProducerMessageInterface $producerMessage * @return KafkaProducerMessageInterface + * @throws AvroValidatorException * @throws SchemaRegistryException */ private function encodeKey(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface @@ -99,11 +103,7 @@ private function encodeKey(KafkaProducerMessageInterface $producerMessage): Kafk $avroSchema = $this->registry->getKeySchemaForTopic($topicName); - $encodedKey = $this->recordSerializer->encodeRecord( - $avroSchema->getName(), - $this->getAvroSchemaDefinition($avroSchema), - $key - ); + $encodedKey = $this->encodeRecord($avroSchema, $key, $topicName); return $producerMessage->withKey($encodedKey); } @@ -131,4 +131,38 @@ public function getRegistry(): AvroSchemaRegistryInterface { return $this->registry; } + + /** + * @param KafkaAvroSchemaInterface $avroSchema + * @param mixed $data + * @param string $topicName + * @return string + * @throws SchemaRegistryException + * @throws AvroValidatorException + */ + private function encodeRecord(KafkaAvroSchemaInterface $avroSchema, $data, string $topicName): string + { + try { + $encodedData = $this->recordSerializer->encodeRecord( + $avroSchema->getName(), + $this->getAvroSchemaDefinition($avroSchema), + $data + ); + } catch (AvroEncodingException $exception) { + if (class_exists(Validator::class)) { + /** @var AvroSchema $schemaDefinition */ + $schemaDefinition = $avroSchema->getDefinition(); + $recordRegistry = RecordRegistry::fromSchema(json_encode($schemaDefinition->to_avro())); + $validator = new Validator($recordRegistry); + + $validationErrors = $validator->validate(json_encode($data), $topicName); + + throw new AvroValidatorException((string) json_encode($validationErrors)); + } + + throw $exception; + } + + return $encodedData; + } } diff --git a/tests/Unit/Message/Encoder/AvroEncoderTest.php b/tests/Unit/Message/Encoder/AvroEncoderTest.php index 866e43a..c844930 100644 --- a/tests/Unit/Message/Encoder/AvroEncoderTest.php +++ b/tests/Unit/Message/Encoder/AvroEncoderTest.php @@ -4,21 +4,51 @@ namespace Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder; +use FlixTech\AvroSerializer\Objects\Exceptions\AvroEncodingException; use FlixTech\AvroSerializer\Objects\RecordSerializer; use Jobcloud\Kafka\Exception\AvroEncoderException; -use Jobcloud\Kafka\Message\Encoder\AvroEncoderInterface; +use Jobcloud\Kafka\Exception\AvroValidatorException; use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface; use Jobcloud\Kafka\Message\KafkaProducerMessageInterface; use Jobcloud\Kafka\Message\Encoder\AvroEncoder; use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface; use PHPStan\Testing\TestCase; -use \AvroSchema; /** * @covers \Jobcloud\Kafka\Message\Encoder\AvroEncoder */ class AvroEncoderTest extends TestCase { + private $avroValidatorClass = "./src/Message/Encoder/AvroEncoder.php"; + + private $originalNamespaces = [ + "Jobcloud\Avro\Validator\RecordRegistry", + "Jobcloud\Avro\Validator\Validator" + ]; + + private $replacedNamespaces = [ + "Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\RecordRegistry", + "Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\Validator" + ]; + + protected function setUp(): void + { + $avroEncoderContent = file_get_contents($this->avroValidatorClass); + + $avroEncoderContent = str_replace($this->originalNamespaces, $this->replacedNamespaces, $avroEncoderContent); + + file_put_contents($this->avroValidatorClass, $avroEncoderContent); + } + + protected function tearDown(): void + { + $avroEncoderContent = file_get_contents($this->avroValidatorClass); + + $avroEncoderContent = str_replace($this->replacedNamespaces, $this->originalNamespaces, $avroEncoderContent); + + file_put_contents($this->avroValidatorClass, $avroEncoderContent); + } + public function testEncodeTombstone() { $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); @@ -169,4 +199,140 @@ public function testGetRegistry() self::assertSame($registry, $encoder->getRegistry()); } + + public function testAvroEncodingException() + { + $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock(); + + $avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class); + $avroSchema->expects(self::exactly(4))->method('getName')->willReturn('schemaName'); + $avroSchema->expects(self::never())->method('getVersion'); + $avroSchema->expects(self::exactly(4))->method('getDefinition')->willReturn($schemaDefinition); + + $registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class); + $registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true); + $registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true); + + $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); + $producerMessage->expects(self::exactly(2))->method('getTopicName')->willReturn('test'); + $producerMessage->expects(self::once())->method('getBody')->willReturn([]); + $producerMessage->expects(self::once())->method('getKey')->willReturn('test-key'); + $producerMessage->expects(self::once())->method('withBody')->with('encodedValue')->willReturn($producerMessage); + + $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock(); + + $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); + $recordSerializer + ->expects(self::exactly(2)) + ->method('encodeRecord') + ->withConsecutive( + [$avroSchema->getName(), $avroSchema->getDefinition(), []], + [$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key'] + ) + ->willReturnOnConsecutiveCalls('encodedValue', $this->throwException($avroEncodingException)); + + $encoder = new AvroEncoder($registry, $recordSerializer); + + self::expectException(AvroEncodingException::class); + self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); + } + + public function testAvroValidatorBodyException() + { + $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock(); + + $avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class); + $avroSchema->expects(self::once())->method('getName')->willReturn('schemaName'); + $avroSchema->expects(self::exactly(2))->method('getDefinition')->willReturn($schemaDefinition); + $schemaDefinition->method('to_avro')->willReturn(['type' => 'record']); + + $registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class); + $registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true); + + $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); + $producerMessage->expects(self::once())->method('getTopicName')->willReturn('test'); + $producerMessage->expects(self::once())->method('getBody')->willReturn(['id' => 123]); + + $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class) + ->disableOriginalConstructor() + ->getMock(); + $recordSerializer = $this->getMockBuilder(RecordSerializer::class) + ->disableOriginalConstructor() + ->getMock(); + $recordSerializer + ->expects(self::once()) + ->method('encodeRecord') + ->willReturnOnConsecutiveCalls('encodedValue') + ->willThrowException($avroEncodingException); + + $encoder = new AvroEncoder($registry, $recordSerializer); + + self::expectException(AvroValidatorException::class); + self::expectExceptionMessage(json_encode(['test' => 'test'])); + self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); + } + + public function testAvroValidatorKeyException() + { + $schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock(); + + $avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class); + $avroSchema->expects(self::exactly(4))->method('getName')->willReturn('schemaName'); + $avroSchema->expects(self::never())->method('getVersion'); + $avroSchema->expects(self::exactly(5))->method('getDefinition')->willReturn($schemaDefinition); + $schemaDefinition->method('to_avro')->willReturn([]); + + $registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class); + $registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema); + $registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true); + $registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true); + + $producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class); + $producerMessage->expects(self::exactly(2))->method('getTopicName')->willReturn('test'); + $producerMessage->expects(self::once())->method('getBody')->willReturn([]); + $producerMessage->expects(self::once())->method('getKey')->willReturn('test-key'); + $producerMessage->expects(self::once())->method('withBody')->with('encodedValue')->willReturn($producerMessage); + + $avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock(); + + $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); + $recordSerializer + ->expects(self::exactly(2)) + ->method('encodeRecord') + ->withConsecutive( + [$avroSchema->getName(), $avroSchema->getDefinition(), []], + [$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key'] + ) + ->willReturnOnConsecutiveCalls('encodedValue', $this->throwException($avroEncodingException)); + + $encoder = new AvroEncoder($registry, $recordSerializer); + + self::expectException(AvroValidatorException::class); + self::expectExceptionMessage(json_encode(['test' => 'test'])); + self::assertNotSame($producerMessage, $encoder->encode($producerMessage)); + } +} + +class RecordRegistry { + public function fromSchema(string $schema): string + { + return $schema; + } +} + +class Validator { + public function validate(): array + { + return [ + 'test' => 'test', + ]; + } +} + +class AvroValidationException { + }