Skip to content

Commit

Permalink
Merge pull request #127 from IOT-DSA/waitForStream-fix
Browse files Browse the repository at this point in the history
0.23.1
  • Loading branch information
d-shapiro authored Nov 11, 2019
2 parents 9414a9f + 3acb9a3 commit 6d83970
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ subprojects {
apply plugin: 'java-library'
apply plugin: 'maven'

version = '0.23.0'
version = '0.23.1'
sourceCompatibility = 1.7
targetCompatibility = 1.7

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class Table {
private Responder responder;
private Handler<Void> closeHandler;
private Object streamMutex;
private boolean closed = false;

/**
* Adds a column to the table.
Expand Down Expand Up @@ -192,6 +193,19 @@ public synchronized void setStreaming(int rid,
public boolean waitForStream(long millis) {
return waitForStream(millis, false);
}

/**
* Returns as soon as stream is established or the wait has expired.
*
* @param millis How long to wait for a stream, &lt;= 0 means wait forever
* and is discouraged.
* @param throwException If true, and a stream is not acquired, this will
* throw an IllegalStateException.
* @return True if the stream is established.
*/
public boolean waitForStream(long millis, boolean throwException) {
return waitForStream(millis, throwException, false);
}

/**
* Returns as soon as stream is established or the wait has expired.
Expand All @@ -200,15 +214,19 @@ public boolean waitForStream(long millis) {
* and is discouraged.
* @param throwException If true, and a stream is not acquired, this will
* throw an IllegalStateException.
* @param checkIfClosed If true,and stream has already been closed, this will return false
* @return True if the stream is established.
*/
@SuppressFBWarnings("WA_NOT_IN_LOOP")
public boolean waitForStream(long millis, boolean throwException) {
public boolean waitForStream(long millis, boolean throwException, boolean checkIfClosed) {
Object mutex = null;
synchronized (this) {
if (writer != null) {
return true;
}
if (checkIfClosed && closed) {
return false;
}
if (streamMutex == null) {
streamMutex = new Object();
}
Expand Down Expand Up @@ -273,6 +291,7 @@ public synchronized void setClosed() {
this.closeHandler = null;
this.meta = null;
this.responder = null;
this.closed = true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.dsa.iot.dslink.node.value.ValueType;
import org.dsa.iot.dslink.provider.LoopProvider;
import org.dsa.iot.dslink.util.*;
import org.dsa.iot.dslink.util.Objects;
import org.dsa.iot.dslink.util.handler.CompleteHandler;
import org.dsa.iot.dslink.util.handler.Handler;
import org.dsa.iot.historian.database.Database;
Expand All @@ -21,10 +20,7 @@
import org.dsa.iot.historian.stats.interval.IntervalProcessor;
import org.dsa.iot.historian.stats.rollup.Rollup;
import org.dsa.iot.historian.utils.QueryData;
import org.dsa.iot.historian.utils.TimeParser;

import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
* @author Samuel Grenier
Expand Down Expand Up @@ -109,6 +105,8 @@ public void handle(Void ignored) {
}
}
});



query(from.getTimeInMillis(), to.getTimeInMillis(), rollup, parser,
new CompleteHandler<QueryData>() {
Expand Down Expand Up @@ -158,7 +156,6 @@ public void handle(QueryData event) {
});
}

@SuppressWarnings("UnusedParameters")
protected void query(long from,
long to,
Rollup.Type type,
Expand Down Expand Up @@ -204,8 +201,9 @@ protected void processQueryData(Table table,
// If we can't get a stream open to the requester, then there's a chance batch rows
// could eventually cause an out of memory situation. So fail the invocation after a
// minute of not getting a response.
table.waitForStream(60000, true);
table.addBatchRows(batch);
if (table.waitForStream(60000, true, true)) {
table.addBatchRows(batch);
}
}
}

Expand Down

0 comments on commit 6d83970

Please sign in to comment.