Skip to content

Commit

Permalink
fix: added test coverage for StreamVerifier
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Sep 12, 2024
1 parent 6de31c0 commit c5dfab0
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.exception;

public class BlockStreamProtocolException extends Exception {

public BlockStreamProtocolException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@
package com.hedera.block.server.verifier;

import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlocksVerified;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.exception.BlockStreamProtocolException;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.OneOf;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.Optional;
Expand All @@ -44,6 +47,9 @@ public class StreamVerifierImpl
private final MetricsService metricsService;
private final ServiceStatus serviceStatus;

private static final String PROTOCOL_VIOLATION_MESSAGE =
"Protocol Violation. %s is OneOf type %s but %s is null.\n%s";

public StreamVerifierImpl(
@NonNull final SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler,
@NonNull final BlockWriter<BlockItem> blockWriter,
Expand All @@ -62,22 +68,47 @@ public void onEvent(
ObjectEvent<SubscribeStreamResponse> event, long sequence, boolean endOfBatch) {
try {
if (serviceStatus.isRunning()) {
// Persist the BlockItem

final SubscribeStreamResponse subscribeStreamResponse = event.get();
final BlockItem blockItem = subscribeStreamResponse.blockItem();
Optional<BlockItem> result = blockWriter.write(blockItem);
if (result.isPresent()) {
// Publish the block item back upstream to the notifier
// to send responses to producers.
notifier.publish(blockItem);
metricsService.get(LiveBlocksVerified).increment();
final OneOf<SubscribeStreamResponse.ResponseOneOfType> oneOfTypeOneOf =
subscribeStreamResponse.response();
switch (oneOfTypeOneOf.kind()) {
case BLOCK_ITEM -> {
final BlockItem blockItem = subscribeStreamResponse.blockItem();
if (blockItem == null) {
final String message =
PROTOCOL_VIOLATION_MESSAGE.formatted(
"SubscribeStreamResponse",
"BLOCK_ITEM",
"block_item",
subscribeStreamResponse);
LOGGER.log(ERROR, message);
throw new BlockStreamProtocolException(message);
} else {
// Persist the BlockItem
Optional<BlockItem> result = blockWriter.write(blockItem);
if (result.isPresent()) {
// Publish the block item back upstream to the notifier
// to send responses to producers.
notifier.publish(blockItem);
metricsService.get(LiveBlocksVerified).increment();
}
}
}
case STATUS -> LOGGER.log(
DEBUG, "Unexpected received a status message rather than a block item");
default -> {
final String message = "Unknown response type: " + oneOfTypeOneOf.kind();
LOGGER.log(ERROR, message);
throw new BlockStreamProtocolException(message);
}
}
} else {
LOGGER.log(
ERROR, "Service is not running. Block item will not be processed further.");
}

} catch (IOException e) {
} catch (BlockStreamProtocolException | IOException e) {

// Trigger the server to stop accepting new requests
serviceStatus.stopRunning(getClass().getName());
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
module com.hedera.block.server {
exports com.hedera.block.server;
exports com.hedera.block.server.consumer;
exports com.hedera.block.server.exception;
exports com.hedera.block.server.persistence.storage;
exports com.hedera.block.server.persistence.storage.write;
exports com.hedera.block.server.persistence.storage.read;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlocksVerified;
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
import static com.hedera.hapi.block.SubscribeStreamResponseCode.READ_STREAM_SUCCESS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -31,6 +35,7 @@
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.OneOf;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -52,6 +57,8 @@ public class StreamVerifierImplTest {

@Mock private MetricsService metricsService;

private static final int testTimeout = 0;

@Test
public void testOnEventWhenServiceIsNotRunning() {

Expand Down Expand Up @@ -79,4 +86,90 @@ public void testOnEventWhenServiceIsNotRunning() {
verify(notifier, never()).publish(blockItems.getFirst());
verify(metricsService, never()).get(LiveBlocksVerified);
}

@Test
public void testBlockItemIsNull() {
when(blockNodeContext.metricsService()).thenReturn(metricsService);
when(serviceStatus.isRunning()).thenReturn(true);

final var streamVerifier =
new StreamVerifierImpl(
subscriptionHandler,
blockWriter,
notifier,
blockNodeContext,
serviceStatus);

final List<BlockItem> blockItems = generateBlockItems(1);
final var subscribeStreamResponse =
spy(SubscribeStreamResponse.newBuilder().blockItem(blockItems.getFirst()).build());

// Force the block item to be null
when(subscribeStreamResponse.blockItem()).thenReturn(null);
final ObjectEvent<SubscribeStreamResponse> event = new ObjectEvent<>();
event.set(subscribeStreamResponse);

streamVerifier.onEvent(event, 0, false);

verify(serviceStatus, timeout(testTimeout).times(1)).stopRunning(any());
verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(any());
verify(notifier, timeout(testTimeout).times(1)).notifyUnrecoverableError();
}

@Test
public void testSubscribeStreamResponseTypeUnknown() {
when(blockNodeContext.metricsService()).thenReturn(metricsService);
when(serviceStatus.isRunning()).thenReturn(true);

final var streamVerifier =
new StreamVerifierImpl(
subscriptionHandler,
blockWriter,
notifier,
blockNodeContext,
serviceStatus);

final List<BlockItem> blockItems = generateBlockItems(1);
final var subscribeStreamResponse =
spy(SubscribeStreamResponse.newBuilder().blockItem(blockItems.getFirst()).build());

// Force the block item to be UNSET
final OneOf<SubscribeStreamResponse.ResponseOneOfType> illegalOneOf =
new OneOf<>(SubscribeStreamResponse.ResponseOneOfType.UNSET, null);
when(subscribeStreamResponse.response()).thenReturn(illegalOneOf);

final ObjectEvent<SubscribeStreamResponse> event = new ObjectEvent<>();
event.set(subscribeStreamResponse);

streamVerifier.onEvent(event, 0, false);

verify(serviceStatus, timeout(testTimeout).times(1)).stopRunning(any());
verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(any());
verify(notifier, timeout(testTimeout).times(1)).notifyUnrecoverableError();
}

@Test
public void testSubscribeStreamResponseTypeStatus() {
when(blockNodeContext.metricsService()).thenReturn(metricsService);
when(serviceStatus.isRunning()).thenReturn(true);

final var streamVerifier =
new StreamVerifierImpl(
subscriptionHandler,
blockWriter,
notifier,
blockNodeContext,
serviceStatus);

final SubscribeStreamResponse subscribeStreamResponse =
spy(SubscribeStreamResponse.newBuilder().status(READ_STREAM_SUCCESS).build());
final ObjectEvent<SubscribeStreamResponse> event = new ObjectEvent<>();
event.set(subscribeStreamResponse);

streamVerifier.onEvent(event, 0, false);

verify(serviceStatus, never()).stopRunning(any());
verify(subscriptionHandler, never()).unsubscribe(any());
verify(notifier, never()).notifyUnrecoverableError();
}
}

0 comments on commit c5dfab0

Please sign in to comment.