diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 357b884fbb18..b7f4c850bbb7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2764,6 +2764,7 @@ public void run() {} } private static class MockActiveWork extends StreamingDataflowWorker.Work { + // exit must be volatile so changes to it are reflected in the run function public static volatile boolean exit; public MockActiveWork(long workToken) { @@ -2945,13 +2946,13 @@ public void run() { @Test public void testActiveThreadMetric() throws Exception { int maxThreads = 5; - int threadExpiration = 60; + int threadExpirationSec = 60; // setting up actual implementation of executor instead of mocking to keep track of // active thread count. BoundedQueueExecutor executor = new BoundedQueueExecutor( maxThreads, - threadExpiration, + threadExpirationSec, TimeUnit.SECONDS, maxThreads, 10000000, @@ -2975,6 +2976,9 @@ public void testActiveThreadMetric() throws Exception { new MockActiveWork(1) { @Override public void run() { + synchronized(this) { + this.notify(); + } int count = 0; while (!exit) { count += 1; @@ -2988,6 +2992,9 @@ public void run() { new MockWork(2) { @Override public void run() { + synchronized(this) { + this.notify(); + } try { Thread.sleep(2000); } catch (InterruptedException e) { @@ -3001,6 +3008,9 @@ public void run() { new MockWork(3) { @Override public void run() { + synchronized(this) { + this.notify(); + } try { Thread.sleep(2000); } catch (InterruptedException e) { @@ -3011,15 +3021,25 @@ public void run() { assertEquals(0, executor.activeCount()); assertTrue(computationState.activateWork(key1Shard1, m1)); - executor.execute(m1, m1.getWorkItem().getSerializedSize()); - Thread.sleep(1000); + synchronized (m1) { + executor.execute(m1, m1.getWorkItem().getSerializedSize()); + LOG.info("[chengedward] waiting on thread start m1]"); + m1.wait(); + } + LOG.info("[chengedward] thread started m1]"); assertEquals(2, executor.activeCount()); - + assertTrue(computationState.activateWork(key1Shard1, m2)); assertTrue(computationState.activateWork(key1Shard1, m3)); - executor.execute(m2, m2.getWorkItem().getSerializedSize()); - executor.execute(m3, m3.getWorkItem().getSerializedSize()); - Thread.sleep(1000); + synchronized (m2) { + executor.execute(m2, m2.getWorkItem().getSerializedSize()); + m2.wait(); + } + synchronized (m3) { + executor.execute(m3, m3.getWorkItem().getSerializedSize()); + m3.wait(); + } + // this.wait(); assertEquals(4, executor.activeCount()); m1.stop();