Skip to content

Commit

Permalink
synchronize threads in unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
edman124 committed Sep 21, 2023
1 parent e194aeb commit 9ecb9c0
Showing 1 changed file with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,7 @@ public void run() {}
}

private static class MockActiveWork extends StreamingDataflowWorker.Work {
// exit must be volatile so changes to it are reflected in the run function
public static volatile boolean exit;

public MockActiveWork(long workToken) {
Expand Down Expand Up @@ -2945,13 +2946,13 @@ public void run() {
@Test
public void testActiveThreadMetric() throws Exception {
int maxThreads = 5;
int threadExpiration = 60;
int threadExpirationSec = 60;
// setting up actual implementation of executor instead of mocking to keep track of
// active thread count.
BoundedQueueExecutor executor =
new BoundedQueueExecutor(
maxThreads,
threadExpiration,
threadExpirationSec,
TimeUnit.SECONDS,
maxThreads,
10000000,
Expand All @@ -2975,6 +2976,9 @@ public void testActiveThreadMetric() throws Exception {
new MockActiveWork(1) {
@Override
public void run() {
synchronized(this) {
this.notify();
}
int count = 0;
while (!exit) {
count += 1;
Expand All @@ -2988,6 +2992,9 @@ public void run() {
new MockWork(2) {
@Override
public void run() {
synchronized(this) {
this.notify();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Expand All @@ -3001,6 +3008,9 @@ public void run() {
new MockWork(3) {
@Override
public void run() {
synchronized(this) {
this.notify();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Expand All @@ -3011,15 +3021,25 @@ public void run() {
assertEquals(0, executor.activeCount());

assertTrue(computationState.activateWork(key1Shard1, m1));
executor.execute(m1, m1.getWorkItem().getSerializedSize());
Thread.sleep(1000);
synchronized (m1) {
executor.execute(m1, m1.getWorkItem().getSerializedSize());
LOG.info("[chengedward] waiting on thread start m1]");
m1.wait();
}
LOG.info("[chengedward] thread started m1]");
assertEquals(2, executor.activeCount());

assertTrue(computationState.activateWork(key1Shard1, m2));
assertTrue(computationState.activateWork(key1Shard1, m3));
executor.execute(m2, m2.getWorkItem().getSerializedSize());
executor.execute(m3, m3.getWorkItem().getSerializedSize());
Thread.sleep(1000);
synchronized (m2) {
executor.execute(m2, m2.getWorkItem().getSerializedSize());
m2.wait();
}
synchronized (m3) {
executor.execute(m3, m3.getWorkItem().getSerializedSize());
m3.wait();
}
// this.wait();
assertEquals(4, executor.activeCount());

m1.stop();
Expand Down

0 comments on commit 9ecb9c0

Please sign in to comment.