Skip to content

Commit

Permalink
add watch test with restart delay
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Burgazzoli <[email protected]>
  • Loading branch information
lburgazzoli committed Aug 28, 2023
1 parent aa9e8a0 commit 73b90b0
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 13 deletions.
5 changes: 4 additions & 1 deletion jetcd-core/src/test/java/io/etcd/jetcd/impl/KVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,10 @@ public void testKVClientCanRetryPutOnEtcdRestart() throws InterruptedException {
});

// restart the cluster while uploading
executor.schedule(() -> cluster.restart(), 100, TimeUnit.MILLISECONDS);
executor.schedule(
() -> cluster.restart(0, TimeUnit.MILLISECONDS),
100,
TimeUnit.MILLISECONDS);

executor.shutdown();
assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).isTrue();
Expand Down
18 changes: 10 additions & 8 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchResumeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
Expand All @@ -32,19 +28,25 @@
import io.etcd.jetcd.watch.WatchEvent.EventType;
import io.etcd.jetcd.watch.WatchResponse;

import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@Timeout(value = 30, unit = TimeUnit.SECONDS)
@Timeout(value = 180, unit = TimeUnit.SECONDS)
public class WatchResumeTest {

@RegisterExtension
public static final EtcdClusterExtension cluster = EtcdClusterExtension.builder()
.withNodes(3)
.build();

@Test
public void testWatchOnPut() throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 10, 30, 50, 60 })
public void testWatchOnPut(int delaySec) throws Exception {
try (Client client = TestUtil.client(cluster).build()) {
Watch watchClient = client.getWatchClient();
KV kvClient = client.getKVClient();
Expand All @@ -54,7 +56,7 @@ public void testWatchOnPut() throws Exception {
final AtomicReference<WatchResponse> ref = new AtomicReference<>();

try (Watcher watcher = watchClient.watch(key, ref::set)) {
cluster.restart();
cluster.restart(delaySec, TimeUnit.SECONDS);

kvClient.put(key, value).get();

Expand Down
2 changes: 1 addition & 1 deletion jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void testWatchFutureRevisionIsNotOverwrittenOnCreation(final Client clien

try (Watcher watcher = client.getWatchClient().watch(key, watchOption, events::add)) {

cluster.restart(); // resumes (recreates) the watch
cluster.restart(0, TimeUnit.MILLISECONDS); // resumes (recreates) the watch

Thread.sleep(2000); // await().duration() would be better but it's broken
assertThat(events.isEmpty()).as("verify that received events list is empty").isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.testcontainers.lifecycle.Startable;

public interface EtcdCluster extends Startable {

default void restart() {
default void restart(long delay, TimeUnit unit) throws InterruptedException {
stop();

if (delay > 0) {
unit.sleep(delay);
}

start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.extension.AfterAllCallback;
Expand Down Expand Up @@ -52,8 +53,12 @@ public EtcdCluster cluster() {
return this.cluster;
}

public void restart() {
this.cluster.restart();
public void restart(long delay, TimeUnit unit) {
try {
this.cluster.restart(delay, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public String clusterName() {
Expand Down

0 comments on commit 73b90b0

Please sign in to comment.