Skip to content

Commit

Permalink
fix: reduced access to builder only. javadoc.
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Sep 12, 2024
1 parent c5dfab0 commit 7d6f0b4
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public class ConsumerStreamResponseObserver
* SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer
* via the subscribeStreamResponseObserver.
*
* @param blockNodeContext contains the context with metrics and configuration for the
* application
* @param producerLivenessClock the clock to use to determine the producer liveness
* @param subscriptionHandler the subscription handler to use to manage the subscription
* lifecycle
* @param subscribeStreamResponseObserver the observer to use to send responses to the consumer
* @param blockNodeContext contains the context with metrics and configuration for the
* application
*/
public ConsumerStreamResponseObserver(
@NonNull final InstantSource producerLivenessClock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;

/**
* Use this interface to combine the contract for mediating the live stream of blocks from the
* Hedera network with the contract to be notified of critical system events.
*/
public interface LiveStreamMediator
extends StreamMediator<BlockItem, SubscribeStreamResponse>, Notifiable {}
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ class LiveStreamMediatorImpl extends SubscriptionHandlerBase<SubscribeStreamResp
private final MetricsService metricsService;

/**
* Constructs a new LiveStreamMediatorImpl instance with the given subscribers, block writer,
* and service status. This constructor is primarily used for testing purposes. Users of this
* constructor should take care to supply a thread-safe map implementation for the subscribers
* to handle the dynamic addition and removal of subscribers at runtime.
* Constructs a new LiveStreamMediatorImpl instance with the given subscribers, and service
* status. This constructor is primarily used for testing purposes. Users of this constructor
* should take care to supply a thread-safe map implementation for the subscribers to handle the
* dynamic addition and removal of subscribers at runtime.
*
* @param subscribers the map of subscribers to batch event processors. It's recommended the map
* implementation is thread-safe
* @param serviceStatus the service status to stop the service and web server if an exception
* occurs while persisting a block item, stop the web server for maintenance, etc
* @param blockNodeContext contains the context with metrics and configuration for the
* application
*/
LiveStreamMediatorImpl(
@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ public interface SubscriptionHandler<V> {
*/
boolean isSubscribed(@NonNull final BlockNodeEventHandler<ObjectEvent<V>> handler);

/** Unsubscribes all the expired handlers from the stream of events. */
void unsubscribeAllExpired();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Inherit from this class to leverage RingBuffer subscription handling.
*
* <p>Subclasses may use the ringBuffer to publish events to the subscribers.
*
* @param <V> the type of the subscription events
*/
public abstract class SubscriptionHandlerBase<V> implements SubscriptionHandler<V> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());
Expand All @@ -42,7 +49,18 @@ public abstract class SubscriptionHandlerBase<V> implements SubscriptionHandler<
protected final RingBuffer<ObjectEvent<V>> ringBuffer;
private final ExecutorService executor;

public SubscriptionHandlerBase(
/**
* Constructs an abstract SubscriptionHandler instance with the given subscribers, block writer,
* and service status. Users of this constructor should take care to supply a thread-safe map
* implementation for the subscribers to handle the dynamic addition and removal of subscribers
* at runtime.
*
* @param subscribers the map of subscribers to batch event processors. It's recommended the map
* implementation is thread-safe
* @param subscriptionGauge the gauge to keep track of the number of subscribers
* @param ringBufferSize the size of the ring buffer
*/
protected SubscriptionHandlerBase(
@NonNull
final Map<
BlockNodeEventHandler<ObjectEvent<V>>,
Expand All @@ -61,6 +79,11 @@ public SubscriptionHandlerBase(
this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
}

/**
* Subscribes the given handler to the stream of events.
*
* @param handler the handler to subscribe
*/
@Override
public void subscribe(@NonNull final BlockNodeEventHandler<ObjectEvent<V>> handler) {

Expand All @@ -81,6 +104,11 @@ public void subscribe(@NonNull final BlockNodeEventHandler<ObjectEvent<V>> handl
subscriptionGauge.set(subscribers.size() - 1);
}

/**
* Unsubscribes the given handler from the stream of events.
*
* @param handler the handler to unsubscribe
*/
@Override
public void unsubscribe(@NonNull final BlockNodeEventHandler<ObjectEvent<V>> handler) {

Expand All @@ -104,11 +132,18 @@ public void unsubscribe(@NonNull final BlockNodeEventHandler<ObjectEvent<V>> han
subscriptionGauge.set(subscribers.size() - 1);
}

/**
* Checks if the given handler is subscribed to the stream of events.
*
* @param handler the handler to check
* @return true if the handler is subscribed, false otherwise
*/
@Override
public boolean isSubscribed(@NonNull BlockNodeEventHandler<ObjectEvent<V>> handler) {
return subscribers.containsKey(handler);
}

/** Unsubscribes all the expired handlers from the stream of events. */
@Override
public void unsubscribeAllExpired() {
subscribers.keySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class StreamVerifierImpl
private static final String PROTOCOL_VIOLATION_MESSAGE =
"Protocol Violation. %s is OneOf type %s but %s is null.\n%s";

public StreamVerifierImpl(
StreamVerifierImpl(
@NonNull final SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler,
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final Notifier notifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.service.ServiceStatusImpl;
import com.hedera.block.server.util.TestConfigUtil;
import com.hedera.block.server.verifier.StreamVerifierImpl;
import com.hedera.block.server.verifier.StreamVerifierBuilder;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.stream.BlockItem;
Expand Down Expand Up @@ -157,8 +157,10 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException {
// register the stream validator
when(blockWriter.write(blockItem)).thenReturn(Optional.empty());
final var streamValidator =
new StreamVerifierImpl(
streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus);
StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus)
.notifier(notifier)
.subscriptionHandler(streamMediator)
.build();
streamMediator.subscribe(streamValidator);

// Acting as a producer, notify the mediator of a new block
Expand Down Expand Up @@ -217,8 +219,10 @@ public void testMediatorPublishEventToSubscribers() throws IOException {
// register the stream validator
when(blockWriter.write(blockItem)).thenReturn(Optional.empty());
final var streamValidator =
new StreamVerifierImpl(
streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus);
StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus)
.notifier(notifier)
.subscriptionHandler(streamMediator)
.build();
streamMediator.subscribe(streamValidator);

// Acting as a producer, notify the mediator of a new block
Expand Down Expand Up @@ -288,8 +292,10 @@ public void testOnCancelSubscriptionHandling() throws IOException {
// register the stream validator
when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty());
final var streamValidator =
new StreamVerifierImpl(
streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus);
StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus)
.notifier(notifier)
.subscriptionHandler(streamMediator)
.build();
streamMediator.subscribe(streamValidator);

// register the test observer
Expand Down Expand Up @@ -338,8 +344,10 @@ public void testOnCloseSubscriptionHandling() throws IOException {
// register the stream validator
when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty());
final var streamValidator =
new StreamVerifierImpl(
streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus);
StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus)
.notifier(notifier)
.subscriptionHandler(streamMediator)
.build();
streamMediator.subscribe(streamValidator);

final var testConsumerBlockItemObserver =
Expand Down Expand Up @@ -399,8 +407,10 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr
final Notifier notifier =
NotifierBuilder.newBuilder(streamMediator, blockNodeContext, serviceStatus).build();
final var streamValidator =
new StreamVerifierImpl(
streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus);
StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus)
.notifier(notifier)
.subscriptionHandler(streamMediator)
.build();

// Set up the stream verifier
streamMediator.subscribe(streamValidator);
Expand Down Expand Up @@ -460,8 +470,10 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException {

// register the stream validator
final var streamValidator =
new StreamVerifierImpl(
streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus);
StreamVerifierBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus)
.notifier(notifier)
.subscriptionHandler(streamMediator)
.build();
streamMediator.subscribe(streamValidator);

final var testConsumerBlockItemObserver =
Expand Down

0 comments on commit 7d6f0b4

Please sign in to comment.