Skip to content

Commit

Permalink
runtime launch and shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
xspanger3770 committed Oct 21, 2023
1 parent f94bb72 commit 544fada
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 87 deletions.
2 changes: 1 addition & 1 deletion GlobalQuakeServer/globalQuake.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#Fun fact: I've never felt an earthquake in my life
#Sat Oct 21 17:40:26 GMT 2023
#Sat Oct 21 20:00:44 GMT 2023
displayTime=true
oldEventsOpacity=100.0
stationIntensityVisibilityZoomLevel=0.2
Expand Down
32 changes: 28 additions & 4 deletions src/main/java/gqserver/core/GlobalQuakeRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ public class GlobalQuakeRuntime {
private long lastGC;
private long clusterAnalysisT;
private long lastQuakesT;
private ScheduledExecutorService execAnalysis;
private ScheduledExecutorService exec1Sec;
private ScheduledExecutorService execClusters;
private ScheduledExecutorService execQuake;

public void runThreads() {
ScheduledExecutorService execAnalysis = Executors
execAnalysis = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("Station Analysis Thread"));
ScheduledExecutorService exec1Sec = Executors
exec1Sec = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("1-Second Loop Thread"));
ScheduledExecutorService execClusters = Executors
execClusters = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("Cluster Analysis Thread"));
ScheduledExecutorService execQuake = Executors
execQuake = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("Hypocenter Location Thread"));

execAnalysis.scheduleAtFixedRate(() -> {
Expand Down Expand Up @@ -77,4 +81,24 @@ public void runThreads() {
}, 0, 1, TimeUnit.SECONDS);
}

public void stop() {

System.err.println("INT OTH");
execAnalysis.shutdownNow();
execQuake.shutdownNow();
execClusters.shutdownNow();
exec1Sec.shutdownNow();

try {
execAnalysis.awaitTermination(10, TimeUnit.SECONDS);
execQuake.awaitTermination(10, TimeUnit.SECONDS);
execClusters.awaitTermination(10, TimeUnit.SECONDS);
exec1Sec.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Logger.error(e);
}


System.err.println("INT OTH DONE");
}
}
16 changes: 9 additions & 7 deletions src/main/java/gqserver/core/GlobalQuakeServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,19 @@ public GlobalQuakeServer(StationDatabaseManager stationDatabaseManager) {
serverSocket = new GQServerSocket();
}

