From c5dfab02b995a5a3e49140315072c4765e8a7e44 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Thu, 12 Sep 2024 14:27:36 -0600 Subject: [PATCH] fix: added test coverage for StreamVerifier Signed-off-by: Matt Peterson --- .../BlockStreamProtocolException.java | 24 +++++ .../server/verifier/StreamVerifierImpl.java | 49 ++++++++-- server/src/main/java/module-info.java | 1 + .../verifier/StreamVerifierImplTest.java | 93 +++++++++++++++++++ 4 files changed, 158 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/com/hedera/block/server/exception/BlockStreamProtocolException.java diff --git a/server/src/main/java/com/hedera/block/server/exception/BlockStreamProtocolException.java b/server/src/main/java/com/hedera/block/server/exception/BlockStreamProtocolException.java new file mode 100644 index 00000000..6d76c52a --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/exception/BlockStreamProtocolException.java @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.exception; + +public class BlockStreamProtocolException extends Exception { + + public BlockStreamProtocolException(String message) { + super(message); + } +} diff --git a/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java b/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java index 1276ffe2..a2d2157d 100644 --- a/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java +++ b/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java @@ -17,11 +17,13 @@ package com.hedera.block.server.verifier; import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlocksVerified; +import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.ObjectEvent; +import com.hedera.block.server.exception.BlockStreamProtocolException; import com.hedera.block.server.mediator.SubscriptionHandler; import com.hedera.block.server.metrics.MetricsService; import com.hedera.block.server.notifier.Notifier; @@ -29,6 +31,7 @@ import com.hedera.block.server.service.ServiceStatus; import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.OneOf; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; import java.util.Optional; @@ -44,6 +47,9 @@ public class StreamVerifierImpl private final MetricsService metricsService; private final ServiceStatus serviceStatus; + private static final String PROTOCOL_VIOLATION_MESSAGE = + "Protocol Violation. %s is OneOf type %s but %s is null.\n%s"; + public StreamVerifierImpl( @NonNull final SubscriptionHandler subscriptionHandler, @NonNull final BlockWriter blockWriter, @@ -62,22 +68,47 @@ public void onEvent( ObjectEvent event, long sequence, boolean endOfBatch) { try { if (serviceStatus.isRunning()) { - // Persist the BlockItem + final SubscribeStreamResponse subscribeStreamResponse = event.get(); - final BlockItem blockItem = subscribeStreamResponse.blockItem(); - Optional result = blockWriter.write(blockItem); - if (result.isPresent()) { - // Publish the block item back upstream to the notifier - // to send responses to producers. - notifier.publish(blockItem); - metricsService.get(LiveBlocksVerified).increment(); + final OneOf oneOfTypeOneOf = + subscribeStreamResponse.response(); + switch (oneOfTypeOneOf.kind()) { + case BLOCK_ITEM -> { + final BlockItem blockItem = subscribeStreamResponse.blockItem(); + if (blockItem == null) { + final String message = + PROTOCOL_VIOLATION_MESSAGE.formatted( + "SubscribeStreamResponse", + "BLOCK_ITEM", + "block_item", + subscribeStreamResponse); + LOGGER.log(ERROR, message); + throw new BlockStreamProtocolException(message); + } else { + // Persist the BlockItem + Optional result = blockWriter.write(blockItem); + if (result.isPresent()) { + // Publish the block item back upstream to the notifier + // to send responses to producers. + notifier.publish(blockItem); + metricsService.get(LiveBlocksVerified).increment(); + } + } + } + case STATUS -> LOGGER.log( + DEBUG, "Unexpected received a status message rather than a block item"); + default -> { + final String message = "Unknown response type: " + oneOfTypeOneOf.kind(); + LOGGER.log(ERROR, message); + throw new BlockStreamProtocolException(message); + } } } else { LOGGER.log( ERROR, "Service is not running. Block item will not be processed further."); } - } catch (IOException e) { + } catch (BlockStreamProtocolException | IOException e) { // Trigger the server to stop accepting new requests serviceStatus.stopRunning(getClass().getName()); diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index fa73c428..ccd73557 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -4,6 +4,7 @@ module com.hedera.block.server { exports com.hedera.block.server; exports com.hedera.block.server.consumer; + exports com.hedera.block.server.exception; exports com.hedera.block.server.persistence.storage; exports com.hedera.block.server.persistence.storage.write; exports com.hedera.block.server.persistence.storage.read; diff --git a/server/src/test/java/com/hedera/block/server/verifier/StreamVerifierImplTest.java b/server/src/test/java/com/hedera/block/server/verifier/StreamVerifierImplTest.java index e7397342..a849f224 100644 --- a/server/src/test/java/com/hedera/block/server/verifier/StreamVerifierImplTest.java +++ b/server/src/test/java/com/hedera/block/server/verifier/StreamVerifierImplTest.java @@ -18,7 +18,11 @@ import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlocksVerified; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; +import static com.hedera.hapi.block.SubscribeStreamResponseCode.READ_STREAM_SUCCESS; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -31,6 +35,7 @@ import com.hedera.block.server.service.ServiceStatus; import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.OneOf; import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -52,6 +57,8 @@ public class StreamVerifierImplTest { @Mock private MetricsService metricsService; + private static final int testTimeout = 0; + @Test public void testOnEventWhenServiceIsNotRunning() { @@ -79,4 +86,90 @@ public void testOnEventWhenServiceIsNotRunning() { verify(notifier, never()).publish(blockItems.getFirst()); verify(metricsService, never()).get(LiveBlocksVerified); } + + @Test + public void testBlockItemIsNull() { + when(blockNodeContext.metricsService()).thenReturn(metricsService); + when(serviceStatus.isRunning()).thenReturn(true); + + final var streamVerifier = + new StreamVerifierImpl( + subscriptionHandler, + blockWriter, + notifier, + blockNodeContext, + serviceStatus); + + final List blockItems = generateBlockItems(1); + final var subscribeStreamResponse = + spy(SubscribeStreamResponse.newBuilder().blockItem(blockItems.getFirst()).build()); + + // Force the block item to be null + when(subscribeStreamResponse.blockItem()).thenReturn(null); + final ObjectEvent event = new ObjectEvent<>(); + event.set(subscribeStreamResponse); + + streamVerifier.onEvent(event, 0, false); + + verify(serviceStatus, timeout(testTimeout).times(1)).stopRunning(any()); + verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(any()); + verify(notifier, timeout(testTimeout).times(1)).notifyUnrecoverableError(); + } + + @Test + public void testSubscribeStreamResponseTypeUnknown() { + when(blockNodeContext.metricsService()).thenReturn(metricsService); + when(serviceStatus.isRunning()).thenReturn(true); + + final var streamVerifier = + new StreamVerifierImpl( + subscriptionHandler, + blockWriter, + notifier, + blockNodeContext, + serviceStatus); + + final List blockItems = generateBlockItems(1); + final var subscribeStreamResponse = + spy(SubscribeStreamResponse.newBuilder().blockItem(blockItems.getFirst()).build()); + + // Force the block item to be UNSET + final OneOf illegalOneOf = + new OneOf<>(SubscribeStreamResponse.ResponseOneOfType.UNSET, null); + when(subscribeStreamResponse.response()).thenReturn(illegalOneOf); + + final ObjectEvent event = new ObjectEvent<>(); + event.set(subscribeStreamResponse); + + streamVerifier.onEvent(event, 0, false); + + verify(serviceStatus, timeout(testTimeout).times(1)).stopRunning(any()); + verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(any()); + verify(notifier, timeout(testTimeout).times(1)).notifyUnrecoverableError(); + } + + @Test + public void testSubscribeStreamResponseTypeStatus() { + when(blockNodeContext.metricsService()).thenReturn(metricsService); + when(serviceStatus.isRunning()).thenReturn(true); + + final var streamVerifier = + new StreamVerifierImpl( + subscriptionHandler, + blockWriter, + notifier, + blockNodeContext, + serviceStatus); + + final SubscribeStreamResponse subscribeStreamResponse = + spy(SubscribeStreamResponse.newBuilder().status(READ_STREAM_SUCCESS).build()); + final ObjectEvent event = new ObjectEvent<>(); + event.set(subscribeStreamResponse); + + streamVerifier.onEvent(event, 0, false); + + verify(serviceStatus, never()).stopRunning(any()); + verify(subscriptionHandler, never()).unsubscribe(any()); + verify(notifier, never()).notifyUnrecoverableError(); + } }