diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java index 81ad9d81..ccdbe354 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java @@ -79,12 +79,12 @@ public class ConsumerStreamResponseObserver * SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer * via the subscribeStreamResponseObserver. * - * @param blockNodeContext contains the context with metrics and configuration for the - * application * @param producerLivenessClock the clock to use to determine the producer liveness * @param subscriptionHandler the subscription handler to use to manage the subscription * lifecycle * @param subscribeStreamResponseObserver the observer to use to send responses to the consumer + * @param blockNodeContext contains the context with metrics and configuration for the + * application */ public ConsumerStreamResponseObserver( @NonNull final InstantSource producerLivenessClock, diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediator.java index 6ae13438..716afab3 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediator.java @@ -20,5 +20,9 @@ import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.stream.BlockItem; +/** + * Use this interface to combine the contract for mediating the live stream of blocks from the + * Hedera network with the contract to be notified of critical system events. + */ public interface LiveStreamMediator extends StreamMediator, Notifiable {} diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 6f0ff931..59a4e088 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -50,15 +50,17 @@ class LiveStreamMediatorImpl extends SubscriptionHandlerBase { */ boolean isSubscribed(@NonNull final BlockNodeEventHandler> handler); + /** Unsubscribes all the expired handlers from the stream of events. */ void unsubscribeAllExpired(); } diff --git a/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandlerBase.java b/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandlerBase.java index 9999688e..e2849fa1 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandlerBase.java +++ b/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandlerBase.java @@ -31,6 +31,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +/** + * Inherit from this class to leverage RingBuffer subscription handling. + * + *

Subclasses may use the ringBuffer to publish events to the subscribers. + * + * @param the type of the subscription events + */ public abstract class SubscriptionHandlerBase implements SubscriptionHandler { private final System.Logger LOGGER = System.getLogger(getClass().getName()); @@ -42,7 +49,18 @@ public abstract class SubscriptionHandlerBase implements SubscriptionHandler< protected final RingBuffer> ringBuffer; private final ExecutorService executor; - public SubscriptionHandlerBase( + /** + * Constructs an abstract SubscriptionHandler instance with the given subscribers, block writer, + * and service status. Users of this constructor should take care to supply a thread-safe map + * implementation for the subscribers to handle the dynamic addition and removal of subscribers + * at runtime. + * + * @param subscribers the map of subscribers to batch event processors. It's recommended the map + * implementation is thread-safe + * @param subscriptionGauge the gauge to keep track of the number of subscribers + * @param ringBufferSize the size of the ring buffer + */ + protected SubscriptionHandlerBase( @NonNull final Map< BlockNodeEventHandler>, @@ -61,6 +79,11 @@ public SubscriptionHandlerBase( this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); } + /** + * Subscribes the given handler to the stream of events. + * + * @param handler the handler to subscribe + */ @Override public void subscribe(@NonNull final BlockNodeEventHandler> handler) { @@ -81,6 +104,11 @@ public void subscribe(@NonNull final BlockNodeEventHandler> handl subscriptionGauge.set(subscribers.size() - 1); } + /** + * Unsubscribes the given handler from the stream of events. + * + * @param handler the handler to unsubscribe + */ @Override public void unsubscribe(@NonNull final BlockNodeEventHandler> handler) { @@ -104,11 +132,18 @@ public void unsubscribe(@NonNull final BlockNodeEventHandler> han subscriptionGauge.set(subscribers.size() - 1); } + /** + * Checks if the given handler is subscribed to the stream of events. + * + * @param handler the handler to check + * @return true if the handler is subscribed, false otherwise + */ @Override public boolean isSubscribed(@NonNull BlockNodeEventHandler> handler) { return subscribers.containsKey(handler); } + /** Unsubscribes all the expired handlers from the stream of events. */ @Override public void unsubscribeAllExpired() { subscribers.keySet().stream() diff --git a/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java b/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java index a2d2157d..66595277 100644 --- a/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java +++ b/server/src/main/java/com/hedera/block/server/verifier/StreamVerifierImpl.java @@ -50,7 +50,7 @@ public class StreamVerifierImpl private static final String PROTOCOL_VIOLATION_MESSAGE = "Protocol Violation. %s is OneOf type %s but %s is null.\n%s"; - public StreamVerifierImpl( + StreamVerifierImpl( @NonNull final SubscriptionHandler subscriptionHandler, @NonNull final BlockWriter blockWriter, @NonNull final Notifier notifier, diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index a496d160..bebf5f4f 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -39,7 +39,7 @@ import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; import com.hedera.block.server.util.TestConfigUtil; -import com.hedera.block.server.verifier.StreamVerifierImpl; +import com.hedera.block.server.verifier.StreamVerifierBuilder; import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.SubscribeStreamResponseCode; import com.hedera.hapi.block.stream.BlockItem; @@ -157,8 +157,10 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException { // register the stream validator when(blockWriter.write(blockItem)).thenReturn(Optional.empty()); final var streamValidator = - new StreamVerifierImpl( - streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus); + StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) + .notifier(notifier) + .subscriptionHandler(streamMediator) + .build(); streamMediator.subscribe(streamValidator); // Acting as a producer, notify the mediator of a new block @@ -217,8 +219,10 @@ public void testMediatorPublishEventToSubscribers() throws IOException { // register the stream validator when(blockWriter.write(blockItem)).thenReturn(Optional.empty()); final var streamValidator = - new StreamVerifierImpl( - streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus); + StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) + .notifier(notifier) + .subscriptionHandler(streamMediator) + .build(); streamMediator.subscribe(streamValidator); // Acting as a producer, notify the mediator of a new block @@ -288,8 +292,10 @@ public void testOnCancelSubscriptionHandling() throws IOException { // register the stream validator when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty()); final var streamValidator = - new StreamVerifierImpl( - streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus); + StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) + .notifier(notifier) + .subscriptionHandler(streamMediator) + .build(); streamMediator.subscribe(streamValidator); // register the test observer @@ -338,8 +344,10 @@ public void testOnCloseSubscriptionHandling() throws IOException { // register the stream validator when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty()); final var streamValidator = - new StreamVerifierImpl( - streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus); + StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) + .notifier(notifier) + .subscriptionHandler(streamMediator) + .build(); streamMediator.subscribe(streamValidator); final var testConsumerBlockItemObserver = @@ -399,8 +407,10 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr final Notifier notifier = NotifierBuilder.newBuilder(streamMediator, blockNodeContext, serviceStatus).build(); final var streamValidator = - new StreamVerifierImpl( - streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus); + StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) + .notifier(notifier) + .subscriptionHandler(streamMediator) + .build(); // Set up the stream verifier streamMediator.subscribe(streamValidator); @@ -460,8 +470,10 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException { // register the stream validator final var streamValidator = - new StreamVerifierImpl( - streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus); + StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) + .notifier(notifier) + .subscriptionHandler(streamMediator) + .build(); streamMediator.subscribe(streamValidator); final var testConsumerBlockItemObserver =