Skip to content

Commit

Permalink
fix: fixing threading issue with unsubscribe
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 1, 2024
1 parent e595712 commit 2febc51
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,16 @@ public ConsumerBlockItemObserver(
public void onEvent(
final ObjectEvent<SubscribeStreamResponse> event, final long l, final boolean b) {

final long currentMillis = producerLivenessClock.millis();
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
streamMediator.unsubscribe(this);
LOGGER.log(
System.Logger.Level.DEBUG,
"Unsubscribed ConsumerBlockItemObserver due to producer timeout");
} else {

// Only send the response if the consumer has not cancelled
// or closed the stream.
if (isResponsePermitted.get()) {

// Only send the response if the consumer has not cancelled
// or closed the stream.
if (isResponsePermitted.get()) {
final long currentMillis = producerLivenessClock.millis();
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
streamMediator.unsubscribe(this);
LOGGER.log(
System.Logger.Level.DEBUG,
"Unsubscribed ConsumerBlockItemObserver due to producer timeout");
} else {
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
producerLivenessMillis = currentMillis;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,19 @@ public void subscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> h
public void unsubscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {

// Remove the subscriber
final var batchEventProcessor = subscribers.remove(handler);
if (subscribers.containsKey(handler)) {

// Stop the processor
batchEventProcessor.halt();
final var batchEventProcessor = subscribers.remove(handler);

// Remove the gating sequence from the ring buffer
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());
// Stop the processor
batchEventProcessor.halt();

// Remove the gating sequence from the ring buffer
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());

} else {
LOGGER.log(System.Logger.Level.ERROR, "Subscriber not found: {0}", handler);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class BlockStreamServiceIT {
private Path testPath;
private Config testConfig;

private static final int testTimeout = 100;

@BeforeEach
public void setUp() throws IOException {
testPath = Files.createTempDirectory(TEMP_DIR);
Expand All @@ -103,11 +105,11 @@ public void tearDown() {

@Test
public void testPublishBlockStreamRegistrationAndExecution()
throws InterruptedException, IOException, NoSuchAlgorithmException {
throws IOException, NoSuchAlgorithmException {

final BlockStreamService blockStreamService =
new BlockStreamService(
50L,
1500L,
new ItemAckBuilder(),
streamMediator,
blockPersistenceHandler,
Expand All @@ -131,21 +133,22 @@ public void testPublishBlockStreamRegistrationAndExecution()
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();

// Verify the BlockItem message is sent to the mediator
verify(streamMediator, timeout(50).times(1)).publishEvent(blockItem);
verify(streamMediator, timeout(testTimeout).times(1)).publishEvent(blockItem);

// Verify our custom StreamObserver implementation builds and sends
// a response back to the producer
verify(publishStreamResponseObserver, timeout(50).times(1)).onNext(publishStreamResponse);
verify(publishStreamResponseObserver, timeout(testTimeout).times(1))
.onNext(publishStreamResponse);

// Close the stream as Helidon does
streamObserver.onCompleted();

// verify the onCompleted() method is invoked on the wrapped StreamObserver
verify(publishStreamResponseObserver, timeout(50).times(1)).onCompleted();
verify(publishStreamResponseObserver, timeout(testTimeout).times(1)).onCompleted();
}

@Test
public void testSubscribeBlockStream() throws InterruptedException {
public void testSubscribeBlockStream() {

final ServiceStatus serviceStatus = new ServiceStatusImpl();
serviceStatus.setWebServer(webServer);
Expand Down Expand Up @@ -185,13 +188,16 @@ public void testSubscribeBlockStream() throws InterruptedException {
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build();

verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver1, timeout(testTimeout).times(1))
.onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, timeout(testTimeout).times(1))
.onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, timeout(testTimeout).times(1))
.onNext(subscribeStreamResponse);
}

@Test
public void testFullHappyPath() throws IOException, InterruptedException {
public void testFullHappyPath() throws IOException {
int numberOfBlocks = 100;

final BlockStreamService blockStreamService = buildBlockStreamService();
Expand Down Expand Up @@ -226,7 +232,7 @@ public void testFullHappyPath() throws IOException, InterruptedException {
}

@Test
public void testFullWithSubscribersAddedDynamically() throws IOException, InterruptedException {
public void testFullWithSubscribersAddedDynamically() throws IOException {

int numberOfBlocks = 100;

Expand Down Expand Up @@ -296,7 +302,7 @@ public void testFullWithSubscribersAddedDynamically() throws IOException, Interr
}

@Test
public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedException {
public void testSubAndUnsubWhileStreaming() throws IOException {

int numberOfBlocks = 100;

Expand Down Expand Up @@ -384,8 +390,7 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep
}

@Test
public void testMediatorExceptionHandlingWhenPersistenceFailure()
throws IOException, InterruptedException {
public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOException {
final Map<
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
Expand Down Expand Up @@ -444,9 +449,12 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
// before the IOException was thrown.
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build();
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver1, timeout(testTimeout).times(1))
.onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, timeout(testTimeout).times(1))
.onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, timeout(testTimeout).times(1))
.onNext(subscribeStreamResponse);

// Verify all the consumers received the end of stream response
// TODO: Fix the response code when it's available
Expand All @@ -456,9 +464,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
SubscribeStreamResponse.SubscribeStreamResponseCode
.READ_STREAM_SUCCESS)
.build();
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(endStreamResponse);

// Verify all the consumers were unsubscribed
for (final var s : subscribers.keySet()) {
Expand All @@ -473,8 +481,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
.build();
final var endOfStreamResponse =
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
verify(publishStreamResponseObserver, timeout(50).times(2)).onNext(endOfStreamResponse);
verify(webServer, timeout(50).times(1)).stop();
verify(publishStreamResponseObserver, timeout(testTimeout).times(2))
.onNext(endOfStreamResponse);
verify(webServer, timeout(testTimeout).times(1)).stop();

// Now verify the block was removed from the file system.
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
Expand All @@ -489,7 +498,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
SingleBlockResponse.SingleBlockResponseCode
.READ_BLOCK_NOT_AVAILABLE)
.build();
verify(singleBlockResponseStreamObserver, timeout(50).times(1))
verify(singleBlockResponseStreamObserver, timeout(testTimeout).times(1))
.onNext(expectedSingleBlockNotAvailable);

// TODO: Fix the response code when it's available
Expand All @@ -499,7 +508,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
SubscribeStreamResponse.SubscribeStreamResponseCode
.READ_STREAM_SUCCESS)
.build();
verify(subscribeStreamObserver4, timeout(50).times(1))
verify(subscribeStreamObserver4, timeout(testTimeout).times(1))
.onNext(expectedSubscriberStreamNotAvailable);
}

Expand Down Expand Up @@ -535,9 +544,9 @@ private static void verifySubscribeStreamResponse(
final SubscribeStreamResponse stateProofStreamResponse =
buildSubscribeStreamResponse(stateProofBlockItem);

verify(streamObserver, timeout(50).times(1)).onNext(headerSubStreamResponse);
verify(streamObserver, timeout(50).times(8)).onNext(bodySubStreamResponse);
verify(streamObserver, timeout(50).times(1)).onNext(stateProofStreamResponse);
verify(streamObserver, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse);
verify(streamObserver, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse);
verify(streamObserver, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public class LiveStreamMediatorImplTest {
private final long TEST_TIME = 1_719_427_664_950L;

@Test
public void testUnsubscribeEach() {
// @Disabled
public void testUnsubscribeEach() throws InterruptedException {

final var streamMediator =
new LiveStreamMediatorImpl(
Expand All @@ -81,6 +82,8 @@ public void testUnsubscribeEach() {
streamMediator.isSubscribed(observer3),
"Expected the mediator to have observer3 subscribed");

Thread.sleep(50L);

streamMediator.unsubscribe(observer1);
assertFalse(
streamMediator.isSubscribed(observer1),
Expand Down Expand Up @@ -168,6 +171,7 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup
}

@Test
// @Disabled
public void testSubAndUnsubHandling() {
final var streamMediator =
new LiveStreamMediatorImpl(
Expand Down Expand Up @@ -220,6 +224,9 @@ public void testOnCancelSubscriptionHandling() throws IOException {
final List<BlockItem> blockItems = generateBlockItems(1);
streamMediator.publishEvent(blockItems.getFirst());

// Verify the event made it to the consumer
verify(serverCallStreamObserver, timeout(50).times(1)).setOnCancelHandler(any());

// Simulate the consumer cancelling the stream
testConsumerBlockItemObserver.getOnCancel().run();

Expand Down Expand Up @@ -251,6 +258,9 @@ public void testOnCloseSubscriptionHandling() throws IOException {
final List<BlockItem> blockItems = generateBlockItems(1);
streamMediator.publishEvent(blockItems.getFirst());

// Verify the event made it to the consumer
verify(serverCallStreamObserver, timeout(50).times(1)).setOnCancelHandler(any());

// Simulate the consumer completing the stream
testConsumerBlockItemObserver.getOnClose().run();

Expand Down

0 comments on commit 2febc51

Please sign in to comment.