diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index e83fa0bfeb..7d934e6e73 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -1895,6 +1895,20 @@ protected boolean shouldProcessRecord(PubSubMessage waitForStateVersion(kafkaVersionTopic)); this.chunkAssembler = new ChunkAssembler(storeName); + this.divChunkAssembler = + builder.getDivChunkAssembler() != null ? builder.getDivChunkAssembler() : new ChunkAssembler(storeName); this.cacheBackend = cacheBackend; if (recordTransformerFunction != null) { @@ -1083,6 +1089,13 @@ private int handleSingleMessage( record.getTopicPartition().getPartitionNumber(), partitionConsumptionStateMap.get(topicPartition.getPartitionNumber())); } + } else if (record.getKey().isDivControlMessage()) { + // This is a control message from the DIV topic, process it and return early. + // TODO: This is a placeholder for the actual implementation. + if (isGlobalRtDivEnabled) { + processDivControlMessage(record); + } + return 0; } // This function may modify the original record in KME and it is unsafe to use the payload from KME directly after @@ -1127,6 +1140,28 @@ private int handleSingleMessage( return record.getPayloadSize(); } + void processDivControlMessage(PubSubMessage record) { + KafkaKey key = record.getKey(); + KafkaMessageEnvelope value = record.getValue(); + Put put = (Put) value.getPayloadUnion(); + + Object assembledObject = divChunkAssembler.bufferAndAssembleRecord( + record.getTopicPartition(), + put.getSchemaId(), + key.getKey(), + put.getPutValue(), + record.getOffset(), + GLOBAL_DIV_STATE, + put.getSchemaId(), + new NoopCompressor()); + + // If the assembled object is null, it means that the object is not yet fully assembled, so we can return early. + if (assembledObject == null) { + return; + } + // TODO: We will add the code to process DIV control message later in here. + } + /** * This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}. * @@ -2316,6 +2351,15 @@ protected boolean shouldProcessRecord(PubSubMessage this.divChunkAssembler = divChunkAssembler); + } + + public ChunkAssembler getDivChunkAssembler() { + return divChunkAssembler; + } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java index 3000207a31..974c0f7cd6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java @@ -8,10 +8,13 @@ import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.serializer.RecordDeserializer; +import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.lazy.Lazy; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.avro.specific.SpecificRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -84,6 +87,51 @@ public T bufferAndAssembleRecord( return decompressedAndDeserializedRecord; } + /** + * This method is used to buffer and assemble chunking records consumed from a Kafka topic. For chunked records, we + * buffer the chunks in memory until we have all the chunks for a given key. Once we have all the chunks indicated by + * receiving the chunk manifest record, we assemble the chunks and deserialize it from binary back into an object + * by using the provided deserializer and return the fully assembled record. + * + * The provided deserializer is associated with an AvroProtocolDefinition to select the appropriate + * protocol serializer for deserialization. + * + * Note that if the passed-in record is a regular record (not chunked), we will return the record after + * deserializing it without buffering it in memory. + */ + public T bufferAndAssembleRecord( + PubSubTopicPartition pubSubTopicPartition, + int schemaId, + byte[] keyBytes, + ByteBuffer valueBytes, + long recordOffset, + AvroProtocolDefinition protocol, + int readerSchemaId, + VeniceCompressor compressor) { + ByteBuffer assembledRecord = bufferAndAssembleRecord( + pubSubTopicPartition, + schemaId, + keyBytes, + valueBytes, + recordOffset, + readerSchemaId, + compressor); + T decompressedAndDeserializedRecord = null; + + // Record is a chunk. Return null + if (assembledRecord == null) { + return decompressedAndDeserializedRecord; + } + + try { + decompressedAndDeserializedRecord = decompressAndDeserialize(protocol, compressor, assembledRecord); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return decompressedAndDeserializedRecord; + } + /** * Buffers and assembles chunks of a record. * @@ -105,17 +153,20 @@ public ByteBuffer bufferAndAssembleRecord( } // If this is a record chunk, store the chunk and return null for processing this record if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) { + // We need to extract data from valueBytes, otherwise it could contain non-data in the array. inMemoryStorageEngine.put( pubSubTopicPartition.getPartitionNumber(), keyBytes, - ValueRecord.create(schemaId, valueBytes.array()).serialize()); + ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize()); return null; - } else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { + } + + if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { // This is the last value. Store it, and now read it from the in memory store as a fully assembled value inMemoryStorageEngine.put( pubSubTopicPartition.getPartitionNumber(), keyBytes, - ValueRecord.create(schemaId, valueBytes.array()).serialize()); + ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize()); try { assembledRecord = RawBytesChunkingAdapter.INSTANCE.get( inMemoryStorageEngine, @@ -137,7 +188,8 @@ public ByteBuffer bufferAndAssembleRecord( LOGGER.warn( "Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}", recordOffset, - pubSubTopicPartition.getPubSubTopic().getName()); + pubSubTopicPartition.getPubSubTopic().getName(), + ex); } } else { // this is a fully specified record, no need to buffer and assemble it, just return the valueBytes @@ -163,6 +215,15 @@ protected T decompressAndDeserialize( return deserializer.deserialize(compressor.decompress(value)); } + protected T decompressAndDeserialize( + AvroProtocolDefinition protocol, + VeniceCompressor compressor, + ByteBuffer value) throws IOException { + InternalAvroSpecificSerializer deserializer = protocol.getSerializer(); + return deserializer + .deserialize(ByteUtils.extractByteArray(compressor.decompress(value)), protocol.getCurrentProtocolVersion()); + } + public void clearInMemoryDB() { inMemoryStorageEngine.drop(); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 31e94689c5..cf068152d5 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -103,6 +103,7 @@ import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; import com.linkedin.davinci.transformer.TestStringRecordTransformer; +import com.linkedin.davinci.utils.ChunkAssembler; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; @@ -358,6 +359,8 @@ public static Object[][] sortedInputAndAAConfigProvider() { private HostLevelIngestionStats mockStoreIngestionStats; private AggVersionedDIVStats mockVersionedDIVStats; private AggVersionedIngestionStats mockVersionedStorageIngestionStats; + + private ChunkAssembler divChunkAssembler; private StoreIngestionTask storeIngestionTaskUnderTest; private ExecutorService taskPollingService; private StoreBufferService storeBufferService; @@ -538,6 +541,7 @@ public void methodSetUp() throws Exception { mockRemoteKafkaConsumer = mock(PubSubConsumerAdapter.class); kafkaUrlToRecordsThrottler = new HashMap<>(); kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler); + divChunkAssembler = mock(ChunkAssembler.class); mockTopicManager = mock(TopicManager.class); mockTopicManagerRepository = mock(TopicManagerRepository.class); @@ -1087,6 +1091,7 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( .setPubSubTopicRepository(pubSubTopicRepository) .setPartitionStateSerializer(partitionStateSerializer) .setRunnableForKillIngestionTasksForNonCurrentVersions(runnableForKillNonCurrentVersion) + .setDivChunkAssembler(divChunkAssembler) .setAAWCWorkLoadProcessingThreadPool( Executors.newFixedThreadPool(2, new DaemonThreadFactory("AA_WC_PARALLEL_PROCESSING"))); } @@ -4944,6 +4949,119 @@ public void testProcessConsumerActionsError() throws Exception { }, AA_OFF); } + @Test + public void testShouldProcessRecordForDivMessage() throws Exception { + // Set up the environment. + StoreIngestionTaskFactory.Builder builder = mock(StoreIngestionTaskFactory.Builder.class); + StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); + doReturn(new DeepCopyStorageEngine(mockAbstractStorageEngine)).when(mockStorageEngineRepository) + .getLocalStorageEngine(anyString()); + doReturn(mockStorageEngineRepository).when(builder).getStorageEngineRepository(); + VeniceServerConfig veniceServerConfig = mock(VeniceServerConfig.class); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getClusterProperties(); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getKafkaConsumerConfigsForLocalConsumption(); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getKafkaConsumerConfigsForRemoteConsumption(); + doReturn(Object2IntMaps.emptyMap()).when(veniceServerConfig).getKafkaClusterUrlToIdMap(); + doReturn(veniceServerConfig).when(builder).getServerConfig(); + doReturn(mock(ReadOnlyStoreRepository.class)).when(builder).getMetadataRepo(); + doReturn(mock(ReadOnlySchemaRepository.class)).when(builder).getSchemaRepo(); + doReturn(mock(AggKafkaConsumerService.class)).when(builder).getAggKafkaConsumerService(); + doReturn(mockAggStoreIngestionStats).when(builder).getIngestionStats(); + doReturn(pubSubTopicRepository).when(builder).getPubSubTopicRepository(); + + Version version = mock(Version.class); + doReturn(1).when(version).getPartitionCount(); + doReturn(null).when(version).getPartitionerConfig(); + doReturn(VersionStatus.ONLINE).when(version).getStatus(); + doReturn(true).when(version).isNativeReplicationEnabled(); + doReturn("localhost").when(version).getPushStreamSourceAddress(); + + Store store = mock(Store.class); + doReturn(version).when(store).getVersion(eq(1)); + + String versionTopicName = "testStore_v1"; + String rtTopicName = "testStore_rt"; + VeniceStoreVersionConfig storeConfig = mock(VeniceStoreVersionConfig.class); + doReturn(Version.parseStoreFromVersionTopic(versionTopicName)).when(store).getName(); + doReturn(versionTopicName).when(storeConfig).getStoreVersionName(); + + LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = spy( + new LeaderFollowerStoreIngestionTask( + builder, + store, + version, + mock(Properties.class), + mock(BooleanSupplier.class), + storeConfig, + -1, + false, + Optional.empty(), + null)); + + // Create a DIV record. + KafkaKey key = new KafkaKey(MessageType.CONTROL_MESSAGE_DIV, "test_key".getBytes()); + KafkaMessageEnvelope value = new KafkaMessageEnvelope(); + Put put = new Put(); + value.payloadUnion = put; + value.messageType = MessageType.PUT.getValue(); + PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic("testStore", 1)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic("testStore")); + + PubSubTopicPartition versionTopicPartition = new PubSubTopicPartitionImpl(versionTopic, PARTITION_FOO); + PubSubTopicPartition rtPartition = new PubSubTopicPartitionImpl(rtTopic, PARTITION_FOO); + PubSubMessage remoteVTRecord = + new ImmutablePubSubMessage<>(key, value, versionTopicPartition, 0, 0, 0); + + PartitionConsumptionState pcsFoo = mock(PartitionConsumptionState.class); + when(pcsFoo.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER); + doReturn(true).when(pcsFoo).consumeRemotely(); + doReturn(false).when(pcsFoo).skipKafkaMessage(); + + OffsetRecord offsetRecord = mock(OffsetRecord.class); + doReturn(offsetRecord).when(pcsFoo).getOffsetRecord(); + doReturn(pubSubTopicRepository.getTopic(versionTopicName)).when(offsetRecord).getLeaderTopic(any()); + + // 1. Verify LeaderFollowerStoreIngestionTask.shouldProcessRecord() for consuming DIV records from remote VT topic. + leaderFollowerStoreIngestionTask.setPartitionConsumptionState(PARTITION_FOO, pcsFoo); + // remotely consume a VT topic and get a DIV record, should not process the record. + Assert.assertFalse(leaderFollowerStoreIngestionTask.shouldProcessRecord(remoteVTRecord)); + + // 2. Verify StoreIngestionTask.shouldProcessRecord() for consuming DIV records from local RT topic. + PubSubMessage rtRecord = + new ImmutablePubSubMessage<>(key, value, rtPartition, 0, 0, 0); + // consume a RT topic and get a DIV record, should process the record. + doReturn(false).when(pcsFoo).consumeRemotely(); + doReturn(pubSubTopicRepository.getTopic(rtTopicName)).when(offsetRecord).getLeaderTopic(any()); + Assert.assertFalse(leaderFollowerStoreIngestionTask.shouldProcessRecord(rtRecord)); + } + + @Test + public void testDivProcessing() throws Exception { + runTest(Collections.singleton(PARTITION_FOO), () -> { + + // Arrange + KafkaKey key = new KafkaKey(MessageType.CONTROL_MESSAGE_DIV, "test_key".getBytes()); + KafkaMessageEnvelope value = new KafkaMessageEnvelope(); + Put put = new Put(); + value.payloadUnion = put; + value.messageType = MessageType.PUT.getValue(); + PubSubMessage record = + new ImmutablePubSubMessage<>(key, value, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), 0, 0, 0); + // Act + storeIngestionTaskUnderTest.processDivControlMessage(record); + // Assert + verify(storeIngestionTaskUnderTest.getDivChunkAssembler()).bufferAndAssembleRecord( + any(), + anyInt(), + any(), + any(), + anyLong(), + (AvroProtocolDefinition) any(), + anyInt(), + any()); + }, AA_OFF); + } + private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig( Consumer storeVersionConfigOverride) { // mock the store config diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java index 68e99e5346..267a3ad7c1 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java @@ -21,7 +21,8 @@ */ public enum MessageType implements VeniceEnumValue { PUT(0, Constants.PUT_KEY_HEADER_BYTE), DELETE(1, Constants.PUT_KEY_HEADER_BYTE), - CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE); + CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE), + CONTROL_MESSAGE_DIV(4, Constants.DIV_KEY_HEADER_BYTE); private static final List TYPES = EnumUtils.getEnumValuesList(MessageType.class); @@ -68,6 +69,7 @@ public Object getNewInstance() { case CONTROL_MESSAGE: return new ControlMessage(); case UPDATE: + case CONTROL_MESSAGE_DIV: return new Update(); default: throw new VeniceException("Unsupported " + getClass().getSimpleName() + " value: " + value); @@ -86,5 +88,6 @@ public static class Constants { public static final byte PUT_KEY_HEADER_BYTE = 0; public static final byte CONTROL_MESSAGE_KEY_HEADER_BYTE = 2; public static final byte UPDATE_KEY_HEADER_BYTE = 4; + public static final byte DIV_KEY_HEADER_BYTE = 8; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java index 9723e72a0b..35c817f305 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java @@ -62,6 +62,13 @@ public boolean isControlMessage() { return keyHeaderByte == MessageType.CONTROL_MESSAGE.getKeyHeaderByte(); } + /** + * @return true if this key corresponds to a DIV control message, and false otherwise. + */ + public boolean isDivControlMessage() { + return keyHeaderByte == MessageType.CONTROL_MESSAGE_DIV.getKeyHeaderByte(); + } + /** * @return the content of the key (everything beyond the first byte) */ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java index 88c539da53..a2050dbf90 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java @@ -96,6 +96,7 @@ private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) { return putEnvelopePool.get(); // No need to pool control messages since there are so few of them, and they are varied anyway, limiting reuse. case MessageType.Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE: + case MessageType.Constants.DIV_KEY_HEADER_BYTE: return new KafkaMessageEnvelope(); case MessageType.Constants.UPDATE_KEY_HEADER_BYTE: return updateEnvelopePool.get(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java index 654e774a97..231a102308 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java @@ -12,6 +12,7 @@ import com.linkedin.venice.ingestion.protocol.ProcessShutdownCommand; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.state.GlobalDivState; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.Store; @@ -174,7 +175,9 @@ public enum AvroProtocolDefinition { * Value schema for change capture event. * TODO: Figure out a way to pull in protocol from different view class. */ - RECORD_CHANGE_EVENT(1, RecordChangeEvent.class); + RECORD_CHANGE_EVENT(1, RecordChangeEvent.class), + + GLOBAL_DIV_STATE(-15, GlobalDivState.class); private static final Set magicByteSet = validateMagicBytes(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 28d28ffc4d..96d5ff5d26 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.message.KafkaKey.CONTROL_MESSAGE_KAFKA_KEY_LENGTH; import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER; import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER; +import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.GLOBAL_DIV_STATE; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.venice.annotation.Threadsafe; @@ -939,17 +940,7 @@ public CompletableFuture put( KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, serializedKey); // Initialize the SpecificRecord instances used by the Avro-based Kafka protocol - Put putPayload = new Put(); - putPayload.putValue = ByteBuffer.wrap(serializedValue); - putPayload.schemaId = valueSchemaId; - - if (putMetadata == null) { - putPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; - putPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; - } else { - putPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); - putPayload.replicationMetadataPayload = putMetadata.getRmdPayload(); - } + Put putPayload = buildPutPayload(serializedValue, valueSchemaId, putMetadata); CompletableFuture produceResultFuture = sendMessage( producerMetadata -> kafkaKey, MessageType.PUT, @@ -972,6 +963,141 @@ public CompletableFuture put( return produceResultFuture; } + /** + * This is the main method to send DIV messages to a kafka topic through VeniceWriter. The method decides whether to + * send the messages in chunked or non-chunked mode based on the size of the message. Today, DIV is the only user of + * this method, but it can be extended easily to support other class types in the future. + * + * All the messages sent through this method are of type {@link MessageType#CONTROL_MESSAGE_DIV} in its KafkaKey and + * all their corresponding {@link KafkaMessageEnvelope} uses {@link Put} as the payload. Inside the Put payload, the + * actual message is stored in the putValue field and the schema id has 3 cases: + * + * 1. If the message is non-chunked, the schema id is set to {@link AvroProtocolDefinition#GLOBAL_DIV_STATE}. + * 2. If the message is chunk message, the schema id is set to {@link AvroProtocolDefinition#CHUNK}. + * 3. If the message is a chunk manifest message, the schema id is set to {@link AvroProtocolDefinition#CHUNKED_VALUE_MANIFEST}. + */ + public CompletableFuture sendChunkSupportedDivMessage(int partition, K key, V value) { + if (partition < 0 || partition >= numberOfPartitions) { + throw new VeniceException("Invalid partition: " + partition); + } + + byte[] serializedKey = keySerializer.serialize(topicName, key); + byte[] serializedValue = valueSerializer.serialize(topicName, value); + int totalRecordSize = calculateTotalRecordSize(serializedKey, serializedValue, null); + + if (isChunkingNeededForRecord(totalRecordSize)) { + return sendDivMessageChunked( + partition, + serializedKey, + serializedValue, + GLOBAL_DIV_STATE.getCurrentProtocolVersion(), + null); + } else { + return sendDivMessageNonChunked( + partition, + serializedKey, + serializedValue, + GLOBAL_DIV_STATE.getCurrentProtocolVersion(), + null); + } + } + + private CompletableFuture sendDivMessageChunked( + int partition, + byte[] serializedKey, + byte[] serializedValue, + int valueSchemaId, + PutMetadata putMetadata) { + int replicationMetadataPayloadSize = putMetadata == null ? 0 : putMetadata.getSerializedSize(); + final Supplier reportSizeGenerator = + () -> getSizeReport(serializedKey.length, serializedValue.length, replicationMetadataPayloadSize); + // TODO: this needs to be changed later to adapt to div purpose. + final CompletableFuture completableFuture = new CompletableFuture<>(); + PubSubProducerCallback callback = new ErrorPropagationCallback(new CompletableFutureCallback(completableFuture)); + BiConsumer sendMessageFunction = (keyProvider, putPayload) -> sendMessage( + keyProvider, + MessageType.PUT, + putPayload, + partition, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + VENICE_DEFAULT_LOGICAL_TS); + + ChunkedPayloadAndManifest valueChunksAndManifest = WriterChunkingHelper.chunkPayloadAndSend( + serializedKey, + serializedValue, + MessageType.CONTROL_MESSAGE_DIV, + true, + valueSchemaId, + 0, + false, + reportSizeGenerator, + maxSizeForUserPayloadPerMessageInBytes, + keyWithChunkingSuffixSerializer, + sendMessageFunction); + + final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; + Put putManifestsPayload = + buildManifestPayload(null, putMetadata, valueChunksAndManifest, sizeAvailablePerMessage, reportSizeGenerator); + return sendManifestMessage( + putManifestsPayload, + serializedKey, + MessageType.CONTROL_MESSAGE_DIV, + valueChunksAndManifest, + callback, + null, + partition, + null, + null, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS); + } + + private CompletableFuture sendDivMessageNonChunked( + int partition, + byte[] serializedKey, + byte[] serializedValue, + int valueSchemaId, + PutMetadata putMetadata) { + serializedKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); + KafkaKey divKey = new KafkaKey(MessageType.CONTROL_MESSAGE_DIV, serializedKey); + + // Initialize the SpecificRecord instances used by the Avro-based Kafka protocol + Put putPayload = buildPutPayload(serializedValue, valueSchemaId, putMetadata); + + // TODO: this needs to be changed later to adapt to div purpose. + final CompletableFuture completableFuture = new CompletableFuture<>(); + PubSubProducerCallback callback = new CompletableFutureCallback(completableFuture); + + return sendMessage( + producerMetadata -> divKey, + MessageType.PUT, + putPayload, + partition, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS); + } + + private Put buildPutPayload(byte[] serializedValue, int valueSchemaId, PutMetadata putMetadata) { + Put putPayload = new Put(); + putPayload.putValue = ByteBuffer.wrap(serializedValue); + putPayload.schemaId = valueSchemaId; + + if (putMetadata == null) { + putPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + putPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + } else { + putPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); + putPayload.replicationMetadataPayload = putMetadata.getRmdPayload(); + } + return putPayload; + } + + private int calculateTotalRecordSize(byte[] serializedKey, byte[] serializedValue, PutMetadata putMetadata) { + return serializedKey.length + serializedValue.length + (putMetadata == null ? 0 : putMetadata.getSerializedSize()); + } + /** * Write a message with the kafka message envelope (KME) passed in. This allows users re-using existing KME to * speed up the performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer @@ -1516,31 +1642,58 @@ private CompletableFuture putLargeValue( keyWithChunkingSuffixSerializer, sendMessageFunction) : EMPTY_CHUNKED_PAYLOAD_AND_MANIFEST; + + final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; + Put putManifestsPayload = buildManifestPayload( + rmdChunksAndManifest, + putMetadata, + valueChunksAndManifest, + sizeAvailablePerMessage, + reportSizeGenerator); + CompletableFuture manifestProduceFuture = sendManifestMessage( + putManifestsPayload, + serializedKey, + MessageType.PUT, + valueChunksAndManifest, + callback, + rmdChunksAndManifest, + partition, + oldValueManifest, + oldRmdManifest, + leaderMetadataWrapper, + logicalTs); + + DeleteMetadata deleteMetadata = new DeleteMetadata( + valueSchemaId, + putManifestsPayload.replicationMetadataVersionId, + VeniceWriter.EMPTY_BYTE_BUFFER); + deleteDeprecatedChunksFromManifest( + oldValueManifest, + partition, + chunkCallback, + leaderMetadataWrapper, + deleteMetadata); + deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); + + return manifestProduceFuture; + } + + private CompletableFuture sendManifestMessage( + Put putManifestsPayload, + byte[] serializedKey, + MessageType keyType, + ChunkedPayloadAndManifest valueChunksAndManifest, + PubSubProducerCallback callback, + ChunkedPayloadAndManifest rmdChunksAndManifest, + int partition, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs) { // Now that we've sent all the chunks, we can take care of the final value, the manifest. byte[] topLevelKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KeyProvider manifestKeyProvider = producerMetadata -> new KafkaKey(MessageType.PUT, topLevelKey); + KeyProvider manifestKeyProvider = producerMetadata -> new KafkaKey(keyType, topLevelKey); - Put putManifestsPayload = new Put(); - putManifestsPayload.putValue = - chunkedValueManifestSerializer.serialize(valueChunksAndManifest.getChunkedValueManifest()); - putManifestsPayload.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); - if (putMetadata == null) { - putManifestsPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; - putManifestsPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; - } else { - putManifestsPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); - putManifestsPayload.replicationMetadataPayload = isRmdChunkingEnabled - ? chunkedValueManifestSerializer.serialize(rmdChunksAndManifest.getChunkedValueManifest()) - : putMetadata.getRmdPayload(); - } - final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; - if (putManifestsPayload.putValue.remaining() - + putManifestsPayload.replicationMetadataPayload.remaining() > sizeAvailablePerMessage) { - // This is a very desperate edge case... - throw new VeniceException( - "This message cannot be chunked, because even its manifest is too big to go through. " - + "Please reconsider your life choices. " + reportSizeGenerator.get()); - } if (callback instanceof ChunkAwareCallback) { /** We leave a handle to the key, chunks and manifests so that the {@link ChunkAwareCallback} can act on them */ ((ChunkAwareCallback) callback).setChunkingInfo( @@ -1555,7 +1708,7 @@ private CompletableFuture putLargeValue( // We only return the last future (the one for the manifest) and assume that once this one is finished, // all the chunks should also be finished, since they were sent first, and ordering should be guaranteed. - CompletableFuture manifestProduceFuture = sendMessage( + return sendMessage( manifestKeyProvider, MessageType.PUT, putManifestsPayload, @@ -1563,20 +1716,35 @@ private CompletableFuture putLargeValue( callback, leaderMetadataWrapper, logicalTs); + } - DeleteMetadata deleteMetadata = new DeleteMetadata( - valueSchemaId, - putManifestsPayload.replicationMetadataVersionId, - VeniceWriter.EMPTY_BYTE_BUFFER); - deleteDeprecatedChunksFromManifest( - oldValueManifest, - partition, - chunkCallback, - leaderMetadataWrapper, - deleteMetadata); - deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); - - return manifestProduceFuture; + private Put buildManifestPayload( + ChunkedPayloadAndManifest rmdChunksAndManifest, + PutMetadata putMetadata, + ChunkedPayloadAndManifest valueChunksAndManifest, + int sizeAvailablePerMessage, + Supplier reportSizeGenerator) { + Put putManifestsPayload = new Put(); + putManifestsPayload.putValue = + chunkedValueManifestSerializer.serialize(valueChunksAndManifest.getChunkedValueManifest()); + putManifestsPayload.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + if (putMetadata == null) { + putManifestsPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + putManifestsPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + } else { + putManifestsPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); + putManifestsPayload.replicationMetadataPayload = isRmdChunkingEnabled + ? chunkedValueManifestSerializer.serialize(rmdChunksAndManifest.getChunkedValueManifest()) + : putMetadata.getRmdPayload(); + } + if (putManifestsPayload.putValue.remaining() + + putManifestsPayload.replicationMetadataPayload.remaining() > sizeAvailablePerMessage) { + // This is a very desperate edge case... + throw new VeniceException( + "This message cannot be chunked, because even its manifest is too big to go through. " + + "Please reconsider your life choices. " + reportSizeGenerator.get()); + } + return putManifestsPayload; } /** diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java index 240043521e..8a6472f691 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java @@ -16,12 +16,15 @@ import java.util.ArrayList; import java.util.function.BiConsumer; import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * This class is a helper class that contains writer side chunking logics. */ public class WriterChunkingHelper { + private static final Logger LOGGER = LogManager.getLogger(WriterChunkingHelper.class); public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); /** @@ -47,6 +50,32 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( int maxSizeForUserPayloadPerMessageInBytes, KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer, BiConsumer sendMessageFunction) { + return chunkPayloadAndSend( + serializedKey, + payload, + MessageType.PUT, + isValuePayload, + schemaId, + chunkedKeySuffixStartingIndex, + isChunkAwareCallback, + sizeReport, + maxSizeForUserPayloadPerMessageInBytes, + keyWithChunkingSuffixSerializer, + sendMessageFunction); + } + + public static ChunkedPayloadAndManifest chunkPayloadAndSend( + byte[] serializedKey, + byte[] payload, + MessageType keyType, + boolean isValuePayload, + int schemaId, + int chunkedKeySuffixStartingIndex, + boolean isChunkAwareCallback, + Supplier sizeReport, + int maxSizeForUserPayloadPerMessageInBytes, + KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer, + BiConsumer sendMessageFunction) { int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; validateAvailableSizePerMessage(maxSizeForUserPayloadPerMessageInBytes, sizeAvailablePerMessage, sizeReport); int numberOfChunks = (int) Math.ceil((double) payload.length / (double) sizeAvailablePerMessage); @@ -75,7 +104,7 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( subsequentKeyProvider = producerMetadata -> { ByteBuffer keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithSuffix); - return new KafkaKey(MessageType.PUT, keyWithSuffix.array()); + return new KafkaKey(keyType, keyWithSuffix.array()); }; firstKeyProvider = producerMetadata -> { chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID; @@ -83,6 +112,7 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber; return subsequentKeyProvider.getKey(producerMetadata); }; + for (int chunkIndex = 0; chunkIndex < numberOfChunks; chunkIndex++) { int chunkStartByteIndex = chunkIndex * sizeAvailablePerMessage; int chunkEndByteIndex = Math.min((chunkIndex + 1) * sizeAvailablePerMessage, payload.length); diff --git a/internal/venice-common/src/main/resources/avro/GlobalDivState/v-15/GlobalDivState.avsc b/internal/venice-common/src/main/resources/avro/GlobalDivState/v-15/GlobalDivState.avsc new file mode 100644 index 0000000000..7677bfa326 --- /dev/null +++ b/internal/venice-common/src/main/resources/avro/GlobalDivState/v-15/GlobalDivState.avsc @@ -0,0 +1,85 @@ +{ + "name": "GlobalDivState", + "namespace": "com.linkedin.venice.kafka.protocol.state", + "doc": "", + "type": "record", + "fields": [ + { + "name": "srcUrl", + "doc": "Upstream Kafka bootstrap server url.", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "producerStates", + "type": { + "type": "map", + "doc": "A map that maps producer GUID -> producer state for real-time data.", + "values": { + "name": "ProducerPartitionState", + "namespace": "com.linkedin.venice.kafka.protocol.state", + "doc": "A record containing the state pertaining to the data sent by one upstream producer into one partition.", + "type": "record", + "fields": [ + { + "name": "segmentNumber", + "doc": "The current segment number corresponds to the last (highest) segment number for which we have seen a StartOfSegment control message.", + "type": "int" + }, + { + "name": "segmentStatus", + "doc": "The status of the current segment: 0 => NOT_STARTED, 1 => IN_PROGRESS, 2 => END_OF_INTERMEDIATE_SEGMENT, 3 => END_OF_FINAL_SEGMENT.", + "type": "int" + }, + { + "name": "isRegistered", + "doc": "Whether the segment is registered. i.e. received Start_Of_Segment to initialize the segment.", + "type": "boolean", + "default": false + }, + { + "name": "messageSequenceNumber", + "doc": "The current message sequence number, within the current segment, which we have seen for this partition/producer pair.", + "type": "int" + }, + { + "name": "messageTimestamp", + "doc": "The timestamp included in the last message we have seen for this partition/producer pair.", + "type": "long" + }, + { + "name": "checksumType", + "doc": "The current mapping is the following: 0 => None, 1 => MD5, 2 => Adler32, 3 => CRC32.", + "type": "int" + }, + { + "name": "checksumState", + "doc": "The value of the checksum computed since the last StartOfSegment ControlMessage.", + "type": "bytes" + }, + { + "name": "aggregates", + "doc": "The aggregates that have been computed so far since the last StartOfSegment ControlMessage.", + "type": { + "type": "map", + "values": "long" + } + }, + { + "name": "debugInfo", + "doc": "The debug info received as part of the last StartOfSegment ControlMessage.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + }, + "default": {} + } + ] +} \ No newline at end of file diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java index 2477b2ef83..af0c721720 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java @@ -17,6 +17,7 @@ protected Map expectedMapping() { .put(1, MessageType.DELETE) .put(2, MessageType.CONTROL_MESSAGE) .put(3, MessageType.UPDATE) + .put(4, MessageType.CONTROL_MESSAGE_DIV) .build(); } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java index f3a2d5ca07..9820607b8a 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java @@ -666,4 +666,64 @@ public void testPutTooLargeRecord(boolean isChunkingEnabled) { } } } + + @Test + public void testGlobalDivChunking() { + final int maxRecordSizeBytes = BYTES_PER_MB; + CompletableFuture mockedFuture = mock(CompletableFuture.class); + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + final VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(TestWriteUtils.STRING_SCHEMA); + final VeniceWriterOptions options = new VeniceWriterOptions.Builder("testTopic").setPartitionCount(1) + .setKeySerializer(serializer) + .setValueSerializer(serializer) + .setMaxRecordSizeBytes(maxRecordSizeBytes) + .build(); + VeniceProperties props = VeniceProperties.empty(); + final VeniceWriter writer = new VeniceWriter<>(options, props, mockedProducer); + + final int SMALL_VALUE_SIZE = maxRecordSizeBytes / 2; + final int TOO_LARGE_VALUE_SIZE = maxRecordSizeBytes * 2; + for (int size: Arrays.asList(SMALL_VALUE_SIZE, TOO_LARGE_VALUE_SIZE)) { + char[] valueChars = new char[size]; + Arrays.fill(valueChars, '*'); + try { + writer.sendChunkSupportedDivMessage(0, "test-key", new String(valueChars)); + } catch (Exception e) { + Assert.fail("Shouldn't have thrown any exception"); + } + + ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + ArgumentCaptor kafkaKeyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); + + if (size == SMALL_VALUE_SIZE) { + // 1 SOS, 1 DivControlMessage + verify(mockedProducer, times(2)) + .sendMessage(any(), any(), kafkaKeyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); + } else { // TOO_LARGE_VALUE_SIZE + // 1 SOS, 4 DivChunk, 1 DivManifest + verify(mockedProducer, times(6)) + .sendMessage(any(), any(), kafkaKeyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); + } + + for (KafkaKey key: kafkaKeyArgumentCaptor.getAllValues()) { + Assert.assertTrue(key.isDivControlMessage() || key.isControlMessage()); + } + + for (KafkaMessageEnvelope kme: kmeArgumentCaptor.getAllValues()) { + if (kme.messageType == MessageType.CONTROL_MESSAGE.getValue()) { + Assert.assertTrue( + ((ControlMessage) kme.getPayloadUnion()).getControlMessageType() == ControlMessageType.START_OF_SEGMENT + .getValue()); + } else { + Assert.assertTrue(kme.messageType == MessageType.PUT.getValue()); + Put put = (Put) kme.payloadUnion; + Assert.assertTrue( + put.getSchemaId() == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion() + || put.getSchemaId() == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion() + || put.getSchemaId() == AvroProtocolDefinition.GLOBAL_DIV_STATE.getCurrentProtocolVersion()); + } + } + } + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalDiv.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalDiv.java new file mode 100644 index 0000000000..12c79a7027 --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalDiv.java @@ -0,0 +1,197 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.venice.ConfigKeys.*; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_KEY_SCHEMA; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_VALUE_SCHEMA; +import static com.linkedin.venice.utils.Utils.*; + +import com.linkedin.davinci.kafka.consumer.KafkaConsumerService; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.guid.GuidUtils; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.state.GlobalDivState; +import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState; +import com.linkedin.venice.kafka.validation.SegmentStatus; +import com.linkedin.venice.kafka.validation.checksum.CheckSumType; +import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; +import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterOptions; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; +import org.apache.avro.util.Utf8; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestGlobalDiv { + private static final Logger LOGGER = LogManager.getLogger(TestGlobalDiv.class); + + private VeniceClusterWrapper sharedVenice; + + SecureRandom random = new SecureRandom(); + + @BeforeClass + public void setUp() { + Properties extraProperties = new Properties(); + extraProperties.setProperty(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB.name()); + extraProperties.setProperty(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L)); + + // N.B.: RF 2 with 3 servers is important, in order to test both the leader and follower code paths + sharedVenice = ServiceFactory.getVeniceCluster(1, 0, 0, 2, 1000000, false, false, extraProperties); + + Properties routerProperties = new Properties(); + + sharedVenice.addVeniceRouter(routerProperties); + // Added a server with shared consumer enabled. + Properties serverPropertiesWithSharedConsumer = new Properties(); + serverPropertiesWithSharedConsumer.setProperty(SSL_TO_KAFKA_LEGACY, "false"); + extraProperties.setProperty(SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER, "3"); + extraProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "4"); + extraProperties.setProperty( + SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY, + KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name()); + // Enable global div feature in the integration test. + extraProperties.setProperty(SERVER_GLOBAL_RT_DIV_ENABLED, "true"); + + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + LOGGER.info("Finished creating VeniceClusterWrapper"); + } + + @AfterClass + public void cleanUp() { + Utils.closeQuietlyWithErrorLogged(sharedVenice); + } + + /** + * This test verifies functionality of sending chunked/non-chunked div messages: + * + * 1. Create a hybrid store and create a store version. + * 2. Send a non-chunked div message to the version topic. + * 3. Send a chunked div message to the version topic. + * 4. Verify the messages are sent successfully. + * 5. TODO: Add more verification steps on the server side later. + */ + @Test(timeOut = 180 * Time.MS_PER_SECOND) + public void testChunkedDiv() { + String storeName = Utils.getUniqueString("store"); + final int partitionCount = 1; + final int keyCount = 10; + + UpdateStoreQueryParams params = new UpdateStoreQueryParams() + // set hybridRewindSecond to a big number so following versions won't ignore old records in RT + .setHybridRewindSeconds(2000000) + .setHybridOffsetLagThreshold(10) + .setPartitionCount(partitionCount); + + sharedVenice.useControllerClient(client -> { + client.createNewStore(storeName, "owner", DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA); + client.updateStore(storeName, params); + }); + + // Create store version 1 by writing keyCount records. + sharedVenice.createVersion( + storeName, + DEFAULT_KEY_SCHEMA, + DEFAULT_VALUE_SCHEMA, + IntStream.range(0, keyCount).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, i))); + + Properties veniceWriterProperties = new Properties(); + veniceWriterProperties.put(KAFKA_BOOTSTRAP_SERVERS, sharedVenice.getPubSubBrokerWrapper().getAddress()); + + // Set max segment elapsed time to 0 to enforce creating small segments aggressively + veniceWriterProperties.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, "0"); + veniceWriterProperties.putAll( + PubSubBrokerWrapper + .getBrokerDetailsForClients(Collections.singletonList(sharedVenice.getPubSubBrokerWrapper()))); + PubSubProducerAdapterFactory pubSubProducerAdapterFactory = + sharedVenice.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); + + try (VeniceWriter verstionTopicWriter = + TestUtils.getVeniceWriterFactory(veniceWriterProperties, pubSubProducerAdapterFactory) + .createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeKafkaTopic(storeName, 1)).build())) { + + InternalAvroSpecificSerializer serializer = + AvroProtocolDefinition.GLOBAL_DIV_STATE.getSerializer(); + + GlobalDivState state = createGlobalDivState("localhost:9090", false); + verstionTopicWriter + .sendChunkSupportedDivMessage( + 0, + "NonChunkedKey".getBytes(), + ByteUtils.extractByteArray(serializer.serialize(state))) + .get(); + LOGGER.info("Sent non-chunked div message"); + + state = createGlobalDivState("localhost:9092", true); + verstionTopicWriter + .sendChunkSupportedDivMessage( + 0, + "ChunkedKey".getBytes(), + ByteUtils.extractByteArray(serializer.serialize(state))) + .get(); + LOGGER.info("Sent chunked div message"); + + // TODO: Add more verification steps later. + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private GlobalDivState createGlobalDivState(String srcUrl, boolean isChunked) { + GlobalDivState state = new GlobalDivState(); + state.producerStates = new HashMap<>(); + state.setSrcUrl(srcUrl); + + if (isChunked) { + // Create a large state with 20k entries. + for (int i = 0; i < 20000; i++) { + byte[] bytes = new byte[256]; + random.nextBytes(bytes); + GUID guid = new GUID(bytes); + state.producerStates.put(guidToUtf8(guid), createProducerPartitionState(i, i)); + } + } else { + state.producerStates = Collections.emptyMap(); + } + return state; + } + + private CharSequence guidToUtf8(GUID guid) { + return new Utf8(GuidUtils.getCharSequenceFromGuid(guid)); + } + + private ProducerPartitionState createProducerPartitionState(int segment, int sequence) { + ProducerPartitionState ppState = new ProducerPartitionState(); + ppState.segmentNumber = segment; + ppState.segmentStatus = SegmentStatus.IN_PROGRESS.getValue(); + ppState.messageSequenceNumber = sequence; + ppState.messageTimestamp = System.currentTimeMillis(); + ppState.checksumType = CheckSumType.NONE.getValue(); + ppState.checksumState = ByteBuffer.allocate(0); + ppState.aggregates = new HashMap<>(); + ppState.debugInfo = new HashMap<>(); + return ppState; + } +}