public void initStations(){
public void startRuntime(){
globalStationManager.initStations(stationDatabaseManager);
}

public GlobalQuakeServer runSeedlinkReader() {
getGlobalQuakeRuntime().runThreads();
seedlinkNetworksReader.run();
return this;
}

public void startRuntime(){
getGlobalQuakeRuntime().runThreads();
public void stopRuntime(){
getGlobalQuakeRuntime().stop();
getSeedlinkReader().stop();

getEarthquakeAnalysis().getEarthquakes().clear();
getClusterAnalysis().getClusters().clear();
getStationManager().getStations().clear();
}

public ClusterAnalysis getClusterAnalysis() {
Expand Down
178 changes: 103 additions & 75 deletions src/main/java/gqserver/core/SeedlinkNetworksReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SeedlinkNetworksReader {

Expand All @@ -19,6 +23,8 @@ public class SeedlinkNetworksReader {

private long lastReceivedRecord;

private ExecutorService seedlinkReaderService;

public static void main(String[] args) throws Exception{
SeedlinkReader reader = new SeedlinkReader("rtserve.iris.washington.edu", 18000);
reader.select("AK", "D25K", "", "BHZ");
Expand All @@ -45,9 +51,11 @@ public static void main(String[] args) throws Exception{

public void run() {
createCache();
seedlinkReaderService = Executors.newCachedThreadPool();
GlobalQuakeServer.instance.getStationDatabaseManager().getStationDatabase().getDatabaseReadLock().lock();
try{
GlobalQuakeServer.instance.getStationDatabaseManager().getStationDatabase().getSeedlinkNetworks().forEach(this::runSeedlinkThread);
GlobalQuakeServer.instance.getStationDatabaseManager().getStationDatabase().getSeedlinkNetworks().forEach(
seedlinkServer -> seedlinkReaderService.submit(() -> runSeedlinkThread(seedlinkServer)));
} finally {
GlobalQuakeServer.instance.getStationDatabaseManager().getStationDatabase().getDatabaseReadLock().unlock();
}
Expand All @@ -63,87 +71,88 @@ private void createCache() {
}
}

private Queue<SeedlinkReader> activeReaders = new ConcurrentLinkedQueue<>();

private void runSeedlinkThread(SeedlinkNetwork seedlinkNetwork) {
Thread seedlinkThread = new Thread("Seedlink Network Thread - " + seedlinkNetwork.getHost()) {
@SuppressWarnings("BusyWait")
@Override
public void run() {
int reconnectDelay = RECONNECT_DELAY;

while (true) {
seedlinkNetwork.status = SeedlinkStatus.CONNECTING;
seedlinkNetwork.connectedStations = 0;

SeedlinkReader reader = null;
try {
Logger.info("Connecting to seedlink server \"" + seedlinkNetwork.getHost() + "\"");
reader = new SeedlinkReader(seedlinkNetwork.getHost(), seedlinkNetwork.getPort(), 90, false);
reader.sendHello();

reconnectDelay = RECONNECT_DELAY;
boolean first = true;

for (AbstractStation s : GlobalQuakeServer.instance.getStationManager().getStations()) {
if (s.getSeedlinkNetwork() != null && s.getSeedlinkNetwork().equals(seedlinkNetwork)) {
Logger.trace("Connecting to %s %s %s %s [%s]".formatted(s.getStationCode(), s.getNetworkCode(), s.getChannelName(), s.getLocationCode(), seedlinkNetwork.getName()));
if(!first) {
reader.sendCmd("DATA");
} else{
first = false;
}
reader.select(s.getNetworkCode(), s.getStationCode(), s.getLocationCode(),
s.getChannelName());
seedlinkNetwork.connectedStations++;
}
}

if(seedlinkNetwork.connectedStations == 0){
Logger.info("No stations connected to "+seedlinkNetwork.getName());
seedlinkNetwork.status = SeedlinkStatus.DISCONNECTED;
break;
}

reader.startData();
seedlinkNetwork.status = SeedlinkStatus.RUNNING;

while (reader.hasNext()) {
SeedlinkPacket slp = reader.readPacket();
try {
newPacket(slp.getMiniSeed());
} catch (Exception e) {
Logger.error(e);
}
}

reader.close();
} catch (Exception e) {
Logger.error(e);
if (reader != null) {
try {
reader.close();
} catch (Exception ex) {
Logger.error(ex);
}
}
int reconnectDelay = RECONNECT_DELAY;
seedlinkNetwork.status = SeedlinkStatus.CONNECTING;
seedlinkNetwork.connectedStations = 0;

SeedlinkReader reader = null;
try {
Logger.info("Connecting to seedlink server \"" + seedlinkNetwork.getHost() + "\"");
reader = new SeedlinkReader(seedlinkNetwork.getHost(), seedlinkNetwork.getPort(), 90, false);
activeReaders.add(reader);

reader.sendHello();

reconnectDelay = RECONNECT_DELAY;
boolean first = true;

for (AbstractStation s : GlobalQuakeServer.instance.getStationManager().getStations()) {
if (s.getSeedlinkNetwork() != null && s.getSeedlinkNetwork().equals(seedlinkNetwork)) {
Logger.trace("Connecting to %s %s %s %s [%s]".formatted(s.getStationCode(), s.getNetworkCode(), s.getChannelName(), s.getLocationCode(), seedlinkNetwork.getName()));
if(!first) {
reader.sendCmd("DATA");
} else{
first = false;
}
reader.select(s.getNetworkCode(), s.getStationCode(), s.getLocationCode(),
s.getChannelName());
seedlinkNetwork.connectedStations++;
}
}

seedlinkNetwork.status = SeedlinkStatus.DISCONNECTED;
seedlinkNetwork.connectedStations = 0;
Logger.warn(seedlinkNetwork.getHost() + " Disconnected, Reconnecting after " + reconnectDelay
+ " seconds...");
try {
sleep(reconnectDelay * 1000L);
if(reconnectDelay < 60 * 5) {
reconnectDelay *= 2;
}
} catch (InterruptedException ignored) {
if(seedlinkNetwork.connectedStations == 0){
Logger.info("No stations connected to "+seedlinkNetwork.getName());
seedlinkNetwork.status = SeedlinkStatus.DISCONNECTED;
return;
}

}
reader.startData();
seedlinkNetwork.status = SeedlinkStatus.RUNNING;

while (reader.hasNext()) {
SeedlinkPacket slp = reader.readPacket();
try {
newPacket(slp.getMiniSeed());
} catch (Exception e) {
Logger.error(e);
}
}
};

seedlinkThread.start();
reader.close();
} catch (Exception e) {
Logger.error(e);
if (reader != null) {
try {
reader.close();
} catch (Exception ex) {
Logger.error(ex);
}
}
}finally{
if(reader != null){
activeReaders.remove(reader);
}
}

seedlinkNetwork.status = SeedlinkStatus.DISCONNECTED;
seedlinkNetwork.connectedStations = 0;
Logger.warn(seedlinkNetwork.getHost() + " Disconnected, Reconnecting after " + reconnectDelay
+ " seconds...");

try {
Thread.sleep(reconnectDelay * 1000L);
if(reconnectDelay < 60 * 5) {
reconnectDelay *= 2;
}
} catch (InterruptedException ignored) {
Logger.warn("Thread interrupted, nothing will happen");
return;
}

seedlinkReaderService.submit(() -> runSeedlinkThread(seedlinkNetwork));
}

private void newPacket(DataRecord dr) {
Expand Down Expand Up @@ -171,4 +180,23 @@ public void logRecord(long time) {
}
}

public void stop() {
if(seedlinkReaderService != null) {
System.err.println("INT SDL");
seedlinkReaderService.shutdownNow();
for (Iterator<SeedlinkReader> iterator = activeReaders.iterator(); iterator.hasNext(); ) {
SeedlinkReader reader = iterator.next();
reader.close();
iterator.remove();
}
try {
seedlinkReaderService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Logger.error(e);
}

System.err.println("INT SDL DONE");
}
stationCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public void initStations(StationDatabaseManager databaseManager) {
return;
}
stations.clear();
nextID.set(0);
databaseManager.getStationDatabase().getDatabaseReadLock().lock();
try {
for (Network n : databaseManager.getStationDatabase().getNetworks()) {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/gqserver/ui/server/ServerStatusPanel.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ public void actionPerformed(ActionEvent actionEvent) {
if(status == SocketStatus.IDLE){
try {
GlobalQuakeServer.instance.getServerSocket().run(addressField.getText(), Integer.parseInt(portField.getText()));
GlobalQuakeServer.instance.startRuntime();
} catch(Exception e){
Main.getErrorHandler().handleException(new RuntimeApplicationException("Failed to start server", e));
}
} else if(status == SocketStatus.RUNNING) {
if(confirm("Are you sure you want to close the server?")) {
try {
GlobalQuakeServer.instance.getServerSocket().stop();
GlobalQuakeServer.instance.stopRuntime();
} catch (IOException e) {
Main.getErrorHandler().handleException(new RuntimeApplicationException("Failed to stop server", e));
}
Expand Down

0 comments on commit 544fada

Please sign in to comment.