From 20381c9153cddade4f7f17e18ffe6e2024a566bd Mon Sep 17 00:00:00 2001 From: Lei Lu Date: Mon, 7 Oct 2024 19:08:00 -0700 Subject: [PATCH 1/2] [compat][server][client][test] Global RT DIV improvement (part 2): Chunking support for DIV message This change mainly focuses on adding chunking support for DIV messages when they are produced to Kafka topics, as the size of DIV message can be large. We leverage today's chunking mechanism for regular records and extend it to support DIV with the following modifications: 1. All the DIV messages are of type {@link MessageType#CONTROL_MESSAGE_DIV} in its KafkaKey and their corresponding KafkaMessageEnvelope uses Put as the payload. 2. Inside the Put payload, the actual message is stored in the putValue field and the schemaId can have the following cases: - If the DIV message is non-chunked, the schemaId is set to GLOBAL_DIV_STATE. - If the DIV message is chunk message, the schemaId is set to CHUNK. - If the DIV message is a chunk manifest message, the schemaId is set to CHUNKED_VALUE_MANIFEST. 3. ChunkAssembler is adapted, on the receiver side, to buffer, assemble, and deserialize DIV messages (chunked/non-chunked). --- .../LeaderFollowerStoreIngestionTask.java | 14 + .../kafka/consumer/StoreIngestionTask.java | 53 ++++ .../davinci/utils/ChunkAssembler.java | 85 ++++-- .../consumer/StoreIngestionTaskTest.java | 110 ++++++++ .../kafka/protocol/enums/MessageType.java | 5 +- .../com/linkedin/venice/message/KafkaKey.java | 7 + .../pubsub/api/PubSubMessageDeserializer.java | 1 + .../avro/AvroProtocolDefinition.java | 5 +- .../linkedin/venice/writer/VeniceWriter.java | 262 ++++++++++++++---- .../venice/writer/WriterChunkingHelper.java | 32 ++- .../GlobalDivState/v-15/GlobalDivState.avsc | 85 ++++++ .../kafka/protocol/enums/MessageTypeTest.java | 1 + .../venice/writer/VeniceWriterUnitTest.java | 60 ++++ .../venice/endToEnd/TestGlobalDiv.java | 197 +++++++++++++ 14 files changed, 841 insertions(+), 76 deletions(-) create mode 100644 internal/venice-common/src/main/resources/avro/GlobalDivState/v-15/GlobalDivState.avsc create mode 100644 internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalDiv.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index aad9b9be71..105bce6938 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -1803,6 +1803,20 @@ protected boolean shouldProcessRecord(PubSubMessage waitForStateVersion(kafkaVersionTopic)); this.chunkAssembler = new ChunkAssembler(storeName); + this.divChunkAssembler = new ChunkAssembler(storeName); this.cacheBackend = cacheBackend; // Ensure getRecordTransformer does not return null @@ -1075,6 +1080,13 @@ private int handleSingleMessage( record.getTopicPartition().getPartitionNumber(), partitionConsumptionStateMap.get(topicPartition.getPartitionNumber())); } + } else if (record.getKey().isDivControlMessage()) { + // This is a control message from the DIV topic, process it and return early. + // TODO: This is a placeholder for the actual implementation. 
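+      // N.B.: the early return of 0 below keeps DIV records out of the processed-bytes accounting,
+      // since they carry data-integrity state for replication rather than user data.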
+ if (isGlobalRtDivEnabled) { + processDivControlMessage(record); + } + return 0; } // This function may modify the original record in KME and it is unsafe to use the payload from KME directly after @@ -1119,6 +1131,28 @@ private int handleSingleMessage( return record.getPayloadSize(); } + void processDivControlMessage(PubSubMessage record) { + KafkaKey key = record.getKey(); + KafkaMessageEnvelope value = record.getValue(); + Put put = (Put) value.getPayloadUnion(); + + Object assembledObject = divChunkAssembler.bufferAndAssembleRecord( + record.getTopicPartition(), + put.getSchemaId(), + key.getKey(), + put.getPutValue(), + record.getOffset(), + GLOBAL_DIV_STATE, + put.getSchemaId(), + new NoopCompressor()); + + // If the assembled object is null, it means that the object is not yet fully assembled, so we can return early. + if (assembledObject == null) { + return; + } + // TODO: We will add the code to process DIV control message later in here. + } + /** * This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}. * @@ -2319,6 +2353,15 @@ protected boolean shouldProcessRecord(PubSubMessage T bufferAndAssembleRecord( PubSubTopicPartition pubSubTopicPartition, int schemaId, byte[] keyBytes, ByteBuffer valueBytes, long recordOffset, - Lazy> recordDeserializer, + Object deserializer, int readerSchemaId, VeniceCompressor compressor) { T assembledRecord = null; @@ -60,33 +75,33 @@ public T bufferAndAssembleRecord( } // If this is a record chunk, store the chunk and return null for processing this record if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) { + // We need to extract data from valueBytes, otherwise it could contain non-data in the array. inMemoryStorageEngine.put( pubSubTopicPartition.getPartitionNumber(), keyBytes, - ValueRecord.create(schemaId, valueBytes.array()).serialize()); + ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize()); return null; - } else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { + } + + if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { // This is the last value. Store it, and now read it from the in memory store as a fully assembled value inMemoryStorageEngine.put( pubSubTopicPartition.getPartitionNumber(), keyBytes, - ValueRecord.create(schemaId, valueBytes.array()).serialize()); + ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize()); try { - assembledRecord = decompressAndDeserialize( - recordDeserializer.get(), + valueBytes = RawBytesChunkingAdapter.INSTANCE.get( + inMemoryStorageEngine, + pubSubTopicPartition.getPartitionNumber(), + ByteBuffer.wrap(keyBytes), + false, + null, + null, + NoOpReadResponseStats.SINGLETON, + readerSchemaId, + RawBytesStoreDeserializerCache.getInstance(), compressor, - RawBytesChunkingAdapter.INSTANCE.get( - inMemoryStorageEngine, - pubSubTopicPartition.getPartitionNumber(), - ByteBuffer.wrap(keyBytes), - false, - null, - null, - NoOpReadResponseStats.SINGLETON, - readerSchemaId, - RawBytesStoreDeserializerCache.getInstance(), - compressor, - null)); + null); } catch (Exception ex) { // We might get an exception if we haven't persisted all the chunks for a given key. 
This // can actually happen if the client seeks to the middle of a chunked record either by @@ -95,17 +110,26 @@ public T bufferAndAssembleRecord( LOGGER.warn( "Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}", recordOffset, - pubSubTopicPartition.getPubSubTopic().getName()); - } - } else { - // this is a fully specified record, no need to buffer and assemble it, just decompress and deserialize it - try { - assembledRecord = decompressAndDeserialize(recordDeserializer.get(), compressor, valueBytes); - } catch (Exception e) { - throw new RuntimeException(e); + pubSubTopicPartition.getPubSubTopic().getName(), + ex); } } + /** + * We have two types of deserializers that we support here. One is the lazy deserializer which is used for + * chunked records, and the other is the AvroProtocolDefinition which is currently used for Global DIV. + */ + try { + if (deserializer instanceof Lazy) { + assembledRecord = + decompressAndDeserialize(((Lazy>) deserializer).get(), compressor, valueBytes); + } else if (deserializer instanceof AvroProtocolDefinition) { + AvroProtocolDefinition protocol = (AvroProtocolDefinition) deserializer; + assembledRecord = decompressAndDeserialize(protocol, compressor, valueBytes); + } + } catch (Exception e) { + throw new RuntimeException(e); + } // We only buffer one record at a time for a given partition. If we've made it this far // we either just finished assembling a large record, or, didn't specify anything. So we'll clear // the cache. Kafka might give duplicate delivery, but it won't give out of order delivery, so @@ -121,6 +145,15 @@ protected T decompressAndDeserialize( return deserializer.deserialize(compressor.decompress(value)); } + protected T decompressAndDeserialize( + AvroProtocolDefinition protocol, + VeniceCompressor compressor, + ByteBuffer value) throws IOException { + InternalAvroSpecificSerializer deserializer = protocol.getSerializer(); + return deserializer + .deserialize(ByteUtils.extractByteArray(compressor.decompress(value)), protocol.getCurrentProtocolVersion()); + } + public void clearInMemoryDB() { inMemoryStorageEngine.drop(); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index ba8a2f0d1f..cba7902a9a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -103,6 +103,7 @@ import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; import com.linkedin.davinci.transformer.TestAvroRecordTransformer; import com.linkedin.davinci.transformer.TestStringRecordTransformer; +import com.linkedin.davinci.utils.ChunkAssembler; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; @@ -4904,6 +4905,115 @@ public void testProcessConsumerActionsError() throws Exception { }, AA_OFF); } + @Test + public void testShouldProcessRecordForDivMessage() throws Exception { + // Set up the environment. 
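+    // These mocks exist only to satisfy the LeaderFollowerStoreIngestionTask constructor; the behavior
+    // under test is driven entirely by the PartitionConsumptionState and OffsetRecord mocks further down.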
+ StoreIngestionTaskFactory.Builder builder = mock(StoreIngestionTaskFactory.Builder.class); + StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); + doReturn(new DeepCopyStorageEngine(mockAbstractStorageEngine)).when(mockStorageEngineRepository) + .getLocalStorageEngine(anyString()); + doReturn(mockStorageEngineRepository).when(builder).getStorageEngineRepository(); + VeniceServerConfig veniceServerConfig = mock(VeniceServerConfig.class); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getClusterProperties(); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getKafkaConsumerConfigsForLocalConsumption(); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getKafkaConsumerConfigsForRemoteConsumption(); + doReturn(Object2IntMaps.emptyMap()).when(veniceServerConfig).getKafkaClusterUrlToIdMap(); + doReturn(veniceServerConfig).when(builder).getServerConfig(); + doReturn(mock(ReadOnlyStoreRepository.class)).when(builder).getMetadataRepo(); + doReturn(mock(ReadOnlySchemaRepository.class)).when(builder).getSchemaRepo(); + doReturn(mock(AggKafkaConsumerService.class)).when(builder).getAggKafkaConsumerService(); + doReturn(mockAggStoreIngestionStats).when(builder).getIngestionStats(); + doReturn(pubSubTopicRepository).when(builder).getPubSubTopicRepository(); + + Version version = mock(Version.class); + doReturn(1).when(version).getPartitionCount(); + doReturn(null).when(version).getPartitionerConfig(); + doReturn(VersionStatus.ONLINE).when(version).getStatus(); + doReturn(true).when(version).isNativeReplicationEnabled(); + doReturn("localhost").when(version).getPushStreamSourceAddress(); + + Store store = mock(Store.class); + doReturn(version).when(store).getVersion(eq(1)); + + String versionTopicName = "testStore_v1"; + String rtTopicName = "testStore_rt"; + VeniceStoreVersionConfig storeConfig = mock(VeniceStoreVersionConfig.class); + doReturn(Version.parseStoreFromVersionTopic(versionTopicName)).when(store).getName(); + doReturn(versionTopicName).when(storeConfig).getStoreVersionName(); + + LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = spy( + new LeaderFollowerStoreIngestionTask( + builder, + store, + version, + mock(Properties.class), + mock(BooleanSupplier.class), + storeConfig, + -1, + false, + Optional.empty(), + null)); + + // Create a DIV record. 
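+    // Per the DIV wire format, the key is typed CONTROL_MESSAGE_DIV while the envelope is a regular PUT
+    // whose putValue would carry the (chunked or whole) DIV payload.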
+ KafkaKey key = new KafkaKey(MessageType.CONTROL_MESSAGE_DIV, "test_key".getBytes()); + KafkaMessageEnvelope value = new KafkaMessageEnvelope(); + Put put = new Put(); + value.payloadUnion = put; + value.messageType = MessageType.PUT.getValue(); + PubSubTopic verstionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic("testStore", 1)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic("testStore")); + + PubSubTopicPartition versionTopicPartition = new PubSubTopicPartitionImpl(verstionTopic, PARTITION_FOO); + PubSubTopicPartition rtPartition = new PubSubTopicPartitionImpl(rtTopic, PARTITION_FOO); + PubSubMessage remoteVTRecord = + new ImmutablePubSubMessage<>(key, value, versionTopicPartition, 0, 0, 0); + + PartitionConsumptionState pcsFoo = mock(PartitionConsumptionState.class); + when(pcsFoo.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER); + doReturn(true).when(pcsFoo).consumeRemotely(); + doReturn(false).when(pcsFoo).skipKafkaMessage(); + + OffsetRecord offsetRecord = mock(OffsetRecord.class); + doReturn(offsetRecord).when(pcsFoo).getOffsetRecord(); + doReturn(pubSubTopicRepository.getTopic(versionTopicName)).when(offsetRecord).getLeaderTopic(any()); + + // 1. Verify LeaderFollowerStoreIngestionTask.shouldProcessRecord() for consuming DIV records from remote VT topic. + leaderFollowerStoreIngestionTask.setPartitionConsumptionState(PARTITION_FOO, pcsFoo); + // remotely consume a VT topic and get a DIV record, should not process the record. + Assert.assertFalse(leaderFollowerStoreIngestionTask.shouldProcessRecord(remoteVTRecord)); + + // 2. Verify StoreIngestionTask.shouldProcessRecord() for consuming DIV records from local RT topic. + PubSubMessage rtRecord = + new ImmutablePubSubMessage<>(key, value, rtPartition, 0, 0, 0); + // consume a RT topic and get a DIV record, should process the record. 
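+    // Switch the same PCS to local consumption and point its leader topic at the RT topic before re-checking.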
+ doReturn(false).when(pcsFoo).consumeRemotely(); + doReturn(pubSubTopicRepository.getTopic(rtTopicName)).when(offsetRecord).getLeaderTopic(any()); + Assert.assertFalse(leaderFollowerStoreIngestionTask.shouldProcessRecord(rtRecord)); + } + + @Test + public void testDivProcessing() throws Exception { + ChunkAssembler divChunkAssembler = mock(ChunkAssembler.class); + + runTest(Collections.singleton(PARTITION_FOO), () -> { + storeIngestionTaskUnderTest.setDivChunkAssembler(divChunkAssembler); + + // Arrange + KafkaKey key = new KafkaKey(MessageType.CONTROL_MESSAGE_DIV, "test_key".getBytes()); + KafkaMessageEnvelope value = new KafkaMessageEnvelope(); + Put put = new Put(); + value.payloadUnion = put; + value.messageType = MessageType.PUT.getValue(); + PubSubMessage record = + new ImmutablePubSubMessage<>(key, value, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), 0, 0, 0); + // Act + storeIngestionTaskUnderTest.processDivControlMessage(record); + // Assert + verify(divChunkAssembler) + .bufferAndAssembleRecord(any(), anyInt(), any(), any(), anyLong(), any(), anyInt(), any()); + }, AA_OFF); + } + private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig( Consumer storeVersionConfigOverride) { // mock the store config diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java index 68e99e5346..267a3ad7c1 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java @@ -21,7 +21,8 @@ */ public enum MessageType implements VeniceEnumValue { PUT(0, Constants.PUT_KEY_HEADER_BYTE), DELETE(1, Constants.PUT_KEY_HEADER_BYTE), - CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE); + CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE), + CONTROL_MESSAGE_DIV(4, Constants.DIV_KEY_HEADER_BYTE); private static final List TYPES = EnumUtils.getEnumValuesList(MessageType.class); @@ -68,6 +69,7 @@ public Object getNewInstance() { case CONTROL_MESSAGE: return new ControlMessage(); case UPDATE: + case CONTROL_MESSAGE_DIV: return new Update(); default: throw new VeniceException("Unsupported " + getClass().getSimpleName() + " value: " + value); @@ -86,5 +88,6 @@ public static class Constants { public static final byte PUT_KEY_HEADER_BYTE = 0; public static final byte CONTROL_MESSAGE_KEY_HEADER_BYTE = 2; public static final byte UPDATE_KEY_HEADER_BYTE = 4; + public static final byte DIV_KEY_HEADER_BYTE = 8; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java index 9723e72a0b..35c817f305 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java @@ -62,6 +62,13 @@ public boolean isControlMessage() { return keyHeaderByte == MessageType.CONTROL_MESSAGE.getKeyHeaderByte(); } + /** + * @return true if this key corresponds to a DIV control message, and false otherwise. 
+ */ + public boolean isDivControlMessage() { + return keyHeaderByte == MessageType.CONTROL_MESSAGE_DIV.getKeyHeaderByte(); + } + /** * @return the content of the key (everything beyond the first byte) */ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java index 88c539da53..a2050dbf90 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java @@ -96,6 +96,7 @@ private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) { return putEnvelopePool.get(); // No need to pool control messages since there are so few of them, and they are varied anyway, limiting reuse. case MessageType.Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE: + case MessageType.Constants.DIV_KEY_HEADER_BYTE: return new KafkaMessageEnvelope(); case MessageType.Constants.UPDATE_KEY_HEADER_BYTE: return updateEnvelopePool.get(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java index 6719bbd442..31dd3d7230 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java @@ -12,6 +12,7 @@ import com.linkedin.venice.ingestion.protocol.ProcessShutdownCommand; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.state.GlobalDivState; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.Store; @@ -174,7 +175,9 @@ public enum AvroProtocolDefinition { * Value schema for change capture event. * TODO: Figure out a way to pull in protocol from different view class. 
*/ - RECORD_CHANGE_EVENT(1, RecordChangeEvent.class); + RECORD_CHANGE_EVENT(1, RecordChangeEvent.class), + + GLOBAL_DIV_STATE(-15, GlobalDivState.class); private static final Set magicByteSet = validateMagicBytes(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 28d28ffc4d..96d5ff5d26 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.message.KafkaKey.CONTROL_MESSAGE_KAFKA_KEY_LENGTH; import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER; import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER; +import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.GLOBAL_DIV_STATE; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.venice.annotation.Threadsafe; @@ -939,17 +940,7 @@ public CompletableFuture put( KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, serializedKey); // Initialize the SpecificRecord instances used by the Avro-based Kafka protocol - Put putPayload = new Put(); - putPayload.putValue = ByteBuffer.wrap(serializedValue); - putPayload.schemaId = valueSchemaId; - - if (putMetadata == null) { - putPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; - putPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; - } else { - putPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); - putPayload.replicationMetadataPayload = putMetadata.getRmdPayload(); - } + Put putPayload = buildPutPayload(serializedValue, valueSchemaId, putMetadata); CompletableFuture produceResultFuture = sendMessage( producerMetadata -> kafkaKey, MessageType.PUT, @@ -972,6 +963,141 @@ public CompletableFuture put( return produceResultFuture; } + /** + * This is the main method to send DIV messages to a kafka topic through VeniceWriter. The method decides whether to + * send the messages in chunked or non-chunked mode based on the size of the message. Today, DIV is the only user of + * this method, but it can be extended easily to support other class types in the future. + * + * All the messages sent through this method are of type {@link MessageType#CONTROL_MESSAGE_DIV} in its KafkaKey and + * all their corresponding {@link KafkaMessageEnvelope} uses {@link Put} as the payload. Inside the Put payload, the + * actual message is stored in the putValue field and the schema id has 3 cases: + * + * 1. If the message is non-chunked, the schema id is set to {@link AvroProtocolDefinition#GLOBAL_DIV_STATE}. + * 2. If the message is chunk message, the schema id is set to {@link AvroProtocolDefinition#CHUNK}. + * 3. If the message is a chunk manifest message, the schema id is set to {@link AvroProtocolDefinition#CHUNKED_VALUE_MANIFEST}. 
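+   *
+   * Illustrative usage (caller-side names are hypothetical):
+   * {@code divWriter.sendChunkSupportedDivMessage(partition, divKeyBytes, serializedGlobalDivState)};
+   * the size check below then picks the chunked or non-chunked path transparently.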
+ */ + public CompletableFuture sendChunkSupportedDivMessage(int partition, K key, V value) { + if (partition < 0 || partition >= numberOfPartitions) { + throw new VeniceException("Invalid partition: " + partition); + } + + byte[] serializedKey = keySerializer.serialize(topicName, key); + byte[] serializedValue = valueSerializer.serialize(topicName, value); + int totalRecordSize = calculateTotalRecordSize(serializedKey, serializedValue, null); + + if (isChunkingNeededForRecord(totalRecordSize)) { + return sendDivMessageChunked( + partition, + serializedKey, + serializedValue, + GLOBAL_DIV_STATE.getCurrentProtocolVersion(), + null); + } else { + return sendDivMessageNonChunked( + partition, + serializedKey, + serializedValue, + GLOBAL_DIV_STATE.getCurrentProtocolVersion(), + null); + } + } + + private CompletableFuture sendDivMessageChunked( + int partition, + byte[] serializedKey, + byte[] serializedValue, + int valueSchemaId, + PutMetadata putMetadata) { + int replicationMetadataPayloadSize = putMetadata == null ? 0 : putMetadata.getSerializedSize(); + final Supplier reportSizeGenerator = + () -> getSizeReport(serializedKey.length, serializedValue.length, replicationMetadataPayloadSize); + // TODO: this needs to be changed later to adapt to div purpose. + final CompletableFuture completableFuture = new CompletableFuture<>(); + PubSubProducerCallback callback = new ErrorPropagationCallback(new CompletableFutureCallback(completableFuture)); + BiConsumer sendMessageFunction = (keyProvider, putPayload) -> sendMessage( + keyProvider, + MessageType.PUT, + putPayload, + partition, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + VENICE_DEFAULT_LOGICAL_TS); + + ChunkedPayloadAndManifest valueChunksAndManifest = WriterChunkingHelper.chunkPayloadAndSend( + serializedKey, + serializedValue, + MessageType.CONTROL_MESSAGE_DIV, + true, + valueSchemaId, + 0, + false, + reportSizeGenerator, + maxSizeForUserPayloadPerMessageInBytes, + keyWithChunkingSuffixSerializer, + sendMessageFunction); + + final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; + Put putManifestsPayload = + buildManifestPayload(null, putMetadata, valueChunksAndManifest, sizeAvailablePerMessage, reportSizeGenerator); + return sendManifestMessage( + putManifestsPayload, + serializedKey, + MessageType.CONTROL_MESSAGE_DIV, + valueChunksAndManifest, + callback, + null, + partition, + null, + null, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS); + } + + private CompletableFuture sendDivMessageNonChunked( + int partition, + byte[] serializedKey, + byte[] serializedValue, + int valueSchemaId, + PutMetadata putMetadata) { + serializedKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); + KafkaKey divKey = new KafkaKey(MessageType.CONTROL_MESSAGE_DIV, serializedKey); + + // Initialize the SpecificRecord instances used by the Avro-based Kafka protocol + Put putPayload = buildPutPayload(serializedValue, valueSchemaId, putMetadata); + + // TODO: this needs to be changed later to adapt to div purpose. 
+ final CompletableFuture completableFuture = new CompletableFuture<>(); + PubSubProducerCallback callback = new CompletableFutureCallback(completableFuture); + + return sendMessage( + producerMetadata -> divKey, + MessageType.PUT, + putPayload, + partition, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS); + } + + private Put buildPutPayload(byte[] serializedValue, int valueSchemaId, PutMetadata putMetadata) { + Put putPayload = new Put(); + putPayload.putValue = ByteBuffer.wrap(serializedValue); + putPayload.schemaId = valueSchemaId; + + if (putMetadata == null) { + putPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + putPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + } else { + putPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); + putPayload.replicationMetadataPayload = putMetadata.getRmdPayload(); + } + return putPayload; + } + + private int calculateTotalRecordSize(byte[] serializedKey, byte[] serializedValue, PutMetadata putMetadata) { + return serializedKey.length + serializedValue.length + (putMetadata == null ? 0 : putMetadata.getSerializedSize()); + } + /** * Write a message with the kafka message envelope (KME) passed in. This allows users re-using existing KME to * speed up the performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer @@ -1516,31 +1642,58 @@ private CompletableFuture putLargeValue( keyWithChunkingSuffixSerializer, sendMessageFunction) : EMPTY_CHUNKED_PAYLOAD_AND_MANIFEST; + + final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; + Put putManifestsPayload = buildManifestPayload( + rmdChunksAndManifest, + putMetadata, + valueChunksAndManifest, + sizeAvailablePerMessage, + reportSizeGenerator); + CompletableFuture manifestProduceFuture = sendManifestMessage( + putManifestsPayload, + serializedKey, + MessageType.PUT, + valueChunksAndManifest, + callback, + rmdChunksAndManifest, + partition, + oldValueManifest, + oldRmdManifest, + leaderMetadataWrapper, + logicalTs); + + DeleteMetadata deleteMetadata = new DeleteMetadata( + valueSchemaId, + putManifestsPayload.replicationMetadataVersionId, + VeniceWriter.EMPTY_BYTE_BUFFER); + deleteDeprecatedChunksFromManifest( + oldValueManifest, + partition, + chunkCallback, + leaderMetadataWrapper, + deleteMetadata); + deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); + + return manifestProduceFuture; + } + + private CompletableFuture sendManifestMessage( + Put putManifestsPayload, + byte[] serializedKey, + MessageType keyType, + ChunkedPayloadAndManifest valueChunksAndManifest, + PubSubProducerCallback callback, + ChunkedPayloadAndManifest rmdChunksAndManifest, + int partition, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs) { // Now that we've sent all the chunks, we can take care of the final value, the manifest. 
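    // The manifest reuses the original key with the standard non-chunked suffix, so at read time it is
    // addressed exactly like a regular, non-chunked record.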
byte[] topLevelKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KeyProvider manifestKeyProvider = producerMetadata -> new KafkaKey(MessageType.PUT, topLevelKey); + KeyProvider manifestKeyProvider = producerMetadata -> new KafkaKey(keyType, topLevelKey); - Put putManifestsPayload = new Put(); - putManifestsPayload.putValue = - chunkedValueManifestSerializer.serialize(valueChunksAndManifest.getChunkedValueManifest()); - putManifestsPayload.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); - if (putMetadata == null) { - putManifestsPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; - putManifestsPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; - } else { - putManifestsPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); - putManifestsPayload.replicationMetadataPayload = isRmdChunkingEnabled - ? chunkedValueManifestSerializer.serialize(rmdChunksAndManifest.getChunkedValueManifest()) - : putMetadata.getRmdPayload(); - } - final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; - if (putManifestsPayload.putValue.remaining() - + putManifestsPayload.replicationMetadataPayload.remaining() > sizeAvailablePerMessage) { - // This is a very desperate edge case... - throw new VeniceException( - "This message cannot be chunked, because even its manifest is too big to go through. " - + "Please reconsider your life choices. " + reportSizeGenerator.get()); - } if (callback instanceof ChunkAwareCallback) { /** We leave a handle to the key, chunks and manifests so that the {@link ChunkAwareCallback} can act on them */ ((ChunkAwareCallback) callback).setChunkingInfo( @@ -1555,7 +1708,7 @@ private CompletableFuture putLargeValue( // We only return the last future (the one for the manifest) and assume that once this one is finished, // all the chunks should also be finished, since they were sent first, and ordering should be guaranteed. 
- CompletableFuture manifestProduceFuture = sendMessage( + return sendMessage( manifestKeyProvider, MessageType.PUT, putManifestsPayload, @@ -1563,20 +1716,35 @@ private CompletableFuture putLargeValue( callback, leaderMetadataWrapper, logicalTs); + } - DeleteMetadata deleteMetadata = new DeleteMetadata( - valueSchemaId, - putManifestsPayload.replicationMetadataVersionId, - VeniceWriter.EMPTY_BYTE_BUFFER); - deleteDeprecatedChunksFromManifest( - oldValueManifest, - partition, - chunkCallback, - leaderMetadataWrapper, - deleteMetadata); - deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); - - return manifestProduceFuture; + private Put buildManifestPayload( + ChunkedPayloadAndManifest rmdChunksAndManifest, + PutMetadata putMetadata, + ChunkedPayloadAndManifest valueChunksAndManifest, + int sizeAvailablePerMessage, + Supplier reportSizeGenerator) { + Put putManifestsPayload = new Put(); + putManifestsPayload.putValue = + chunkedValueManifestSerializer.serialize(valueChunksAndManifest.getChunkedValueManifest()); + putManifestsPayload.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + if (putMetadata == null) { + putManifestsPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + putManifestsPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + } else { + putManifestsPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); + putManifestsPayload.replicationMetadataPayload = isRmdChunkingEnabled + ? chunkedValueManifestSerializer.serialize(rmdChunksAndManifest.getChunkedValueManifest()) + : putMetadata.getRmdPayload(); + } + if (putManifestsPayload.putValue.remaining() + + putManifestsPayload.replicationMetadataPayload.remaining() > sizeAvailablePerMessage) { + // This is a very desperate edge case... + throw new VeniceException( + "This message cannot be chunked, because even its manifest is too big to go through. " + + "Please reconsider your life choices. " + reportSizeGenerator.get()); + } + return putManifestsPayload; } /** diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java index 240043521e..8a6472f691 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java @@ -16,12 +16,15 @@ import java.util.ArrayList; import java.util.function.BiConsumer; import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * This class is a helper class that contains writer side chunking logics. 
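 *
 * Chunk sizing: each chunk can carry at most (maxSizeForUserPayloadPerMessageInBytes - serializedKey.length)
 * bytes of payload, so numberOfChunks = ceil(payload.length / sizeAvailablePerMessage), as computed below.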
*/ public class WriterChunkingHelper { + private static final Logger LOGGER = LogManager.getLogger(WriterChunkingHelper.class); public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); /** @@ -47,6 +50,32 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( int maxSizeForUserPayloadPerMessageInBytes, KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer, BiConsumer sendMessageFunction) { + return chunkPayloadAndSend( + serializedKey, + payload, + MessageType.PUT, + isValuePayload, + schemaId, + chunkedKeySuffixStartingIndex, + isChunkAwareCallback, + sizeReport, + maxSizeForUserPayloadPerMessageInBytes, + keyWithChunkingSuffixSerializer, + sendMessageFunction); + } + + public static ChunkedPayloadAndManifest chunkPayloadAndSend( + byte[] serializedKey, + byte[] payload, + MessageType keyType, + boolean isValuePayload, + int schemaId, + int chunkedKeySuffixStartingIndex, + boolean isChunkAwareCallback, + Supplier sizeReport, + int maxSizeForUserPayloadPerMessageInBytes, + KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer, + BiConsumer sendMessageFunction) { int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; validateAvailableSizePerMessage(maxSizeForUserPayloadPerMessageInBytes, sizeAvailablePerMessage, sizeReport); int numberOfChunks = (int) Math.ceil((double) payload.length / (double) sizeAvailablePerMessage); @@ -75,7 +104,7 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( subsequentKeyProvider = producerMetadata -> { ByteBuffer keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithSuffix); - return new KafkaKey(MessageType.PUT, keyWithSuffix.array()); + return new KafkaKey(keyType, keyWithSuffix.array()); }; firstKeyProvider = producerMetadata -> { chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID; @@ -83,6 +112,7 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber; return subsequentKeyProvider.getKey(producerMetadata); }; + for (int chunkIndex = 0; chunkIndex < numberOfChunks; chunkIndex++) { int chunkStartByteIndex = chunkIndex * sizeAvailablePerMessage; int chunkEndByteIndex = Math.min((chunkIndex + 1) * sizeAvailablePerMessage, payload.length); diff --git a/internal/venice-common/src/main/resources/avro/GlobalDivState/v-15/GlobalDivState.avsc b/internal/venice-common/src/main/resources/avro/GlobalDivState/v-15/GlobalDivState.avsc new file mode 100644 index 0000000000..7677bfa326 --- /dev/null +++ b/internal/venice-common/src/main/resources/avro/GlobalDivState/v-15/GlobalDivState.avsc @@ -0,0 +1,85 @@ +{ + "name": "GlobalDivState", + "namespace": "com.linkedin.venice.kafka.protocol.state", + "doc": "", + "type": "record", + "fields": [ + { + "name": "srcUrl", + "doc": "Upstream Kafka bootstrap server url.", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "producerStates", + "type": { + "type": "map", + "doc": "A map that maps producer GUID -> producer state for real-time data.", + "values": { + "name": "ProducerPartitionState", + "namespace": "com.linkedin.venice.kafka.protocol.state", + "doc": "A record containing the state pertaining to the data sent by one upstream producer into one partition.", + "type": "record", + "fields": [ + { + "name": "segmentNumber", + "doc": "The current segment number corresponds to the last (highest) 
segment number for which we have seen a StartOfSegment control message.", + "type": "int" + }, + { + "name": "segmentStatus", + "doc": "The status of the current segment: 0 => NOT_STARTED, 1 => IN_PROGRESS, 2 => END_OF_INTERMEDIATE_SEGMENT, 3 => END_OF_FINAL_SEGMENT.", + "type": "int" + }, + { + "name": "isRegistered", + "doc": "Whether the segment is registered. i.e. received Start_Of_Segment to initialize the segment.", + "type": "boolean", + "default": false + }, + { + "name": "messageSequenceNumber", + "doc": "The current message sequence number, within the current segment, which we have seen for this partition/producer pair.", + "type": "int" + }, + { + "name": "messageTimestamp", + "doc": "The timestamp included in the last message we have seen for this partition/producer pair.", + "type": "long" + }, + { + "name": "checksumType", + "doc": "The current mapping is the following: 0 => None, 1 => MD5, 2 => Adler32, 3 => CRC32.", + "type": "int" + }, + { + "name": "checksumState", + "doc": "The value of the checksum computed since the last StartOfSegment ControlMessage.", + "type": "bytes" + }, + { + "name": "aggregates", + "doc": "The aggregates that have been computed so far since the last StartOfSegment ControlMessage.", + "type": { + "type": "map", + "values": "long" + } + }, + { + "name": "debugInfo", + "doc": "The debug info received as part of the last StartOfSegment ControlMessage.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + }, + "default": {} + } + ] +} \ No newline at end of file diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java index 2477b2ef83..af0c721720 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java @@ -17,6 +17,7 @@ protected Map expectedMapping() { .put(1, MessageType.DELETE) .put(2, MessageType.CONTROL_MESSAGE) .put(3, MessageType.UPDATE) + .put(4, MessageType.CONTROL_MESSAGE_DIV) .build(); } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java index f3a2d5ca07..9820607b8a 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java @@ -666,4 +666,64 @@ public void testPutTooLargeRecord(boolean isChunkingEnabled) { } } } + + @Test + public void testGlobalDivChunking() { + final int maxRecordSizeBytes = BYTES_PER_MB; + CompletableFuture mockedFuture = mock(CompletableFuture.class); + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + final VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(TestWriteUtils.STRING_SCHEMA); + final VeniceWriterOptions options = new VeniceWriterOptions.Builder("testTopic").setPartitionCount(1) + .setKeySerializer(serializer) + .setValueSerializer(serializer) + .setMaxRecordSizeBytes(maxRecordSizeBytes) + .build(); + VeniceProperties props = VeniceProperties.empty(); + final VeniceWriter writer = new VeniceWriter<>(options, props, mockedProducer); + + final int 
SMALL_VALUE_SIZE = maxRecordSizeBytes / 2; + final int TOO_LARGE_VALUE_SIZE = maxRecordSizeBytes * 2; + for (int size: Arrays.asList(SMALL_VALUE_SIZE, TOO_LARGE_VALUE_SIZE)) { + char[] valueChars = new char[size]; + Arrays.fill(valueChars, '*'); + try { + writer.sendChunkSupportedDivMessage(0, "test-key", new String(valueChars)); + } catch (Exception e) { + Assert.fail("Shouldn't have thrown any exception"); + } + + ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + ArgumentCaptor kafkaKeyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); + + if (size == SMALL_VALUE_SIZE) { + // 1 SOS, 1 DivControlMessage + verify(mockedProducer, times(2)) + .sendMessage(any(), any(), kafkaKeyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); + } else { // TOO_LARGE_VALUE_SIZE + // 1 SOS, 4 DivChunk, 1 DivManifest + verify(mockedProducer, times(6)) + .sendMessage(any(), any(), kafkaKeyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); + } + + for (KafkaKey key: kafkaKeyArgumentCaptor.getAllValues()) { + Assert.assertTrue(key.isDivControlMessage() || key.isControlMessage()); + } + + for (KafkaMessageEnvelope kme: kmeArgumentCaptor.getAllValues()) { + if (kme.messageType == MessageType.CONTROL_MESSAGE.getValue()) { + Assert.assertTrue( + ((ControlMessage) kme.getPayloadUnion()).getControlMessageType() == ControlMessageType.START_OF_SEGMENT + .getValue()); + } else { + Assert.assertTrue(kme.messageType == MessageType.PUT.getValue()); + Put put = (Put) kme.payloadUnion; + Assert.assertTrue( + put.getSchemaId() == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion() + || put.getSchemaId() == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion() + || put.getSchemaId() == AvroProtocolDefinition.GLOBAL_DIV_STATE.getCurrentProtocolVersion()); + } + } + } + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalDiv.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalDiv.java new file mode 100644 index 0000000000..12c79a7027 --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalDiv.java @@ -0,0 +1,197 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.venice.ConfigKeys.*; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_KEY_SCHEMA; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_VALUE_SCHEMA; +import static com.linkedin.venice.utils.Utils.*; + +import com.linkedin.davinci.kafka.consumer.KafkaConsumerService; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.guid.GuidUtils; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.state.GlobalDivState; +import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState; +import com.linkedin.venice.kafka.validation.SegmentStatus; +import com.linkedin.venice.kafka.validation.checksum.CheckSumType; +import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import 
com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; +import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterOptions; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; +import org.apache.avro.util.Utf8; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestGlobalDiv { + private static final Logger LOGGER = LogManager.getLogger(TestGlobalDiv.class); + + private VeniceClusterWrapper sharedVenice; + + SecureRandom random = new SecureRandom(); + + @BeforeClass + public void setUp() { + Properties extraProperties = new Properties(); + extraProperties.setProperty(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB.name()); + extraProperties.setProperty(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L)); + + // N.B.: RF 2 with 3 servers is important, in order to test both the leader and follower code paths + sharedVenice = ServiceFactory.getVeniceCluster(1, 0, 0, 2, 1000000, false, false, extraProperties); + + Properties routerProperties = new Properties(); + + sharedVenice.addVeniceRouter(routerProperties); + // Added a server with shared consumer enabled. + Properties serverPropertiesWithSharedConsumer = new Properties(); + serverPropertiesWithSharedConsumer.setProperty(SSL_TO_KAFKA_LEGACY, "false"); + extraProperties.setProperty(SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER, "3"); + extraProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "4"); + extraProperties.setProperty( + SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY, + KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name()); + // Enable global div feature in the integration test. + extraProperties.setProperty(SERVER_GLOBAL_RT_DIV_ENABLED, "true"); + + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + LOGGER.info("Finished creating VeniceClusterWrapper"); + } + + @AfterClass + public void cleanUp() { + Utils.closeQuietlyWithErrorLogged(sharedVenice); + } + + /** + * This test verifies functionality of sending chunked/non-chunked div messages: + * + * 1. Create a hybrid store and create a store version. + * 2. Send a non-chunked div message to the version topic. + * 3. Send a chunked div message to the version topic. + * 4. Verify the messages are sent successfully. + * 5. TODO: Add more verification steps on the server side later. 
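+   *
+   * Rough sizing (assumption): each of the 20k producer-state entries serializes to a few hundred bytes,
+   * so the chunked payload lands well above the writer's roughly 1 MB per-message limit and must take the
+   * chunked code path, while the empty-map state stays comfortably below it.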
+ */ + @Test(timeOut = 180 * Time.MS_PER_SECOND) + public void testChunkedDiv() { + String storeName = Utils.getUniqueString("store"); + final int partitionCount = 1; + final int keyCount = 10; + + UpdateStoreQueryParams params = new UpdateStoreQueryParams() + // set hybridRewindSecond to a big number so following versions won't ignore old records in RT + .setHybridRewindSeconds(2000000) + .setHybridOffsetLagThreshold(10) + .setPartitionCount(partitionCount); + + sharedVenice.useControllerClient(client -> { + client.createNewStore(storeName, "owner", DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA); + client.updateStore(storeName, params); + }); + + // Create store version 1 by writing keyCount records. + sharedVenice.createVersion( + storeName, + DEFAULT_KEY_SCHEMA, + DEFAULT_VALUE_SCHEMA, + IntStream.range(0, keyCount).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, i))); + + Properties veniceWriterProperties = new Properties(); + veniceWriterProperties.put(KAFKA_BOOTSTRAP_SERVERS, sharedVenice.getPubSubBrokerWrapper().getAddress()); + + // Set max segment elapsed time to 0 to enforce creating small segments aggressively + veniceWriterProperties.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, "0"); + veniceWriterProperties.putAll( + PubSubBrokerWrapper + .getBrokerDetailsForClients(Collections.singletonList(sharedVenice.getPubSubBrokerWrapper()))); + PubSubProducerAdapterFactory pubSubProducerAdapterFactory = + sharedVenice.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); + + try (VeniceWriter verstionTopicWriter = + TestUtils.getVeniceWriterFactory(veniceWriterProperties, pubSubProducerAdapterFactory) + .createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeKafkaTopic(storeName, 1)).build())) { + + InternalAvroSpecificSerializer serializer = + AvroProtocolDefinition.GLOBAL_DIV_STATE.getSerializer(); + + GlobalDivState state = createGlobalDivState("localhost:9090", false); + verstionTopicWriter + .sendChunkSupportedDivMessage( + 0, + "NonChunkedKey".getBytes(), + ByteUtils.extractByteArray(serializer.serialize(state))) + .get(); + LOGGER.info("Sent non-chunked div message"); + + state = createGlobalDivState("localhost:9092", true); + verstionTopicWriter + .sendChunkSupportedDivMessage( + 0, + "ChunkedKey".getBytes(), + ByteUtils.extractByteArray(serializer.serialize(state))) + .get(); + LOGGER.info("Sent chunked div message"); + + // TODO: Add more verification steps later. + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private GlobalDivState createGlobalDivState(String srcUrl, boolean isChunked) { + GlobalDivState state = new GlobalDivState(); + state.producerStates = new HashMap<>(); + state.setSrcUrl(srcUrl); + + if (isChunked) { + // Create a large state with 20k entries. 
+ for (int i = 0; i < 20000; i++) { + byte[] bytes = new byte[256]; + random.nextBytes(bytes); + GUID guid = new GUID(bytes); + state.producerStates.put(guidToUtf8(guid), createProducerPartitionState(i, i)); + } + } else { + state.producerStates = Collections.emptyMap(); + } + return state; + } + + private CharSequence guidToUtf8(GUID guid) { + return new Utf8(GuidUtils.getCharSequenceFromGuid(guid)); + } + + private ProducerPartitionState createProducerPartitionState(int segment, int sequence) { + ProducerPartitionState ppState = new ProducerPartitionState(); + ppState.segmentNumber = segment; + ppState.segmentStatus = SegmentStatus.IN_PROGRESS.getValue(); + ppState.messageSequenceNumber = sequence; + ppState.messageTimestamp = System.currentTimeMillis(); + ppState.checksumType = CheckSumType.NONE.getValue(); + ppState.checksumState = ByteBuffer.allocate(0); + ppState.aggregates = new HashMap<>(); + ppState.debugInfo = new HashMap<>(); + return ppState; + } +} From feac7f27f017a884995683fe3adc8417f09d21ae Mon Sep 17 00:00:00 2001 From: Lei Lu Date: Thu, 31 Oct 2024 21:53:17 -0700 Subject: [PATCH 2/2] addressed reviewer's comments --- .../davinci/kafka/consumer/StoreIngestionTask.java | 10 +++++----- .../kafka/consumer/StoreIngestionTaskFactory.java | 11 +++++++++++ .../kafka/consumer/StoreIngestionTaskTest.java | 13 +++++++------ 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index fbff0b875b..e3c77fe04f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -235,7 +235,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { * flushed to the metadata partition of the storage engine regularly in {@link #syncOffset(String, PartitionConsumptionState)} */ private final KafkaDataIntegrityValidator kafkaDataIntegrityValidator; - private ChunkAssembler divChunkAssembler; + private final ChunkAssembler divChunkAssembler; protected final HostLevelIngestionStats hostLevelIngestionStats; protected final AggVersionedDIVStats versionedDIVStats; @@ -445,7 +445,8 @@ public StoreIngestionTask( new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion); this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic)); this.chunkAssembler = new ChunkAssembler(storeName); - this.divChunkAssembler = new ChunkAssembler(storeName); + this.divChunkAssembler = + builder.getDivChunkAssembler() != null ? builder.getDivChunkAssembler() : new ChunkAssembler(storeName); this.cacheBackend = cacheBackend; // Ensure getRecordTransformer does not return null @@ -4373,8 +4374,7 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) { this.versionRole = versionRole; } - // Only for testing purpose. 
- void setDivChunkAssembler(ChunkAssembler divChunkAssembler) { - this.divChunkAssembler = divChunkAssembler; + ChunkAssembler getDivChunkAssembler() { + return this.divChunkAssembler; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java index 21b480af85..9c3208e05d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java @@ -13,6 +13,7 @@ import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; +import com.linkedin.davinci.utils.ChunkAssembler; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyStoreRepository; @@ -119,6 +120,8 @@ public static class Builder { private Runnable runnableForKillIngestionTasksForNonCurrentVersions; private ExecutorService aaWCWorkLoadProcessingThreadPool; + private ChunkAssembler divChunkAssembler; + private interface Setter { void apply(); } @@ -328,5 +331,13 @@ public Builder setAAWCWorkLoadProcessingThreadPool(ExecutorService executorServi public ExecutorService getAAWCWorkLoadProcessingThreadPool() { return this.aaWCWorkLoadProcessingThreadPool; } + + public Builder setDivChunkAssembler(ChunkAssembler divChunkAssembler) { + return set(() -> this.divChunkAssembler = divChunkAssembler); + } + + public ChunkAssembler getDivChunkAssembler() { + return divChunkAssembler; + } } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 195869948d..342faa14ac 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -359,6 +359,8 @@ public static Object[][] sortedInputAndAAConfigProvider() { private HostLevelIngestionStats mockStoreIngestionStats; private AggVersionedDIVStats mockVersionedDIVStats; private AggVersionedIngestionStats mockVersionedStorageIngestionStats; + + private ChunkAssembler divChunkAssembler; private StoreIngestionTask storeIngestionTaskUnderTest; private ExecutorService taskPollingService; private StoreBufferService storeBufferService; @@ -539,6 +541,7 @@ public void methodSetUp() throws Exception { mockRemoteKafkaConsumer = mock(PubSubConsumerAdapter.class); kafkaUrlToRecordsThrottler = new HashMap<>(); kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler); + divChunkAssembler = mock(ChunkAssembler.class); mockTopicManager = mock(TopicManager.class); mockTopicManagerRepository = mock(TopicManagerRepository.class); @@ -1071,6 +1074,7 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( .setPubSubTopicRepository(pubSubTopicRepository) .setPartitionStateSerializer(partitionStateSerializer) .setRunnableForKillIngestionTasksForNonCurrentVersions(runnableForKillNonCurrentVersion) + .setDivChunkAssembler(divChunkAssembler) .setAAWCWorkLoadProcessingThreadPool( 
Executors.newFixedThreadPool(2, new DaemonThreadFactory("AA_WC_PARALLEL_PROCESSING"))); } @@ -4966,10 +4970,10 @@ public void testShouldProcessRecordForDivMessage() throws Exception { Put put = new Put(); value.payloadUnion = put; value.messageType = MessageType.PUT.getValue(); - PubSubTopic verstionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic("testStore", 1)); + PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic("testStore", 1)); PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic("testStore")); - PubSubTopicPartition versionTopicPartition = new PubSubTopicPartitionImpl(verstionTopic, PARTITION_FOO); + PubSubTopicPartition versionTopicPartition = new PubSubTopicPartitionImpl(versionTopic, PARTITION_FOO); PubSubTopicPartition rtPartition = new PubSubTopicPartitionImpl(rtTopic, PARTITION_FOO); PubSubMessage remoteVTRecord = new ImmutablePubSubMessage<>(key, value, versionTopicPartition, 0, 0, 0); @@ -4999,10 +5003,7 @@ public void testShouldProcessRecordForDivMessage() throws Exception { @Test public void testDivProcessing() throws Exception { - ChunkAssembler divChunkAssembler = mock(ChunkAssembler.class); - runTest(Collections.singleton(PARTITION_FOO), () -> { - storeIngestionTaskUnderTest.setDivChunkAssembler(divChunkAssembler); // Arrange KafkaKey key = new KafkaKey(MessageType.CONTROL_MESSAGE_DIV, "test_key".getBytes()); @@ -5015,7 +5016,7 @@ public void testDivProcessing() throws Exception { // Act storeIngestionTaskUnderTest.processDivControlMessage(record); // Assert - verify(divChunkAssembler) + verify(storeIngestionTaskUnderTest.getDivChunkAssembler()) .bufferAndAssembleRecord(any(), anyInt(), any(), any(), anyLong(), any(), anyInt(), any()); }, AA_OFF); }