From fe5e63b0720e501b9165eb165be952afaebe0d0a Mon Sep 17 00:00:00 2001 From: nabon Date: Sun, 6 Sep 2020 21:00:38 +0900 Subject: [PATCH 01/14] Use System.nanoTime() instead of System.currentTimeMillis() for time calculation. --- src/main/java/mqttloader/Constants.java | 3 +++ src/main/java/mqttloader/Loader.java | 13 ++++++++----- src/main/java/mqttloader/RecvTimeoutTask.java | 7 ++++++- src/main/java/mqttloader/Util.java | 16 ++++++++++++---- src/main/java/mqttloader/client/Publisher.java | 10 ++++++---- src/main/java/mqttloader/client/PublisherV3.java | 10 ++++++---- src/main/java/mqttloader/client/Subscriber.java | 9 +++++---- .../java/mqttloader/client/SubscriberV3.java | 9 +++++---- 8 files changed, 51 insertions(+), 26 deletions(-) diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index 0e498db..bce16c7 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -19,6 +19,9 @@ public class Constants { public static final String SUB_CLIENT_ID_PREFIX = "mqttloaderclient-sub"; public static final String PUB_CLIENT_ID_PREFIX = "mqttloaderclient-pub"; + public static final int MILLISECOND_IN_NANO = 1000000; + public static final int SECOND_IN_NANO = 1000000; + public static final int SECOND_IN_MILLI = 1000; public enum Opt { BROKER("b", "broker", true, "Broker URL. E.g., tcp://127.0.0.1:1883", null, true), diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 4c6d7e0..166a65c 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -54,8 +54,8 @@ public class Loader { private ArrayList publishers = new ArrayList<>(); private ArrayList subscribers = new ArrayList<>(); public static volatile long startTime; + public static volatile long startNanoTime; private long endTime; - public static volatile long offset = 0; public static volatile long lastRecvTime; public static CountDownLatch countDownLatch; public static Logger logger = Logger.getLogger(Loader.class.getName()); @@ -95,8 +95,8 @@ public Loader(String[] args) { } int execTime = Integer.valueOf(cmd.getOptionValue(Opt.EXEC_TIME.getName(), Opt.EXEC_TIME.getDefaultValue())); - long holdTime = startTime - Util.getTime(); - if(holdTime > 0) execTime += (int)holdTime; + long holdNanoTime = Util.getElapsedNanoTime(); + if(holdNanoTime > 0) execTime += (int)(holdNanoTime/Constants.MILLISECOND_IN_NANO); try { countDownLatch.await(execTime, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -114,7 +114,7 @@ public Loader(String[] args) { logger.info("Terminating clients."); disconnectClients(); - endTime = Util.getTime(); + endTime = Util.getCurrentTimeMillis(); logger.info("Printing results."); dataCleansing(); @@ -207,6 +207,7 @@ private void prepareClients() { */ private void startMeasurement() { String ntpServer = cmd.getOptionValue(Opt.NTP.getName(), Opt.NTP.getDefaultValue()); + long offset = 0; if(ntpServer != null) { logger.info("Getting time information from NTP server."); NTPUDPClient client = new NTPUDPClient(); @@ -236,8 +237,10 @@ private void startMeasurement() { // delay: Give ScheduledExecutorService time to setup scheduling. long delay = publishers.size(); - startTime = Util.getTime() + delay; + startTime = System.currentTimeMillis() + offset + delay; + startNanoTime = System.nanoTime() + delay * Constants.MILLISECOND_IN_NANO; lastRecvTime = startTime; + for(IClient pub: publishers){ pub.start(delay); } diff --git a/src/main/java/mqttloader/RecvTimeoutTask.java b/src/main/java/mqttloader/RecvTimeoutTask.java index 2817379..e640241 100644 --- a/src/main/java/mqttloader/RecvTimeoutTask.java +++ b/src/main/java/mqttloader/RecvTimeoutTask.java @@ -26,6 +26,11 @@ public class RecvTimeoutTask extends TimerTask { private Timer timer; private int subTimeout; + /** + * + * @param timer Timer instance for this task. + * @param subTimeout Timeout value in second. + */ public RecvTimeoutTask(Timer timer, int subTimeout) { this.timer = timer; this.subTimeout = subTimeout; @@ -33,7 +38,7 @@ public RecvTimeoutTask(Timer timer, int subTimeout) { @Override public void run() { - long remainingTime = subTimeout*1000 - (Util.getTime() - lastRecvTime); // - + long remainingTime = subTimeout*Constants.SECOND_IN_MILLI - (Util.getCurrentTimeMillis() - lastRecvTime); // - if (remainingTime <= 0) { Loader.logger.info("Receiving messages on subscribers timed out."); countDownLatch.countDown(); diff --git a/src/main/java/mqttloader/Util.java b/src/main/java/mqttloader/Util.java index 6c485a9..b18d29f 100644 --- a/src/main/java/mqttloader/Util.java +++ b/src/main/java/mqttloader/Util.java @@ -63,11 +63,19 @@ public static void output(String filename, String str, boolean append){ } - public static byte[] genPayloads(int size) { - return ByteBuffer.allocate(size).putLong(getTime()).array(); + public static byte[] genPayloads(int size, long currentTime) { + return ByteBuffer.allocate(size).putLong(currentTime).array(); } - public static long getTime() { - return System.currentTimeMillis() + Loader.offset; + public static long getCurrentTimeMillis() { + return (getElapsedNanoTime()/Constants.MILLISECOND_IN_NANO) + Loader.startTime; + } + + /** + * + * @return Elapsed time from startTime in nano seconds. + */ + public static long getElapsedNanoTime() { + return System.nanoTime() - Loader.startNanoTime; } } diff --git a/src/main/java/mqttloader/client/Publisher.java b/src/main/java/mqttloader/client/Publisher.java index e4bc578..13e248d 100644 --- a/src/main/java/mqttloader/client/Publisher.java +++ b/src/main/java/mqttloader/client/Publisher.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; import mqttloader.record.Latency; @@ -123,15 +124,16 @@ public void periodicalRun() { } public void publish() { - message.setPayload(Util.genPayloads(payloadSize)); - try{ + long currentTime = Util.getCurrentTimeMillis(); + message.setPayload(Util.genPayloads(payloadSize, currentTime)); + try { client.publish(topic, message); - } catch(MqttException me) { + } catch (MqttException me) { Loader.logger.warning("On sending publish, MqttException occurred: "+clientId); me.printStackTrace(); } - int slot = (int)((Util.getTime()-Loader.startTime)/1000); + int slot = (int)((currentTime-Loader.startTime)/Constants.SECOND_IN_MILLI); if(throughputs.size()>0){ Throughput lastTh = throughputs.get(throughputs.size()-1); if(lastTh.getSlot() == slot) { diff --git a/src/main/java/mqttloader/client/PublisherV3.java b/src/main/java/mqttloader/client/PublisherV3.java index bcd821c..7809f23 100644 --- a/src/main/java/mqttloader/client/PublisherV3.java +++ b/src/main/java/mqttloader/client/PublisherV3.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; import mqttloader.record.Latency; @@ -124,15 +125,16 @@ public void periodicalRun() { } public void publish() { - message.setPayload(Util.genPayloads(payloadSize)); + long currentTime = Util.getCurrentTimeMillis(); + message.setPayload(Util.genPayloads(payloadSize, currentTime)); try { client.publish(topic, message); - } catch (MqttException e) { + } catch (MqttException me) { Loader.logger.warning("On sending publish, MqttException occurred: "+clientId); - e.printStackTrace(); + me.printStackTrace(); } - int slot = (int)((Util.getTime()-Loader.startTime)/1000); + int slot = (int)((currentTime-Loader.startTime)/Constants.SECOND_IN_MILLI); if(throughputs.size()>0){ Throughput lastTh = throughputs.get(throughputs.size()-1); if(lastTh.getSlot() == slot) { diff --git a/src/main/java/mqttloader/client/Subscriber.java b/src/main/java/mqttloader/client/Subscriber.java index 62b202e..d66eb6b 100644 --- a/src/main/java/mqttloader/client/Subscriber.java +++ b/src/main/java/mqttloader/client/Subscriber.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; import mqttloader.record.Latency; @@ -102,8 +103,8 @@ public void mqttErrorOccurred(MqttException exception) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - long time = Util.getTime(); - int slot = (int)((time-Loader.startTime)/1000); + long currentTime = Util.getCurrentTimeMillis(); + int slot = (int)((currentTime - Loader.startTime)/Constants.SECOND_IN_MILLI); synchronized (throughputs) { if(throughputs.size()>0){ Throughput lastTh = throughputs.get(throughputs.size()-1); @@ -119,10 +120,10 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); synchronized (latencies) { - latencies.add(new Latency(slot, (int)(time-pubTime))); + latencies.add(new Latency(slot, (int)(currentTime-pubTime))); } - Loader.lastRecvTime = time; + Loader.lastRecvTime = currentTime; Loader.logger.fine("Received a message (" + topic + "): "+clientId); } diff --git a/src/main/java/mqttloader/client/SubscriberV3.java b/src/main/java/mqttloader/client/SubscriberV3.java index 59d2fff..d984e28 100644 --- a/src/main/java/mqttloader/client/SubscriberV3.java +++ b/src/main/java/mqttloader/client/SubscriberV3.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; import mqttloader.record.Latency; @@ -93,8 +94,8 @@ public void connectionLost(Throwable cause) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - long time = Util.getTime(); - int slot = (int)((time-Loader.startTime)/1000); + long currentTime = Util.getCurrentTimeMillis(); + int slot = (int)((currentTime - Loader.startTime)/ Constants.SECOND_IN_MILLI); synchronized (throughputs) { if(throughputs.size()>0){ Throughput lastTh = throughputs.get(throughputs.size()-1); @@ -110,10 +111,10 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); synchronized (latencies) { - latencies.add(new Latency(slot, (int)(time-pubTime))); + latencies.add(new Latency(slot, (int)(currentTime-pubTime))); } - Loader.lastRecvTime = time; + Loader.lastRecvTime = currentTime; Loader.logger.fine("Received a message (" + topic + "): "+clientId); } From bcc9b05203ac7bae260eac392a2344e5e6407923 Mon Sep 17 00:00:00 2001 From: nabon Date: Sun, 6 Sep 2020 21:11:31 +0900 Subject: [PATCH 02/14] Changed to convert negative value of latency to zero. --- src/main/java/mqttloader/client/Subscriber.java | 8 +++++++- src/main/java/mqttloader/client/SubscriberV3.java | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/mqttloader/client/Subscriber.java b/src/main/java/mqttloader/client/Subscriber.java index d66eb6b..f4807b3 100644 --- a/src/main/java/mqttloader/client/Subscriber.java +++ b/src/main/java/mqttloader/client/Subscriber.java @@ -119,8 +119,14 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { } long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); + int latency = (int)(currentTime - pubTime); + if (latency < 0) { + // If running MQTTLoader on multiple machines, a slight time error may cause a negative value of latency. + latency = 0; + Loader.logger.fine("Negative value of latency is converted to zero."); + } synchronized (latencies) { - latencies.add(new Latency(slot, (int)(currentTime-pubTime))); + latencies.add(new Latency(slot, latency)); } Loader.lastRecvTime = currentTime; diff --git a/src/main/java/mqttloader/client/SubscriberV3.java b/src/main/java/mqttloader/client/SubscriberV3.java index d984e28..f417ce8 100644 --- a/src/main/java/mqttloader/client/SubscriberV3.java +++ b/src/main/java/mqttloader/client/SubscriberV3.java @@ -110,8 +110,14 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { } long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); + int latency = (int)(currentTime - pubTime); + if (latency < 0) { + // If running MQTTLoader on multiple machines, a slight time error may cause a negative value of latency. + latency = 0; + Loader.logger.fine("Negative value of latency is converted to zero."); + } synchronized (latencies) { - latencies.add(new Latency(slot, (int)(currentTime-pubTime))); + latencies.add(new Latency(slot, latency)); } Loader.lastRecvTime = currentTime; From ee5806888d733bea9c5ef4aad2a421791cafd2a2 Mon Sep 17 00:00:00 2001 From: nabon Date: Mon, 7 Sep 2020 00:15:01 +0900 Subject: [PATCH 03/14] Add FileWriter class. --- src/main/java/mqttloader/FileWriter.java | 79 ++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 src/main/java/mqttloader/FileWriter.java diff --git a/src/main/java/mqttloader/FileWriter.java b/src/main/java/mqttloader/FileWriter.java new file mode 100644 index 0000000..bfbf28e --- /dev/null +++ b/src/main/java/mqttloader/FileWriter.java @@ -0,0 +1,79 @@ +package mqttloader; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.concurrent.ArrayBlockingQueue; + +public class FileWriter implements Runnable { + private ArrayBlockingQueue queue; + + private File file; + private FileOutputStream fos = null; + private OutputStreamWriter osw = null; + private BufferedWriter bw = null; + + private boolean terminated = false; + + public FileWriter(ArrayBlockingQueue queue, String filename) { + this.queue = queue; + file = new File(filename); + + if(file.exists()) { + file.delete(); + } + try { + file.createNewFile(); + } catch (IOException e) { + e.printStackTrace(); + } + + try { + fos = new FileOutputStream(file, true); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + osw = new OutputStreamWriter(fos); + bw = new BufferedWriter(osw); + } + + @Override + public void run() { + String record = null; + while (true) { + if (terminated) { + break; + } + try { + record = queue.take(); + if(record != null) { + bw.write(record); + bw.newLine(); + } + } catch (InterruptedException | IOException e) { + e.printStackTrace(); + } + } + + try{ + bw.flush(); + bw.close(); + osw.close(); + fos.close(); + } catch(IOException e){ + e.printStackTrace(); + } finally { + try { + if(bw != null) bw.close(); + if(osw != null) osw.close(); + if(fos != null) fos.close(); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } + } + } +} From 23d4be3ca07d4b21987e87c724404b740d1c750a Mon Sep 17 00:00:00 2001 From: nabon Date: Wed, 9 Sep 2020 01:31:25 +0900 Subject: [PATCH 04/14] Change how to record sending/receiving messages. --- src/main/java/mqttloader/Constants.java | 8 +- src/main/java/mqttloader/FileWriter.java | 27 +- src/main/java/mqttloader/Loader.java | 423 ++++++++---------- src/main/java/mqttloader/client/IClient.java | 6 - .../java/mqttloader/client/Publisher.java | 36 +- .../java/mqttloader/client/PublisherV3.java | 36 +- .../java/mqttloader/client/Subscriber.java | 44 +- .../java/mqttloader/client/SubscriberV3.java | 44 +- src/main/java/mqttloader/record/Latency.java | 35 -- .../java/mqttloader/record/Throughput.java | 39 -- 10 files changed, 225 insertions(+), 473 deletions(-) delete mode 100644 src/main/java/mqttloader/record/Latency.java delete mode 100644 src/main/java/mqttloader/record/Throughput.java diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index bce16c7..031d7cc 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -17,8 +17,10 @@ package mqttloader; public class Constants { - public static final String SUB_CLIENT_ID_PREFIX = "mqttloaderclient-sub"; - public static final String PUB_CLIENT_ID_PREFIX = "mqttloaderclient-pub"; + public static final String FILE_NAME_PREFIX = "mqttloader_"; + public static final String SUB_CLIENT_ID_PREFIX = "ml-s-"; + public static final String PUB_CLIENT_ID_PREFIX = "ml-p-"; + public static final String STOP_SIGNAL = "TERMINATED"; public static final int MILLISECOND_IN_NANO = 1000000; public static final int SECOND_IN_NANO = 1000000; public static final int SECOND_IN_MILLI = 1000; @@ -42,8 +44,6 @@ public enum Opt { EXEC_TIME("et", "exectime", true, "Execution time in seconds.", "60"), LOG_LEVEL("l", "log", true, "Log level (SEVERE/WARNING/INFO/ALL).", "WARNING"), NTP("n", "ntp", true, "NTP server. E.g., ntp.nict.jp", null), - TH_FILE("tf", "thfile", true, "File name for throughput data.", null), - LT_FILE("lf", "ltfile", true, "File name for latency data.", null), HELP("h", "help", false, "Display help.", null); private String name; diff --git a/src/main/java/mqttloader/FileWriter.java b/src/main/java/mqttloader/FileWriter.java index bfbf28e..495c75e 100644 --- a/src/main/java/mqttloader/FileWriter.java +++ b/src/main/java/mqttloader/FileWriter.java @@ -6,31 +6,14 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; -import java.util.concurrent.ArrayBlockingQueue; public class FileWriter implements Runnable { - private ArrayBlockingQueue queue; - private File file; private FileOutputStream fos = null; private OutputStreamWriter osw = null; private BufferedWriter bw = null; - private boolean terminated = false; - - public FileWriter(ArrayBlockingQueue queue, String filename) { - this.queue = queue; - file = new File(filename); - - if(file.exists()) { - file.delete(); - } - try { - file.createNewFile(); - } catch (IOException e) { - e.printStackTrace(); - } - + public FileWriter(File file) { try { fos = new FileOutputStream(file, true); } catch (FileNotFoundException e) { @@ -44,12 +27,12 @@ public FileWriter(ArrayBlockingQueue queue, String filename) { public void run() { String record = null; while (true) { - if (terminated) { - break; - } try { - record = queue.take(); + record = Loader.queue.take(); if(record != null) { + if(record.equals(Constants.STOP_SIGNAL)) { + break; + } bw.write(record); bw.newLine(); } diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 166a65c..474bf60 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -16,18 +16,28 @@ package mqttloader; +import static java.lang.System.exit; import static mqttloader.Constants.Opt; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.net.InetAddress; +import java.net.MalformedURLException; import java.net.SocketException; +import java.net.URISyntaxException; +import java.net.URL; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; +import java.util.StringTokenizer; import java.util.Timer; import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -38,8 +48,6 @@ import mqttloader.client.PublisherV3; import mqttloader.client.Subscriber; import mqttloader.client.SubscriberV3; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -57,6 +65,8 @@ public class Loader { public static volatile long startNanoTime; private long endTime; public static volatile long lastRecvTime; + public static ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000000); + private File file; public static CountDownLatch countDownLatch; public static Logger logger = Logger.getLogger(Loader.class.getName()); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z"); @@ -64,6 +74,10 @@ public class Loader { public Loader(String[] args) { setOptions(args); + String logLevel = cmd.getOptionValue(Opt.LOG_LEVEL.getName(), Opt.LOG_LEVEL.getDefaultValue()); + logger.setLevel(Level.parse(logLevel)); + logger.info("Starting mqttloader tool."); + int numPub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_PUB.getName(), Opt.NUM_PUB.getDefaultValue())); int numSub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_SUB.getName(), Opt.NUM_SUB.getDefaultValue())); if (numSub > 0) { @@ -72,18 +86,14 @@ public Loader(String[] args) { countDownLatch = new CountDownLatch(numPub); } - String logLevel = cmd.getOptionValue(Opt.LOG_LEVEL.getName(), Opt.LOG_LEVEL.getDefaultValue()); - logger.setLevel(Level.parse(logLevel)); - - logger.info("Starting mqttloader tool."); logger.info("Preparing clients."); prepareClients(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + file = getFile(); + logger.info("Data file is placed at: "+file.getAbsolutePath()); + FileWriter writer = new FileWriter(file); + Thread fileThread = new Thread(writer); + fileThread.start(); logger.info("Starting measurement."); startMeasurement(); @@ -116,18 +126,15 @@ public Loader(String[] args) { endTime = Util.getCurrentTimeMillis(); - logger.info("Printing results."); - dataCleansing(); - - printThroughput(true); - System.out.println(); - printThroughput(false); - printLatency(); + queue.offer(Constants.STOP_SIGNAL); + try { + fileThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } - String thFile = cmd.getOptionValue(Opt.TH_FILE.getName(), Opt.TH_FILE.getDefaultValue()); - String ltFile = cmd.getOptionValue(Opt.LT_FILE.getName(), Opt.LT_FILE.getDefaultValue()); - if(thFile!=null) thToFile(); - if(ltFile!=null) ltToFile(); + logger.info("Printing results."); + calcResult(); } private void setOptions(String[] args) { @@ -143,7 +150,7 @@ private void setOptions(String[] args) { for(String arg: args){ if(arg.equals("-"+Opt.HELP.getName()) || arg.equals("--"+options.getOption(Opt.HELP.getName()).getLongOpt())){ printHelp(options); - System.exit(0); + exit(0); } } @@ -153,7 +160,7 @@ private void setOptions(String[] args) { } catch (ParseException e) { logger.severe("Failed to parse options."); printHelp(options); - System.exit(1); + exit(1); } } @@ -163,6 +170,34 @@ private void printHelp(Options options) { help.printHelp(Loader.class.getName(), options, true); } + private File getFile() { + File file; + try { + URL url = Loader.class.getProtectionDomain().getCodeSource().getLocation(); + file = new File(new URL(url.toString()).toURI()); + if(file.getParentFile().getName().equals("lib")){ + file = file.getParentFile().getParentFile(); + } else { + file = new File("").getAbsoluteFile(); + } + } catch (SecurityException | NullPointerException | URISyntaxException | MalformedURLException e) { + file = new File("").getAbsoluteFile(); + } + String date = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date(System.currentTimeMillis()+getOffsetFromNtpServer())); + file = new File(file, Constants.FILE_NAME_PREFIX+date+".csv"); + + if(file.exists()) { + file.delete(); + } + try { + file.createNewFile(); + } catch (IOException e) { + e.printStackTrace(); + } + + return file; + } + private void prepareClients() { String broker = cmd.getOptionValue(Opt.BROKER.getName(), Opt.BROKER.getDefaultValue()); int version = Integer.valueOf(cmd.getOptionValue(Opt.VERSION.getName(), Opt.VERSION.getDefaultValue())); @@ -206,6 +241,18 @@ private void prepareClients() { * Start measurement by running publishers. */ private void startMeasurement() { + // delay: Give ScheduledExecutorService time to setup scheduling. + long delay = publishers.size(); + startTime = System.currentTimeMillis() + getOffsetFromNtpServer() + delay; + startNanoTime = System.nanoTime() + delay * Constants.MILLISECOND_IN_NANO; + lastRecvTime = startTime; + + for(IClient pub: publishers){ + pub.start(delay); + } + } + + private long getOffsetFromNtpServer() { String ntpServer = cmd.getOptionValue(Opt.NTP.getName(), Opt.NTP.getDefaultValue()); long offset = 0; if(ntpServer != null) { @@ -235,15 +282,7 @@ private void startMeasurement() { } } - // delay: Give ScheduledExecutorService time to setup scheduling. - long delay = publishers.size(); - startTime = System.currentTimeMillis() + offset + delay; - startNanoTime = System.nanoTime() + delay * Constants.MILLISECOND_IN_NANO; - lastRecvTime = startTime; - - for(IClient pub: publishers){ - pub.start(delay); - } + return offset; } private void disconnectClients() { @@ -264,253 +303,159 @@ private void disconnectClients() { } } - private void dataCleansing() { - int rampup = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_UP.getName(), Opt.RAMP_UP.getDefaultValue())); - int rampdown = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_DOWN.getName(), Opt.RAMP_DOWN.getDefaultValue())); - if(rampup==0 && rampdown==0) return; + private void calcResult() { + TreeMap sendThroughputs = new TreeMap<>(); + TreeMap recvThroughputs = new TreeMap<>(); + TreeMap> latencies = new TreeMap<>(); + + FileInputStream fis = null; + InputStreamReader isr = null; + BufferedReader br = null; + try{ + fis = new FileInputStream(file); + isr = new InputStreamReader(fis); + br = new BufferedReader(isr); + + String str; + while ((str = br.readLine()) != null) { + StringTokenizer st = new StringTokenizer(str, ","); + long timestamp = Long.valueOf(st.nextToken()); + st.nextToken(); //client ID + boolean isSendRecord = st.nextToken().equals("S") ? true : false; + int latency = 0; + if (st.hasMoreTokens()) { + latency = Integer.valueOf(st.nextToken()); + } - int pubFirstSlot = Integer.MAX_VALUE; - int pubLastSlot = 0; - for(IClient pub: publishers){ - ArrayList list = pub.getThroughputs(); - if(!list.isEmpty()) { - int first = list.get(0).getSlot(); - int last = list.get(list.size()-1).getSlot(); - if(first < pubFirstSlot) pubFirstSlot = first; - if(last > pubLastSlot) pubLastSlot = last; - } - } + int elapsedSecond = (int)((timestamp-startTime)/1000); + if(isSendRecord) { + if(sendThroughputs.containsKey(elapsedSecond)) { + sendThroughputs.put(elapsedSecond, sendThroughputs.get(elapsedSecond)+1); + } else { + sendThroughputs.put(elapsedSecond, 1); + } + } else { + if(recvThroughputs.containsKey(elapsedSecond)) { + recvThroughputs.put(elapsedSecond, recvThroughputs.get(elapsedSecond)+1); + } else { + recvThroughputs.put(elapsedSecond, 1); + } - for(IClient pub: publishers) { - Iterator itr = pub.getThroughputs().iterator(); - while(itr.hasNext()){ - Throughput th = itr.next(); - if(th.getSlot() < rampup+pubFirstSlot) { - itr.remove(); - }else if(th.getSlot() > pubLastSlot-rampdown){ - itr.remove(); + if(!latencies.containsKey(elapsedSecond)) { + latencies.put(elapsedSecond, new ArrayList()); + } + latencies.get(elapsedSecond).add(latency); } } - } - int subFirstSlot = Integer.MAX_VALUE; - int subLastSlot = 0; - for(IClient sub: subscribers){ - ArrayList list = sub.getThroughputs(); - if(!list.isEmpty()) { - int first = list.get(0).getSlot(); - int last = list.get(list.size()-1).getSlot(); - if(first < subFirstSlot) subFirstSlot = first; - if(last > subLastSlot) subLastSlot = last; + br.close(); + isr.close(); + fis.close(); + } catch(IOException e){ + e.printStackTrace(); + } finally { + try { + if(br != null) br.close(); + if(isr != null) isr.close(); + if(fis != null) fis.close(); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); } } - for(IClient sub: subscribers){ - Iterator itrTh = sub.getThroughputs().iterator(); - while(itrTh.hasNext()){ - Throughput th = itrTh.next(); - if(th.getSlot() < rampup+subFirstSlot) { - itrTh.remove(); - }else if(th.getSlot() > subLastSlot-rampdown){ - itrTh.remove(); - } - } + int rampup = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_UP.getName(), Opt.RAMP_UP.getDefaultValue())); + int rampdown = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_DOWN.getName(), Opt.RAMP_DOWN.getDefaultValue())); + + trimTreeMap(sendThroughputs, rampup, rampdown); + trimTreeMap(recvThroughputs, rampup, rampdown); + trimTreeMap(latencies, rampup, rampdown); - Iterator itrLt = sub.getLatencies().iterator(); - while(itrLt.hasNext()){ - Latency lt = itrLt.next(); - if(lt.getSlot() < rampup+subFirstSlot) { - itrLt.remove(); - }else if(lt.getSlot() > subLastSlot-rampdown){ - itrLt.remove(); + paddingTreeMap(sendThroughputs); + paddingTreeMap(recvThroughputs); + + System.out.println("-----Publisher-----"); + printThroughput(sendThroughputs, true); + System.out.println(); + System.out.println("-----Subscriber-----"); + printThroughput(recvThroughputs, false); + + int maxLt = 0; + long sumLt = 0; + int count = 0; + for(ArrayList latencyList: latencies.values()){ + for(int latency: latencyList){ + if(latency > maxLt) { + maxLt = latency; } + sumLt += latency; + count++; } } - } + double aveLt = count>0 ? (double)sumLt/count : 0; - private void printThroughput(boolean forPub) { - TreeMap thTotal = new TreeMap<>(); - ArrayList clients; - if(forPub){ - clients = publishers; - }else{ - clients = subscribers; - } + System.out.println("Maximum latency[ms]: "+maxLt); + System.out.println("Average latency[ms]: "+aveLt); + } - for(IClient client: clients){ - ArrayList ths = client.getThroughputs(); - for(Throughput th : ths) { - if(thTotal.containsKey(th.getSlot())){ - thTotal.put(th.getSlot(), thTotal.get(th.getSlot())+th.getCount()); - }else{ - thTotal.put(th.getSlot(), th.getCount()); - } + private void trimTreeMap(TreeMap map, int rampup, int rampdown) { + if(map.size() == 0) { + return; + } + int firstTime = map.firstKey(); + int lastTime = map.lastKey(); + Iterator itr = map.keySet().iterator(); + while(itr.hasNext()){ + int time = itr.next(); + if(time < rampup+firstTime) { + itr.remove(); + }else if(time > lastTime-rampdown){ + itr.remove(); } } + } - if(thTotal.size()>0){ - for(int i=thTotal.firstKey(); i<=thTotal.lastKey(); i++){ - if(!thTotal.containsKey(i)){ - thTotal.put(i, 0); - } + private void paddingTreeMap(TreeMap map) { + if(map.size() == 0) { + return; + } + for(int i=map.firstKey();i throughputs, boolean forPublisher) { int maxTh = 0; int sumMsg = 0; - for(int slot: thTotal.keySet()){ - int th = thTotal.get(slot); + for(int elapsedSecond: throughputs.keySet()){ + int th = throughputs.get(elapsedSecond); if(th > maxTh) { maxTh = th; } sumMsg += th; } - double aveTh = thTotal.size()>0 ? (double)sumMsg/thTotal.size() : 0; - if(forPub){ - System.out.println("-----Publisher-----"); - }else{ - System.out.println("-----Subscriber-----"); - } + double aveTh = throughputs.size()>0 ? (double)sumMsg/throughputs.size() : 0; System.out.println("Maximum throughput[msg/s]: "+maxTh); System.out.println("Average throughput[msg/s]: "+aveTh); - if(forPub){ + if(forPublisher){ System.out.println("Number of published messages: "+sumMsg); }else{ System.out.println("Number of received messages: "+sumMsg); } + System.out.print("Throughput[msg/s]: "); - for(int slot: thTotal.keySet()){ - System.out.print(thTotal.get(slot)); - if(slot maxLt) maxLt = lt; - sumLt += lt; - count++; - } - } - double aveLt = count>0 ? (double)sumLt/count : 0; - - System.out.println("Maximum latency[ms]: "+maxLt); - System.out.println("Average latency[ms]: "+aveLt); - } - - private void thToFile(){ - StringBuilder sb = new StringBuilder(); - - String sTime = sdf.format(new Date(startTime)); - String eTime = sdf.format(new Date(endTime)); - sb.append("Measurement start time: "+sTime+"\n"); - sb.append("Measurement end time: "+eTime+"\n"); - - sb.append("SLOT"); - for(int i=0;i - TreeMap> thAggregate = new TreeMap<>(); - for(int i=0;i pubth = publishers.get(i).getThroughputs(); - for(Throughput th: pubth) { - if(!thAggregate.containsKey(th.getSlot())){ - TreeMap map = new TreeMap<>(); - thAggregate.put(th.getSlot(), map); - } - thAggregate.get(th.getSlot()).put(i, th.getCount()); - } - } - for(int i=0;i subth = subscribers.get(i).getThroughputs(); - for(Throughput th: subth) { - if(!thAggregate.containsKey(th.getSlot())){ - TreeMap map = new TreeMap<>(); - thAggregate.put(th.getSlot(), map); - } - thAggregate.get(th.getSlot()).put(i+publishers.size(), th.getCount()); - } - } - - int numClients = publishers.size()+subscribers.size(); - if(thAggregate.size()>0){ - for(int slot=thAggregate.firstKey();slot0) sb.append(", "); - sb.append(subscribers.get(i).getClientId()); - } - sb.append("\n"); - - int index = 0; - while(true) { - StringBuilder lineSb = new StringBuilder(); - boolean hasNext = false; - for(int i=0;iindex){ - lt = subscribers.get(i).getLatencies().get(index).getLatency(); - hasNext = true; - } - if(i>0) lineSb.append(", "); - lineSb.append(lt); - } - lineSb.append("\n"); - if(hasNext){ - index++; - sb.append(lineSb); - }else{ - break; - } - } - - String ltFile = cmd.getOptionValue(Opt.LT_FILE.getName(), Opt.LT_FILE.getDefaultValue()); - Util.output(ltFile, sb.toString(), false); - } - public static void main(String[] args){ new Loader(args); } diff --git a/src/main/java/mqttloader/client/IClient.java b/src/main/java/mqttloader/client/IClient.java index 1efa77c..2617d08 100644 --- a/src/main/java/mqttloader/client/IClient.java +++ b/src/main/java/mqttloader/client/IClient.java @@ -16,15 +16,9 @@ package mqttloader.client; -import java.util.ArrayList; - -import mqttloader.record.Latency; -import mqttloader.record.Throughput; public interface IClient { String getClientId(); void start(long delay); void disconnect(); - ArrayList getThroughputs(); - ArrayList getLatencies(); } diff --git a/src/main/java/mqttloader/client/Publisher.java b/src/main/java/mqttloader/client/Publisher.java index 13e248d..ee1e865 100644 --- a/src/main/java/mqttloader/client/Publisher.java +++ b/src/main/java/mqttloader/client/Publisher.java @@ -18,17 +18,13 @@ import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; -import java.util.ArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.common.MqttException; @@ -43,9 +39,6 @@ public class Publisher implements Runnable, IClient { private final int pubInterval; private MqttMessage message = new MqttMessage(); - // If change publisher to be multi-threaded, throughputs (and others) should be thread-safe. - private ArrayList throughputs = new ArrayList<>(); - private ScheduledExecutorService service; private ScheduledFuture future; @@ -59,7 +52,7 @@ public Publisher(int clientNumber, String broker, int qos, boolean retain, Strin this.numMessage = numMessage; this.pubInterval = pubInterval; - clientId = PUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); + clientId = PUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); MqttConnectionOptions options = new MqttConnectionOptions(); try { client = new MqttClient(broker, clientId); @@ -133,17 +126,12 @@ public void publish() { me.printStackTrace(); } - int slot = (int)((currentTime-Loader.startTime)/Constants.SECOND_IN_MILLI); - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } + StringBuilder sb = new StringBuilder(); + sb.append(currentTime); + sb.append(","); + sb.append(clientId); + sb.append(",S,"); + Loader.queue.offer(new String(sb)); Loader.logger.fine("Published a message (" + topic + "): "+clientId); } @@ -176,14 +164,4 @@ public void disconnect() { public String getClientId() { return clientId; } - - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies(){ - return null; - } } diff --git a/src/main/java/mqttloader/client/PublisherV3.java b/src/main/java/mqttloader/client/PublisherV3.java index 7809f23..69dbb23 100644 --- a/src/main/java/mqttloader/client/PublisherV3.java +++ b/src/main/java/mqttloader/client/PublisherV3.java @@ -18,17 +18,13 @@ import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; -import java.util.ArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -43,9 +39,6 @@ public class PublisherV3 implements Runnable, IClient { private final int pubInterval; private MqttMessage message = new MqttMessage(); - // If change publisher to be multi-threaded, throughputs (and others) should be thread-safe. - private ArrayList throughputs = new ArrayList<>(); - private ScheduledExecutorService service; private ScheduledFuture future; @@ -59,7 +52,7 @@ public PublisherV3(int clientNumber, String broker, int qos, boolean retain, Str this.numMessage = numMessage; this.pubInterval = pubInterval; - clientId = PUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); + clientId = PUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); try { @@ -134,17 +127,12 @@ public void publish() { me.printStackTrace(); } - int slot = (int)((currentTime-Loader.startTime)/Constants.SECOND_IN_MILLI); - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } + StringBuilder sb = new StringBuilder(); + sb.append(currentTime); + sb.append(","); + sb.append(clientId); + sb.append(",S,"); + Loader.queue.offer(new String(sb)); Loader.logger.fine("Published a message (" + topic + "): "+clientId); } @@ -177,14 +165,4 @@ public void disconnect() { public String getClientId() { return clientId; } - - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies(){ - return null; - } } diff --git a/src/main/java/mqttloader/client/Subscriber.java b/src/main/java/mqttloader/client/Subscriber.java index f4807b3..39fa070 100644 --- a/src/main/java/mqttloader/client/Subscriber.java +++ b/src/main/java/mqttloader/client/Subscriber.java @@ -19,13 +19,9 @@ import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; import java.nio.ByteBuffer; -import java.util.ArrayList; -import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttCallback; import org.eclipse.paho.mqttv5.client.MqttClient; @@ -39,11 +35,8 @@ public class Subscriber implements MqttCallback, IClient { private MqttClient client; private final String clientId; - private ArrayList throughputs = new ArrayList<>(); - private ArrayList latencies = new ArrayList<>(); - public Subscriber(int clientNumber, String broker, int qos, boolean shSub, String topic) { - clientId = SUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); + clientId = SUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); MqttConnectionOptions options = new MqttConnectionOptions(); try { client = new MqttClient(broker, clientId); @@ -85,16 +78,6 @@ public String getClientId() { return clientId; } - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies() { - return latencies; - } - @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {} @@ -104,20 +87,6 @@ public void mqttErrorOccurred(MqttException exception) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { long currentTime = Util.getCurrentTimeMillis(); - int slot = (int)((currentTime - Loader.startTime)/Constants.SECOND_IN_MILLI); - synchronized (throughputs) { - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } - } - long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); int latency = (int)(currentTime - pubTime); if (latency < 0) { @@ -125,9 +94,14 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { latency = 0; Loader.logger.fine("Negative value of latency is converted to zero."); } - synchronized (latencies) { - latencies.add(new Latency(slot, latency)); - } + + StringBuilder sb = new StringBuilder(); + sb.append(currentTime); + sb.append(","); + sb.append(clientId); + sb.append(",R,"); + sb.append(latency); + Loader.queue.offer(new String(sb)); Loader.lastRecvTime = currentTime; diff --git a/src/main/java/mqttloader/client/SubscriberV3.java b/src/main/java/mqttloader/client/SubscriberV3.java index f417ce8..41b42c5 100644 --- a/src/main/java/mqttloader/client/SubscriberV3.java +++ b/src/main/java/mqttloader/client/SubscriberV3.java @@ -19,13 +19,9 @@ import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; import java.nio.ByteBuffer; -import java.util.ArrayList; -import mqttloader.Constants; import mqttloader.Loader; import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -38,11 +34,8 @@ public class SubscriberV3 implements MqttCallback, IClient { private MqttClient client; private final String clientId; - private ArrayList throughputs = new ArrayList<>(); - private ArrayList latencies = new ArrayList<>(); - public SubscriberV3(int clientNumber, String broker, int qos, String topic) { - clientId = SUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); + clientId = SUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); try { @@ -79,36 +72,12 @@ public String getClientId() { return clientId; } - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies() { - return latencies; - } - @Override public void connectionLost(Throwable cause) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { long currentTime = Util.getCurrentTimeMillis(); - int slot = (int)((currentTime - Loader.startTime)/ Constants.SECOND_IN_MILLI); - synchronized (throughputs) { - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } - } - long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); int latency = (int)(currentTime - pubTime); if (latency < 0) { @@ -116,9 +85,14 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { latency = 0; Loader.logger.fine("Negative value of latency is converted to zero."); } - synchronized (latencies) { - latencies.add(new Latency(slot, latency)); - } + + StringBuilder sb = new StringBuilder(); + sb.append(currentTime); + sb.append(","); + sb.append(clientId); + sb.append(",R,"); + sb.append(latency); + Loader.queue.offer(new String(sb)); Loader.lastRecvTime = currentTime; diff --git a/src/main/java/mqttloader/record/Latency.java b/src/main/java/mqttloader/record/Latency.java deleted file mode 100644 index 00bf181..0000000 --- a/src/main/java/mqttloader/record/Latency.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2020 Distributed Systems Group - * - *

Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package mqttloader.record; - -public class Latency { - private final int slot; - private final int latency; - - public Latency(int slot, int latency) { - this.slot = slot; - this.latency = latency; - } - - public int getSlot() { - return slot; - } - - public int getLatency() { - return latency; - } -} diff --git a/src/main/java/mqttloader/record/Throughput.java b/src/main/java/mqttloader/record/Throughput.java deleted file mode 100644 index f6af83b..0000000 --- a/src/main/java/mqttloader/record/Throughput.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2020 Distributed Systems Group - * - *

Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package mqttloader.record; - -public class Throughput { - private final int slot; - private int count; - - public Throughput(int slot, int count) { - this.slot = slot; - this.count = count; - } - - public int getSlot() { - return slot; - } - - public int getCount() { - return count; - } - - public void setCount(int count) { - this.count = count; - } -} From 4e2bbb0671268db815c957729f810cc8accb0201 Mon Sep 17 00:00:00 2001 From: nabon Date: Wed, 9 Sep 2020 10:45:57 +0900 Subject: [PATCH 05/14] Extract common logics of clients as abstract classes. --- src/main/java/mqttloader/FileWriter.java | 16 +++ src/main/java/mqttloader/Loader.java | 11 +- .../{IClient.java => AbstractClient.java} | 15 ++- .../mqttloader/client/AbstractPublisher.java | 125 ++++++++++++++++++ .../mqttloader/client/AbstractSubscriber.java | 54 ++++++++ .../java/mqttloader/client/Publisher.java | 107 ++------------- .../java/mqttloader/client/PublisherV3.java | 107 ++------------- .../java/mqttloader/client/Subscriber.java | 40 +----- .../java/mqttloader/client/SubscriberV3.java | 40 +----- 9 files changed, 236 insertions(+), 279 deletions(-) rename src/main/java/mqttloader/client/{IClient.java => AbstractClient.java} (70%) create mode 100644 src/main/java/mqttloader/client/AbstractPublisher.java create mode 100644 src/main/java/mqttloader/client/AbstractSubscriber.java diff --git a/src/main/java/mqttloader/FileWriter.java b/src/main/java/mqttloader/FileWriter.java index 495c75e..53471d0 100644 --- a/src/main/java/mqttloader/FileWriter.java +++ b/src/main/java/mqttloader/FileWriter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package mqttloader; import java.io.BufferedWriter; diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 474bf60..4d7ff8d 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -43,7 +43,8 @@ import java.util.logging.Level; import java.util.logging.Logger; -import mqttloader.client.IClient; +import mqttloader.client.AbstractClient; +import mqttloader.client.AbstractPublisher; import mqttloader.client.Publisher; import mqttloader.client.PublisherV3; import mqttloader.client.Subscriber; @@ -59,8 +60,8 @@ public class Loader { private CommandLine cmd = null; - private ArrayList publishers = new ArrayList<>(); - private ArrayList subscribers = new ArrayList<>(); + private ArrayList publishers = new ArrayList<>(); + private ArrayList subscribers = new ArrayList<>(); public static volatile long startTime; public static volatile long startNanoTime; private long endTime; @@ -247,8 +248,8 @@ private void startMeasurement() { startNanoTime = System.nanoTime() + delay * Constants.MILLISECOND_IN_NANO; lastRecvTime = startTime; - for(IClient pub: publishers){ - pub.start(delay); + for(AbstractClient pub: publishers){ + ((AbstractPublisher)pub).start(delay); } } diff --git a/src/main/java/mqttloader/client/IClient.java b/src/main/java/mqttloader/client/AbstractClient.java similarity index 70% rename from src/main/java/mqttloader/client/IClient.java rename to src/main/java/mqttloader/client/AbstractClient.java index 2617d08..f270c23 100644 --- a/src/main/java/mqttloader/client/IClient.java +++ b/src/main/java/mqttloader/client/AbstractClient.java @@ -16,9 +16,16 @@ package mqttloader.client; +public abstract class AbstractClient { + protected final String clientId; -public interface IClient { - String getClientId(); - void start(long delay); - void disconnect(); + public AbstractClient(String clientId) { + this.clientId = clientId; + } + + public abstract void disconnect(); + + public String getClientId() { + return clientId; + } } diff --git a/src/main/java/mqttloader/client/AbstractPublisher.java b/src/main/java/mqttloader/client/AbstractPublisher.java new file mode 100644 index 0000000..0343c49 --- /dev/null +++ b/src/main/java/mqttloader/client/AbstractPublisher.java @@ -0,0 +1,125 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader.client; + +import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import mqttloader.Loader; + +public abstract class AbstractPublisher extends AbstractClient implements Runnable { + protected final String topic; + protected final int payloadSize; + protected int numMessage; + protected final int pubInterval; + + protected ScheduledExecutorService service; + protected ScheduledFuture future; + + protected volatile boolean cancelled = false; + + public AbstractPublisher(int clientNumber, String topic, int payloadSize, int numMessage, int pubInterval) { + super(PUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber)); + this.topic = topic; + this.payloadSize = payloadSize; + this.numMessage = numMessage; + this.pubInterval = pubInterval; + } + + public void start(long delay) { + service = Executors.newSingleThreadScheduledExecutor(); + if(pubInterval==0){ + future = service.schedule(this, delay, TimeUnit.MILLISECONDS); + }else{ + future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS); + } + } + + @Override + public void run() { + if(pubInterval==0){ + continuousRun(); + }else{ + periodicalRun(); + } + } + + private void continuousRun() { + for(int i=0;i 0) { + if(isConnected()) { + publish(); + } else { + Loader.logger.warning("On sending publish, client was not connected: "+clientId); + } + + numMessage--; + if(numMessage==0){ + Loader.logger.info("Publisher finishes to send publish: "+clientId); + Loader.countDownLatch.countDown(); + } + } + } + + protected void recordSend(long currentTime) { + StringBuilder sb = new StringBuilder(); + sb.append(currentTime); + sb.append(","); + sb.append(clientId); + sb.append(",S,"); + Loader.queue.offer(new String(sb)); + + Loader.logger.fine("Published a message (" + topic + "): "+clientId); + } + + protected void terminateTasks() { + if(!future.isDone()) { + cancelled = true; + future.cancel(false); + } + + service.shutdown(); + try { + service.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + protected abstract void publish(); + protected abstract boolean isConnected(); +} diff --git a/src/main/java/mqttloader/client/AbstractSubscriber.java b/src/main/java/mqttloader/client/AbstractSubscriber.java new file mode 100644 index 0000000..0a48a0a --- /dev/null +++ b/src/main/java/mqttloader/client/AbstractSubscriber.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader.client; + +import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; + +import java.nio.ByteBuffer; + +import mqttloader.Loader; +import mqttloader.Util; + +public abstract class AbstractSubscriber extends AbstractClient { + public AbstractSubscriber(int clientNumber) { + super(SUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber)); + } + + protected void recordReceive(String topic, byte[] payload) { + long currentTime = Util.getCurrentTimeMillis(); + long pubTime = ByteBuffer.wrap(payload).getLong(); + + int latency = (int)(currentTime - pubTime); + if (latency < 0) { + // If running MQTTLoader on multiple machines, a slight time error may cause a negative value of latency. + latency = 0; + Loader.logger.fine("Negative value of latency is converted to zero."); + } + + StringBuilder sb = new StringBuilder(); + sb.append(currentTime); + sb.append(","); + sb.append(clientId); + sb.append(",R,"); + sb.append(latency); + Loader.queue.offer(new String(sb)); + + Loader.lastRecvTime = currentTime; + + Loader.logger.fine("Received a message (" + topic + "): "+clientId); + } +} diff --git a/src/main/java/mqttloader/client/Publisher.java b/src/main/java/mqttloader/client/Publisher.java index ee1e865..6a35532 100644 --- a/src/main/java/mqttloader/client/Publisher.java +++ b/src/main/java/mqttloader/client/Publisher.java @@ -16,13 +16,6 @@ package mqttloader.client; -import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - import mqttloader.Loader; import mqttloader.Util; import org.eclipse.paho.mqttv5.client.MqttClient; @@ -30,29 +23,15 @@ import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; -public class Publisher implements Runnable, IClient { +public class Publisher extends AbstractPublisher { private MqttClient client; - private final String clientId; - private final String topic; - private final int payloadSize; - private int numMessage; - private final int pubInterval; private MqttMessage message = new MqttMessage(); - private ScheduledExecutorService service; - private ScheduledFuture future; - - private volatile boolean cancelled = false; - public Publisher(int clientNumber, String broker, int qos, boolean retain, String topic, int payloadSize, int numMessage, int pubInterval) { + super(clientNumber, topic, payloadSize, numMessage, pubInterval); message.setQos(qos); message.setRetained(retain); - this.topic = topic; - this.payloadSize = payloadSize; - this.numMessage = numMessage; - this.pubInterval = pubInterval; - clientId = PUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); MqttConnectionOptions options = new MqttConnectionOptions(); try { client = new MqttClient(broker, clientId); @@ -65,58 +44,7 @@ public Publisher(int clientNumber, String broker, int qos, boolean retain, Strin } @Override - public void start(long delay) { - service = Executors.newSingleThreadScheduledExecutor(); - if(pubInterval==0){ - future = service.schedule(this, delay, TimeUnit.MILLISECONDS); - }else{ - future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS); - } - } - - @Override - public void run() { - if(pubInterval==0){ - continuousRun(); - }else{ - periodicalRun(); - } - } - - public void continuousRun() { - for(int i=0;i 0) { - if(client.isConnected()) { - publish(); - } else { - Loader.logger.warning("On sending publish, client was not connected: "+clientId); - } - - numMessage--; - if(numMessage==0){ - Loader.logger.info("Publisher finishes to send publish: "+clientId); - Loader.countDownLatch.countDown(); - } - } - } - - public void publish() { + protected void publish() { long currentTime = Util.getCurrentTimeMillis(); message.setPayload(Util.genPayloads(payloadSize, currentTime)); try { @@ -126,29 +54,17 @@ public void publish() { me.printStackTrace(); } - StringBuilder sb = new StringBuilder(); - sb.append(currentTime); - sb.append(","); - sb.append(clientId); - sb.append(",S,"); - Loader.queue.offer(new String(sb)); + recordSend(currentTime); + } - Loader.logger.fine("Published a message (" + topic + "): "+clientId); + @Override + protected boolean isConnected() { + return client.isConnected(); } @Override public void disconnect() { - if(!future.isDone()) { - cancelled = true; - future.cancel(false); - } - - service.shutdown(); - try { - service.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } + terminateTasks(); if (client.isConnected()) { try { @@ -159,9 +75,4 @@ public void disconnect() { } } } - - @Override - public String getClientId() { - return clientId; - } } diff --git a/src/main/java/mqttloader/client/PublisherV3.java b/src/main/java/mqttloader/client/PublisherV3.java index 69dbb23..af09d06 100644 --- a/src/main/java/mqttloader/client/PublisherV3.java +++ b/src/main/java/mqttloader/client/PublisherV3.java @@ -16,13 +16,6 @@ package mqttloader.client; -import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - import mqttloader.Loader; import mqttloader.Util; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -30,29 +23,15 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -public class PublisherV3 implements Runnable, IClient { +public class PublisherV3 extends AbstractPublisher { private MqttClient client; - private final String clientId; - private final String topic; - private final int payloadSize; - private int numMessage; - private final int pubInterval; private MqttMessage message = new MqttMessage(); - private ScheduledExecutorService service; - private ScheduledFuture future; - - private volatile boolean cancelled = false; - public PublisherV3(int clientNumber, String broker, int qos, boolean retain, String topic, int payloadSize, int numMessage, int pubInterval) { + super(clientNumber, topic, payloadSize, numMessage, pubInterval); message.setQos(qos); message.setRetained(retain); - this.topic = topic; - this.payloadSize = payloadSize; - this.numMessage = numMessage; - this.pubInterval = pubInterval; - clientId = PUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); try { @@ -66,58 +45,7 @@ public PublisherV3(int clientNumber, String broker, int qos, boolean retain, Str } @Override - public void start(long delay) { - service = Executors.newSingleThreadScheduledExecutor(); - if(pubInterval==0){ - future = service.schedule(this, delay, TimeUnit.MILLISECONDS); - }else{ - future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS); - } - } - - @Override - public void run() { - if(pubInterval==0){ - continuousRun(); - }else{ - periodicalRun(); - } - } - - public void continuousRun() { - for(int i=0;i 0) { - if (client.isConnected()) { - publish(); - } else { - Loader.logger.warning("On sending publish, client was not connected: "+clientId); - } - - numMessage--; - if(numMessage==0){ - Loader.logger.info("Publisher finishes to send publish: "+clientId); - Loader.countDownLatch.countDown(); - } - } - } - - public void publish() { + protected void publish() { long currentTime = Util.getCurrentTimeMillis(); message.setPayload(Util.genPayloads(payloadSize, currentTime)); try { @@ -127,29 +55,17 @@ public void publish() { me.printStackTrace(); } - StringBuilder sb = new StringBuilder(); - sb.append(currentTime); - sb.append(","); - sb.append(clientId); - sb.append(",S,"); - Loader.queue.offer(new String(sb)); + recordSend(currentTime); + } - Loader.logger.fine("Published a message (" + topic + "): "+clientId); + @Override + protected boolean isConnected() { + return client.isConnected(); } @Override public void disconnect() { - if(!future.isDone()) { - cancelled = true; - future.cancel(false); - } - - service.shutdown(); - try { - service.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } + terminateTasks(); if (client.isConnected()) { try { @@ -160,9 +76,4 @@ public void disconnect() { } } } - - @Override - public String getClientId() { - return clientId; - } } diff --git a/src/main/java/mqttloader/client/Subscriber.java b/src/main/java/mqttloader/client/Subscriber.java index 39fa070..f9008ce 100644 --- a/src/main/java/mqttloader/client/Subscriber.java +++ b/src/main/java/mqttloader/client/Subscriber.java @@ -16,12 +16,7 @@ package mqttloader.client; -import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; - -import java.nio.ByteBuffer; - import mqttloader.Loader; -import mqttloader.Util; import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttCallback; import org.eclipse.paho.mqttv5.client.MqttClient; @@ -31,12 +26,11 @@ import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; -public class Subscriber implements MqttCallback, IClient { +public class Subscriber extends AbstractSubscriber implements MqttCallback { private MqttClient client; - private final String clientId; public Subscriber(int clientNumber, String broker, int qos, boolean shSub, String topic) { - clientId = SUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); + super(clientNumber); MqttConnectionOptions options = new MqttConnectionOptions(); try { client = new MqttClient(broker, clientId); @@ -57,10 +51,6 @@ public Subscriber(int clientNumber, String broker, int qos, boolean shSub, Strin } } - @Override - public void start(long delay){ - } - @Override public void disconnect() { if (client.isConnected()) { @@ -73,11 +63,6 @@ public void disconnect() { } } - @Override - public String getClientId() { - return clientId; - } - @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {} @@ -86,26 +71,7 @@ public void mqttErrorOccurred(MqttException exception) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - long currentTime = Util.getCurrentTimeMillis(); - long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); - int latency = (int)(currentTime - pubTime); - if (latency < 0) { - // If running MQTTLoader on multiple machines, a slight time error may cause a negative value of latency. - latency = 0; - Loader.logger.fine("Negative value of latency is converted to zero."); - } - - StringBuilder sb = new StringBuilder(); - sb.append(currentTime); - sb.append(","); - sb.append(clientId); - sb.append(",R,"); - sb.append(latency); - Loader.queue.offer(new String(sb)); - - Loader.lastRecvTime = currentTime; - - Loader.logger.fine("Received a message (" + topic + "): "+clientId); + recordReceive(topic, message.getPayload()); } @Override diff --git a/src/main/java/mqttloader/client/SubscriberV3.java b/src/main/java/mqttloader/client/SubscriberV3.java index 41b42c5..bd83860 100644 --- a/src/main/java/mqttloader/client/SubscriberV3.java +++ b/src/main/java/mqttloader/client/SubscriberV3.java @@ -16,12 +16,7 @@ package mqttloader.client; -import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; - -import java.nio.ByteBuffer; - import mqttloader.Loader; -import mqttloader.Util; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -30,12 +25,11 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; -public class SubscriberV3 implements MqttCallback, IClient { +public class SubscriberV3 extends AbstractSubscriber implements MqttCallback { private MqttClient client; - private final String clientId; public SubscriberV3(int clientNumber, String broker, int qos, String topic) { - clientId = SUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber); + super(clientNumber); MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); try { @@ -51,10 +45,6 @@ public SubscriberV3(int clientNumber, String broker, int qos, String topic) { } } - @Override - public void start(long delay){ - } - @Override public void disconnect() { if (client.isConnected()) { @@ -67,36 +57,12 @@ public void disconnect() { } } - @Override - public String getClientId() { - return clientId; - } - @Override public void connectionLost(Throwable cause) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - long currentTime = Util.getCurrentTimeMillis(); - long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); - int latency = (int)(currentTime - pubTime); - if (latency < 0) { - // If running MQTTLoader on multiple machines, a slight time error may cause a negative value of latency. - latency = 0; - Loader.logger.fine("Negative value of latency is converted to zero."); - } - - StringBuilder sb = new StringBuilder(); - sb.append(currentTime); - sb.append(","); - sb.append(clientId); - sb.append(",R,"); - sb.append(latency); - Loader.queue.offer(new String(sb)); - - Loader.lastRecvTime = currentTime; - - Loader.logger.fine("Received a message (" + topic + "): "+clientId); + recordReceive(topic, message.getPayload()); } @Override From 7d617746d6bd0cd996f85793ff5d1a2625a07b99 Mon Sep 17 00:00:00 2001 From: nabon Date: Wed, 9 Sep 2020 12:54:39 +0900 Subject: [PATCH 06/14] Add in-memory mode. --- src/main/java/mqttloader/Constants.java | 3 +- src/main/java/mqttloader/FileWriter.java | 78 --------- src/main/java/mqttloader/Loader.java | 125 +++++++------- src/main/java/mqttloader/Record.java | 61 +++++++ src/main/java/mqttloader/Recorder.java | 158 ++++++++++++++++++ .../mqttloader/client/AbstractPublisher.java | 9 +- .../mqttloader/client/AbstractSubscriber.java | 11 +- 7 files changed, 282 insertions(+), 163 deletions(-) delete mode 100644 src/main/java/mqttloader/FileWriter.java create mode 100644 src/main/java/mqttloader/Record.java create mode 100644 src/main/java/mqttloader/Recorder.java diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index 031d7cc..15b057f 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -20,7 +20,7 @@ public class Constants { public static final String FILE_NAME_PREFIX = "mqttloader_"; public static final String SUB_CLIENT_ID_PREFIX = "ml-s-"; public static final String PUB_CLIENT_ID_PREFIX = "ml-p-"; - public static final String STOP_SIGNAL = "TERMINATED"; + public static final Record STOP_SIGNAL = new Record(); public static final int MILLISECOND_IN_NANO = 1000000; public static final int SECOND_IN_NANO = 1000000; public static final int SECOND_IN_MILLI = 1000; @@ -44,6 +44,7 @@ public enum Opt { EXEC_TIME("et", "exectime", true, "Execution time in seconds.", "60"), LOG_LEVEL("l", "log", true, "Log level (SEVERE/WARNING/INFO/ALL).", "WARNING"), NTP("n", "ntp", true, "NTP server. E.g., ntp.nict.jp", null), + IN_MEMORY("mm", "inmemory", false, "Enable in-memory mode", null), HELP("h", "help", false, "Display help.", null); private String name; diff --git a/src/main/java/mqttloader/FileWriter.java b/src/main/java/mqttloader/FileWriter.java deleted file mode 100644 index 53471d0..0000000 --- a/src/main/java/mqttloader/FileWriter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2020 Distributed Systems Group - * - *

Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package mqttloader; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; - -public class FileWriter implements Runnable { - private File file; - private FileOutputStream fos = null; - private OutputStreamWriter osw = null; - private BufferedWriter bw = null; - - public FileWriter(File file) { - try { - fos = new FileOutputStream(file, true); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } - osw = new OutputStreamWriter(fos); - bw = new BufferedWriter(osw); - } - - @Override - public void run() { - String record = null; - while (true) { - try { - record = Loader.queue.take(); - if(record != null) { - if(record.equals(Constants.STOP_SIGNAL)) { - break; - } - bw.write(record); - bw.newLine(); - } - } catch (InterruptedException | IOException e) { - e.printStackTrace(); - } - } - - try{ - bw.flush(); - bw.close(); - osw.close(); - fos.close(); - } catch(IOException e){ - e.printStackTrace(); - } finally { - try { - if(bw != null) bw.close(); - if(osw != null) osw.close(); - if(fos != null) fos.close(); - } catch (IOException e) { - e.printStackTrace(); - System.exit(1); - } - } - } -} diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 4d7ff8d..28e1c81 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -66,8 +66,9 @@ public class Loader { public static volatile long startNanoTime; private long endTime; public static volatile long lastRecvTime; - public static ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000000); + public static ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000000); private File file; + private Recorder recorder; public static CountDownLatch countDownLatch; public static Logger logger = Logger.getLogger(Loader.class.getName()); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z"); @@ -90,10 +91,13 @@ public Loader(String[] args) { logger.info("Preparing clients."); prepareClients(); - file = getFile(); - logger.info("Data file is placed at: "+file.getAbsolutePath()); - FileWriter writer = new FileWriter(file); - Thread fileThread = new Thread(writer); + boolean inMemory = cmd.hasOption(Opt.IN_MEMORY.getName()); + if(!inMemory) { + file = getFile(); + logger.info("Data file is placed at: "+file.getAbsolutePath()); + } + recorder = new Recorder(file, inMemory); + Thread fileThread = new Thread(recorder); fileThread.start(); logger.info("Starting measurement."); @@ -305,72 +309,58 @@ private void disconnectClients() { } private void calcResult() { - TreeMap sendThroughputs = new TreeMap<>(); - TreeMap recvThroughputs = new TreeMap<>(); - TreeMap> latencies = new TreeMap<>(); - - FileInputStream fis = null; - InputStreamReader isr = null; - BufferedReader br = null; - try{ - fis = new FileInputStream(file); - isr = new InputStreamReader(fis); - br = new BufferedReader(isr); - - String str; - while ((str = br.readLine()) != null) { - StringTokenizer st = new StringTokenizer(str, ","); - long timestamp = Long.valueOf(st.nextToken()); - st.nextToken(); //client ID - boolean isSendRecord = st.nextToken().equals("S") ? true : false; - int latency = 0; - if (st.hasMoreTokens()) { - latency = Integer.valueOf(st.nextToken()); - } - - int elapsedSecond = (int)((timestamp-startTime)/1000); - if(isSendRecord) { - if(sendThroughputs.containsKey(elapsedSecond)) { - sendThroughputs.put(elapsedSecond, sendThroughputs.get(elapsedSecond)+1); - } else { - sendThroughputs.put(elapsedSecond, 1); - } - } else { - if(recvThroughputs.containsKey(elapsedSecond)) { - recvThroughputs.put(elapsedSecond, recvThroughputs.get(elapsedSecond)+1); - } else { - recvThroughputs.put(elapsedSecond, 1); + if(!cmd.hasOption(Opt.IN_MEMORY.getName())) { + FileInputStream fis = null; + InputStreamReader isr = null; + BufferedReader br = null; + try{ + fis = new FileInputStream(file); + isr = new InputStreamReader(fis); + br = new BufferedReader(isr); + + String str; + while ((str = br.readLine()) != null) { + StringTokenizer st = new StringTokenizer(str, ","); + long timestamp = Long.valueOf(st.nextToken()); + String clientId = st.nextToken(); //client ID + boolean isSend = st.nextToken().equals("S") ? true : false; + int latency = -1; + if (st.hasMoreTokens()) { + latency = Integer.valueOf(st.nextToken()); } - if(!latencies.containsKey(elapsedSecond)) { - latencies.put(elapsedSecond, new ArrayList()); - } - latencies.get(elapsedSecond).add(latency); + recorder.recordInMemory(new Record(timestamp, clientId, isSend, latency)); } - } - br.close(); - isr.close(); - fis.close(); - } catch(IOException e){ - e.printStackTrace(); - } finally { - try { - if(br != null) br.close(); - if(isr != null) isr.close(); - if(fis != null) fis.close(); - } catch (IOException e) { + br.close(); + isr.close(); + fis.close(); + } catch(IOException e){ e.printStackTrace(); - System.exit(1); + } finally { + try { + if(br != null) br.close(); + if(isr != null) isr.close(); + if(fis != null) fis.close(); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } } } + TreeMap sendThroughputs = recorder.getSendThroughputs(); + TreeMap recvThroughputs = recorder.getRecvThroughputs(); + TreeMap latencySums = recorder.getLatencySums(); + TreeMap latencyMaxs = recorder.getLatencyMaxs(); + int rampup = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_UP.getName(), Opt.RAMP_UP.getDefaultValue())); int rampdown = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_DOWN.getName(), Opt.RAMP_DOWN.getDefaultValue())); trimTreeMap(sendThroughputs, rampup, rampdown); trimTreeMap(recvThroughputs, rampup, rampdown); - trimTreeMap(latencies, rampup, rampdown); + trimTreeMap(latencySums, rampup, rampdown); + trimTreeMap(latencyMaxs, rampup, rampdown); paddingTreeMap(sendThroughputs); paddingTreeMap(recvThroughputs); @@ -382,18 +372,17 @@ private void calcResult() { printThroughput(recvThroughputs, false); int maxLt = 0; - long sumLt = 0; - int count = 0; - for(ArrayList latencyList: latencies.values()){ - for(int latency: latencyList){ - if(latency > maxLt) { - maxLt = latency; - } - sumLt += latency; - count++; + double aveLt = 0; + long numMsg = 0; + for(int elapsedSecond: latencySums.keySet()) { + if(latencyMaxs.get(elapsedSecond) > maxLt) { + maxLt = latencyMaxs.get(elapsedSecond); } + int numInSec = recvThroughputs.get(elapsedSecond); + numMsg += numInSec; + double aveInSec = (double)latencySums.get(elapsedSecond)/numInSec; + aveLt = aveLt + ((aveInSec-aveLt)*numInSec)/numMsg; } - double aveLt = count>0 ? (double)sumLt/count : 0; System.out.println("Maximum latency[ms]: "+maxLt); System.out.println("Average latency[ms]: "+aveLt); diff --git a/src/main/java/mqttloader/Record.java b/src/main/java/mqttloader/Record.java new file mode 100644 index 0000000..8161476 --- /dev/null +++ b/src/main/java/mqttloader/Record.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader; + +public class Record { + private long timestamp; + private String clientId; + private boolean isSend; + private int latency; + + private boolean isStopSignal = false; + + public Record(long timestamp, String clientId, boolean isSend, int latency) { + this.timestamp = timestamp; + this.clientId = clientId; + this.isSend = isSend; + this.latency = latency; + } + + public Record(long timestamp, String clientId, boolean isSend) { + this(timestamp, clientId, isSend, -1); + } + + public Record() { + this.isStopSignal = true; + } + + public long getTimestamp() { + return timestamp; + } + + public String getClientId() { + return clientId; + } + + public boolean isSend() { + return isSend; + } + + public int getLatency() { + return latency; + } + + public boolean isStopSignal() { + return isStopSignal; + } +} diff --git a/src/main/java/mqttloader/Recorder.java b/src/main/java/mqttloader/Recorder.java new file mode 100644 index 0000000..e5f50c0 --- /dev/null +++ b/src/main/java/mqttloader/Recorder.java @@ -0,0 +1,158 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.TreeMap; + +public class Recorder implements Runnable { + private final boolean inMemory; + + private File file; + private FileOutputStream fos = null; + private OutputStreamWriter osw = null; + private BufferedWriter bw = null; + + private TreeMap sendThroughputs = new TreeMap<>(); + private TreeMap recvThroughputs = new TreeMap<>(); + private TreeMap latencySums = new TreeMap<>(); + private TreeMap latencyMaxs = new TreeMap<>(); + + public Recorder(File file, boolean inMemory) { + this.inMemory = inMemory; + if(!inMemory) { + try { + fos = new FileOutputStream(file, true); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + osw = new OutputStreamWriter(fos); + bw = new BufferedWriter(osw); + } + } + + @Override + public void run() { + Record record = null; + while (true) { + try { + record = Loader.queue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if(record != null) { + if(record.isStopSignal()) { + break; + } + + if(inMemory) { + recordInMemory(record); + } else { + StringBuilder sb = new StringBuilder(); + sb.append(record.getTimestamp()); + sb.append(","); + sb.append(record.getClientId()); + if(record.isSend()) { + sb.append(",S,"); + } else { + sb.append(",R,"); + sb.append(record.getLatency()); + } + + try { + bw.write(new String(sb)); + bw.newLine(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + if(!inMemory) { + try{ + bw.flush(); + bw.close(); + osw.close(); + fos.close(); + } catch(IOException e){ + e.printStackTrace(); + } finally { + try { + if(bw != null) bw.close(); + if(osw != null) osw.close(); + if(fos != null) fos.close(); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } + } + } + } + + public void recordInMemory(Record record) { + int elapsedSecond = (int)((record.getTimestamp()-Loader.startTime)/1000); + if(record.isSend()) { + if(sendThroughputs.containsKey(elapsedSecond)) { + sendThroughputs.put(elapsedSecond, sendThroughputs.get(elapsedSecond)+1); + } else { + sendThroughputs.put(elapsedSecond, 1); + } + } else { + if(recvThroughputs.containsKey(elapsedSecond)) { + recvThroughputs.put(elapsedSecond, recvThroughputs.get(elapsedSecond)+1); + } else { + recvThroughputs.put(elapsedSecond, 1); + } + + if(latencySums.containsKey(elapsedSecond)) { + latencySums.put(elapsedSecond, latencySums.get(elapsedSecond)+(long)record.getLatency()); + } else { + latencySums.put(elapsedSecond, (long)record.getLatency()); + } + + if(latencyMaxs.containsKey(elapsedSecond)) { + if(latencyMaxs.get(elapsedSecond) < record.getLatency()) { + latencyMaxs.put(elapsedSecond, record.getLatency()); + } + } else { + latencyMaxs.put(elapsedSecond, record.getLatency()); + } + } + } + + public TreeMap getSendThroughputs() { + return sendThroughputs; + } + + public TreeMap getRecvThroughputs() { + return recvThroughputs; + } + + public TreeMap getLatencySums() { + return latencySums; + } + + public TreeMap getLatencyMaxs() { + return latencyMaxs; + } +} diff --git a/src/main/java/mqttloader/client/AbstractPublisher.java b/src/main/java/mqttloader/client/AbstractPublisher.java index 0343c49..deff1aa 100644 --- a/src/main/java/mqttloader/client/AbstractPublisher.java +++ b/src/main/java/mqttloader/client/AbstractPublisher.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import mqttloader.Loader; +import mqttloader.Record; public abstract class AbstractPublisher extends AbstractClient implements Runnable { protected final String topic; @@ -96,13 +97,7 @@ private void periodicalRun() { } protected void recordSend(long currentTime) { - StringBuilder sb = new StringBuilder(); - sb.append(currentTime); - sb.append(","); - sb.append(clientId); - sb.append(",S,"); - Loader.queue.offer(new String(sb)); - + Loader.queue.offer(new Record(currentTime, clientId, true)); Loader.logger.fine("Published a message (" + topic + "): "+clientId); } diff --git a/src/main/java/mqttloader/client/AbstractSubscriber.java b/src/main/java/mqttloader/client/AbstractSubscriber.java index 0a48a0a..97f5399 100644 --- a/src/main/java/mqttloader/client/AbstractSubscriber.java +++ b/src/main/java/mqttloader/client/AbstractSubscriber.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import mqttloader.Loader; +import mqttloader.Record; import mqttloader.Util; public abstract class AbstractSubscriber extends AbstractClient { @@ -39,16 +40,8 @@ protected void recordReceive(String topic, byte[] payload) { Loader.logger.fine("Negative value of latency is converted to zero."); } - StringBuilder sb = new StringBuilder(); - sb.append(currentTime); - sb.append(","); - sb.append(clientId); - sb.append(",R,"); - sb.append(latency); - Loader.queue.offer(new String(sb)); - + Loader.queue.offer(new Record(currentTime, clientId, false, latency)); Loader.lastRecvTime = currentTime; - Loader.logger.fine("Received a message (" + topic + "): "+clientId); } } From 6789ac4b5d073877c323ca53aa008a0efe5d6a3b Mon Sep 17 00:00:00 2001 From: nabon Date: Wed, 9 Sep 2020 16:29:45 +0900 Subject: [PATCH 07/14] Change log format and messages. --- build.gradle | 3 ++- src/dist/logging.properties | 4 ++-- src/main/java/mqttloader/Constants.java | 3 ++- src/main/java/mqttloader/Loader.java | 24 ++++--------------- src/main/java/mqttloader/LogFormatter.java | 7 +++--- src/main/java/mqttloader/RecvTimeoutTask.java | 2 +- .../mqttloader/client/AbstractPublisher.java | 12 +++++----- .../mqttloader/client/AbstractSubscriber.java | 2 +- .../java/mqttloader/client/Publisher.java | 7 +++--- .../java/mqttloader/client/PublisherV3.java | 7 +++--- .../java/mqttloader/client/Subscriber.java | 8 +++---- .../java/mqttloader/client/SubscriberV3.java | 8 +++---- 12 files changed, 36 insertions(+), 51 deletions(-) diff --git a/build.gradle b/build.gradle index c028251..c339989 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,8 @@ dependencies { } run { - args '-h'.split('\\s+') +// args '-h'.split('\\s+') + args '-b tcp://192.168.47.129:1883 -p 1 -s 1 -m 10 -mm'.split('\\s+') } CreateStartScripts startScripts = project.startScripts diff --git a/src/dist/logging.properties b/src/dist/logging.properties index 753a897..413968a 100644 --- a/src/dist/logging.properties +++ b/src/dist/logging.properties @@ -1,8 +1,8 @@ mqttloader.handlers=java.util.logging.MemoryHandler -mqttloader.level=WARNING +mqttloader.level=INFO #mqttloader.level=SEVERE -#mqttloader.level=INFO +#mqttloader.level=WARNING #mqttloader.level=FINEST org.eclipse.paho.mqttv5.client.handlers=java.util.logging.MemoryHandler diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index 15b057f..19781da 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -17,6 +17,7 @@ package mqttloader; public class Constants { + public static final String VERSION = "0.7.0"; public static final String FILE_NAME_PREFIX = "mqttloader_"; public static final String SUB_CLIENT_ID_PREFIX = "ml-s-"; public static final String PUB_CLIENT_ID_PREFIX = "ml-p-"; @@ -42,7 +43,7 @@ public enum Opt { INTERVAL("i", "interval", true, "Publish interval in milliseconds.", "0"), SUB_TIMEOUT("st", "subtimeout", true, "Subscribers' timeout in seconds.", "5"), EXEC_TIME("et", "exectime", true, "Execution time in seconds.", "60"), - LOG_LEVEL("l", "log", true, "Log level (SEVERE/WARNING/INFO/ALL).", "WARNING"), + LOG_LEVEL("l", "log", true, "Log level (SEVERE/WARNING/INFO/ALL).", "INFO"), NTP("n", "ntp", true, "NTP server. E.g., ntp.nict.jp", null), IN_MEMORY("mm", "inmemory", false, "Enable in-memory mode", null), HELP("h", "help", false, "Display help.", null); diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 28e1c81..f5d8f84 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -78,7 +78,7 @@ public Loader(String[] args) { String logLevel = cmd.getOptionValue(Opt.LOG_LEVEL.getName(), Opt.LOG_LEVEL.getDefaultValue()); logger.setLevel(Level.parse(logLevel)); - logger.info("Starting mqttloader tool."); + logger.info("MQTTLoader version " + Constants.VERSION + " starting."); int numPub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_PUB.getName(), Opt.NUM_PUB.getDefaultValue())); int numSub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_SUB.getName(), Opt.NUM_SUB.getDefaultValue())); @@ -94,7 +94,7 @@ public Loader(String[] args) { boolean inMemory = cmd.hasOption(Opt.IN_MEMORY.getName()); if(!inMemory) { file = getFile(); - logger.info("Data file is placed at: "+file.getAbsolutePath()); + logger.info("Output file placed at: "+file.getAbsolutePath()); } recorder = new Recorder(file, inMemory); Thread fileThread = new Thread(recorder); @@ -138,7 +138,7 @@ public Loader(String[] args) { e.printStackTrace(); } - logger.info("Printing results."); + logger.info("Calculating results."); calcResult(); } @@ -218,10 +218,6 @@ private void prepareClients() { int pubInterval = Integer.valueOf(cmd.getOptionValue(Opt.INTERVAL.getName(), Opt.INTERVAL.getDefaultValue())); for(int i=0;i throughputs, boolean forP System.out.println("Number of received messages: "+sumMsg); } - System.out.print("Throughput[msg/s]: "); + System.out.print("Per second throughput[msg/s]: "); for(int elapsedSecond: throughputs.keySet()){ System.out.print(throughputs.get(elapsedSecond)); if(elapsedSecond - if (remainingTime <= 0) { - Loader.logger.info("Receiving messages on subscribers timed out."); + Loader.logger.info("Subscribers timed out."); countDownLatch.countDown(); } else { timer.schedule(new RecvTimeoutTask(timer, subTimeout), remainingTime); diff --git a/src/main/java/mqttloader/client/AbstractPublisher.java b/src/main/java/mqttloader/client/AbstractPublisher.java index deff1aa..e33abe1 100644 --- a/src/main/java/mqttloader/client/AbstractPublisher.java +++ b/src/main/java/mqttloader/client/AbstractPublisher.java @@ -66,17 +66,17 @@ public void run() { private void continuousRun() { for(int i=0;i Date: Wed, 9 Sep 2020 16:36:23 +0900 Subject: [PATCH 08/14] Add version info in displaying help. --- build.gradle | 3 +-- src/main/java/mqttloader/Constants.java | 4 ++-- src/main/java/mqttloader/Loader.java | 3 ++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index c339989..c028251 100644 --- a/build.gradle +++ b/build.gradle @@ -18,8 +18,7 @@ dependencies { } run { -// args '-h'.split('\\s+') - args '-b tcp://192.168.47.129:1883 -p 1 -s 1 -m 10 -mm'.split('\\s+') + args '-h'.split('\\s+') } CreateStartScripts startScripts = project.startScripts diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index 19781da..c3293d7 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -28,7 +28,7 @@ public class Constants { public enum Opt { BROKER("b", "broker", true, "Broker URL. E.g., tcp://127.0.0.1:1883", null, true), - VERSION("v", "version", true, "MQTT version (\"3\" for 3.1.1 or \"5\" for 5.0).", "5"), + MQTT_VERSION("v", "version", true, "MQTT version (\"3\" for 3.1.1 or \"5\" for 5.0).", "5"), NUM_PUB("p", "npub", true, "Number of publishers.", "1"), NUM_SUB("s", "nsub", true, "Number of subscribers.", "1"), PUB_QOS("pq", "pubqos", true, "QoS level of publishers (0/1/2).", "0"), @@ -45,7 +45,7 @@ public enum Opt { EXEC_TIME("et", "exectime", true, "Execution time in seconds.", "60"), LOG_LEVEL("l", "log", true, "Log level (SEVERE/WARNING/INFO/ALL).", "INFO"), NTP("n", "ntp", true, "NTP server. E.g., ntp.nict.jp", null), - IN_MEMORY("mm", "inmemory", false, "Enable in-memory mode", null), + IN_MEMORY("im", "inmemory", false, "Enable in-memory mode", null), HELP("h", "help", false, "Display help.", null); private String name; diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index f5d8f84..1537365 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -170,6 +170,7 @@ private void setOptions(String[] args) { } private void printHelp(Options options) { + System.out.println("MQTTLoader version " + Constants.VERSION); HelpFormatter help = new HelpFormatter(); help.setOptionComparator(null); help.printHelp(Loader.class.getName(), options, true); @@ -205,7 +206,7 @@ private File getFile() { private void prepareClients() { String broker = cmd.getOptionValue(Opt.BROKER.getName(), Opt.BROKER.getDefaultValue()); - int version = Integer.valueOf(cmd.getOptionValue(Opt.VERSION.getName(), Opt.VERSION.getDefaultValue())); + int version = Integer.valueOf(cmd.getOptionValue(Opt.MQTT_VERSION.getName(), Opt.MQTT_VERSION.getDefaultValue())); int numPub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_PUB.getName(), Opt.NUM_PUB.getDefaultValue())); int numSub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_SUB.getName(), Opt.NUM_SUB.getDefaultValue())); int pubQos = Integer.valueOf(cmd.getOptionValue(Opt.PUB_QOS.getName(), Opt.PUB_QOS.getDefaultValue())); From b85c41a36f342a76f7ef4d75b45165413a0cec9e Mon Sep 17 00:00:00 2001 From: nabon Date: Wed, 9 Sep 2020 17:10:18 +0900 Subject: [PATCH 09/14] Add parameter validation. --- src/main/java/mqttloader/Loader.java | 32 ++++++++++++++++--- .../{Publisher.java => PublisherV5.java} | 4 +-- .../{Subscriber.java => SubscriberV5.java} | 4 +-- 3 files changed, 32 insertions(+), 8 deletions(-) rename src/main/java/mqttloader/client/{Publisher.java => PublisherV5.java} (89%) rename src/main/java/mqttloader/client/{Subscriber.java => SubscriberV5.java} (91%) diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 1537365..0f22100 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -45,9 +45,9 @@ import mqttloader.client.AbstractClient; import mqttloader.client.AbstractPublisher; -import mqttloader.client.Publisher; +import mqttloader.client.PublisherV5; import mqttloader.client.PublisherV3; -import mqttloader.client.Subscriber; +import mqttloader.client.SubscriberV5; import mqttloader.client.SubscriberV3; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -167,6 +167,27 @@ private void setOptions(String[] args) { printHelp(options); exit(1); } + + // Validate arguments. + int version = Integer.valueOf(cmd.getOptionValue(Opt.MQTT_VERSION.getName(), Opt.MQTT_VERSION.getDefaultValue())); + if(version != 3 && version != 5) { + logger.warning("\"-v\" parameter value must be 3 or 5."); + exit(1); + } + int pubqos = Integer.valueOf(cmd.getOptionValue(Opt.PUB_QOS.getName(), Opt.PUB_QOS.getDefaultValue())); + if(pubqos != 0 && pubqos != 1 && pubqos != 2) { + logger.warning("\"-pq\" parameter value must be 0 or 1 or 2."); + exit(1); + } + int subqos = Integer.valueOf(cmd.getOptionValue(Opt.SUB_QOS.getName(), Opt.SUB_QOS.getDefaultValue())); + if(subqos != 0 && subqos != 1 && subqos != 2) { + logger.warning("\"-sq\" parameter value must be 0 or 1 or 2."); + exit(1); + } + if(Integer.valueOf(cmd.getOptionValue(Opt.PAYLOAD.getName(), Opt.PAYLOAD.getDefaultValue())) < 8) { + logger.warning("\"-d\" parameter value must be equal to or larger than 8."); + exit(1); + } } private void printHelp(Options options) { @@ -206,6 +227,9 @@ private File getFile() { private void prepareClients() { String broker = cmd.getOptionValue(Opt.BROKER.getName(), Opt.BROKER.getDefaultValue()); + if(!broker.startsWith("tcp://") && !broker.startsWith("ssl://")) { + broker = "tcp://"+broker; + } int version = Integer.valueOf(cmd.getOptionValue(Opt.MQTT_VERSION.getName(), Opt.MQTT_VERSION.getDefaultValue())); int numPub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_PUB.getName(), Opt.NUM_PUB.getDefaultValue())); int numSub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_SUB.getName(), Opt.NUM_SUB.getDefaultValue())); @@ -220,7 +244,7 @@ private void prepareClients() { for(int i=0;i Date: Wed, 9 Sep 2020 17:36:19 +0900 Subject: [PATCH 10/14] Change client ID format to avoid duplicate IDs among different hosts. --- src/main/java/mqttloader/Constants.java | 5 +- src/main/java/mqttloader/Util.java | 48 ++++--------------- .../mqttloader/client/AbstractPublisher.java | 1 + 3 files changed, 12 insertions(+), 42 deletions(-) diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index c3293d7..eb7159e 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -19,8 +19,9 @@ public class Constants { public static final String VERSION = "0.7.0"; public static final String FILE_NAME_PREFIX = "mqttloader_"; - public static final String SUB_CLIENT_ID_PREFIX = "ml-s-"; - public static final String PUB_CLIENT_ID_PREFIX = "ml-p-"; + private static final String HOST_ID = Util.genRandomChars(4); + public static final String SUB_CLIENT_ID_PREFIX = "ml-"+HOST_ID+"-s-"; + public static final String PUB_CLIENT_ID_PREFIX = "ml-"+HOST_ID+"-p-"; public static final Record STOP_SIGNAL = new Record(); public static final int MILLISECOND_IN_NANO = 1000000; public static final int SECOND_IN_NANO = 1000000; diff --git a/src/main/java/mqttloader/Util.java b/src/main/java/mqttloader/Util.java index b18d29f..c940181 100644 --- a/src/main/java/mqttloader/Util.java +++ b/src/main/java/mqttloader/Util.java @@ -16,51 +16,19 @@ package mqttloader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; import java.nio.ByteBuffer; +import java.util.Random; public class Util { - public static void output(String filename, String str, boolean append){ - File file = new File(filename); + private static final String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; + private static Random random = new Random(); - if(!file.exists() || file == null){ - try { - file.createNewFile(); - } catch (IOException e) { - e.printStackTrace(); - } + public static String genRandomChars(int length) { + StringBuilder sb = new StringBuilder(); + for(int i=0;i Date: Wed, 9 Sep 2020 17:47:36 +0900 Subject: [PATCH 11/14] Change clients' persistence settings. --- src/main/java/mqttloader/client/PublisherV3.java | 4 +++- src/main/java/mqttloader/client/PublisherV5.java | 4 +++- src/main/java/mqttloader/client/SubscriberV3.java | 4 +++- src/main/java/mqttloader/client/SubscriberV5.java | 4 +++- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/main/java/mqttloader/client/PublisherV3.java b/src/main/java/mqttloader/client/PublisherV3.java index 2b309ef..b2545df 100644 --- a/src/main/java/mqttloader/client/PublisherV3.java +++ b/src/main/java/mqttloader/client/PublisherV3.java @@ -22,6 +22,7 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class PublisherV3 extends AbstractPublisher { private MqttClient client; @@ -34,8 +35,9 @@ public PublisherV3(int clientNumber, String broker, int qos, boolean retain, Str MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); + options.setCleanSession(true); try { - client = new MqttClient(broker, clientId); + client = new MqttClient(broker, clientId, new MemoryPersistence()); client.connect(options); Loader.logger.info("Publisher " + clientId + " connected."); } catch (MqttException e) { diff --git a/src/main/java/mqttloader/client/PublisherV5.java b/src/main/java/mqttloader/client/PublisherV5.java index 4b88166..947ba08 100644 --- a/src/main/java/mqttloader/client/PublisherV5.java +++ b/src/main/java/mqttloader/client/PublisherV5.java @@ -20,6 +20,7 @@ import mqttloader.Util; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; @@ -33,8 +34,9 @@ public PublisherV5(int clientNumber, String broker, int qos, boolean retain, Str message.setRetained(retain); MqttConnectionOptions options = new MqttConnectionOptions(); + options.setCleanStart(true); try { - client = new MqttClient(broker, clientId); + client = new MqttClient(broker, clientId, new MemoryPersistence()); client.connect(options); Loader.logger.info("Publisher " + clientId + " connected."); } catch (MqttException e) { diff --git a/src/main/java/mqttloader/client/SubscriberV3.java b/src/main/java/mqttloader/client/SubscriberV3.java index 5373f48..48499c3 100644 --- a/src/main/java/mqttloader/client/SubscriberV3.java +++ b/src/main/java/mqttloader/client/SubscriberV3.java @@ -23,6 +23,7 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class SubscriberV3 extends AbstractSubscriber implements MqttCallback { @@ -32,8 +33,9 @@ public SubscriberV3(int clientNumber, String broker, int qos, String topic) { super(clientNumber); MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); + options.setCleanSession(true); try { - client = new MqttClient(broker, clientId); + client = new MqttClient(broker, clientId, new MemoryPersistence()); client.setCallback(this); client.connect(options); Loader.logger.info("Subscriber " + clientId + " connected."); diff --git a/src/main/java/mqttloader/client/SubscriberV5.java b/src/main/java/mqttloader/client/SubscriberV5.java index 8e5a2f5..89a6862 100644 --- a/src/main/java/mqttloader/client/SubscriberV5.java +++ b/src/main/java/mqttloader/client/SubscriberV5.java @@ -22,6 +22,7 @@ import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; @@ -32,8 +33,9 @@ public class SubscriberV5 extends AbstractSubscriber implements MqttCallback { public SubscriberV5(int clientNumber, String broker, int qos, boolean shSub, String topic) { super(clientNumber); MqttConnectionOptions options = new MqttConnectionOptions(); + options.setCleanStart(true); try { - client = new MqttClient(broker, clientId); + client = new MqttClient(broker, clientId, new MemoryPersistence()); client.setCallback(this); client.connect(options); Loader.logger.info("Subscriber " + clientId + " connected."); From 0a542bd3f66fd40e5279bb05817ddf1cd52e68a5 Mon Sep 17 00:00:00 2001 From: nabon Date: Wed, 9 Sep 2020 19:45:52 +0900 Subject: [PATCH 12/14] Update usage documents. --- doc/usage_en.md | 132 ++++++++++++++++++++++-------------------------- doc/usage_jp.md | 132 +++++++++++++++++++++--------------------------- 2 files changed, 117 insertions(+), 147 deletions(-) diff --git a/doc/usage_en.md b/doc/usage_en.md index 1e6c796..edb4004 100644 --- a/doc/usage_en.md +++ b/doc/usage_en.md @@ -12,11 +12,12 @@ Download the archive file (zip or tar) from: https://github.com/dist-sys/mqttloa By extracting it, you can get the following files. ``` -mqttloader -+-- bin +mqttloader/ ++-- bin/ +-- mqttloader +-- mqttloader.bat -+-- lib ++-- lib/ ++-- logging.properties ``` Scripts for executing MQTTLoader is in *bin* directory. @@ -26,10 +27,11 @@ You can display the help by: `$ ./mqttloader -h` ``` +MQTTLoader version 0.7.0 usage: mqttloader.Loader -b [-v ] [-p ] [-s ] [-pq ] [-sq ] [-ss] [-r] [-t ] [-d ] [-m ] [-ru ] [-rd ] [-i ] [-st ] [-et ] [-l ] - [-n ] [-tf ] [-lf ] [-h] + [-n ] [-im] [-h] -b,--broker Broker URL. E.g., tcp://127.0.0.1:1883 -v,--version MQTT version ("3" for 3.1.1 or "5" for 5.0). : @@ -48,6 +50,7 @@ For example, the following command uses a public MQTT broker provided by HiveMQ. ### Run on multiple machines You can run MQTTLoader on multiple machines. + Running both publishers and subscribers on a single machine may cause mutual influence, e.g., the subscribers' receiving load lowers the publishers' throughput. By running publishers and subscribers separately on different machines, you can avoid such mutual influence. @@ -81,17 +84,16 @@ Please refer to **3. Parameteres of MQTTLoader** for more details of each parame | -ss | | Enable shared subscription. By default, it is disabled. Valid for only MQTT v5.0.
If it is enabled, a message is delivered to one of the subscribers. | | -r | | Enable retain for the messages sent by publishers. By default, it is disabled. | | -t \ | mqttloader-test-topic | Topic name to be used. | -| -d \ | 20 | The size of data (payload of messages to be published) in bytes. | +| -d \ | 20 | The size of data (payload of messages to be published) in bytes. It must be equal to or larger than 8. | | -m \ | 100 | Number of messages sent by **each** publisher. | | -ru \ | 0 | Ramp-up time in seconds.
See **4. How to read the results** for details. | | -rd \ | 0 | Ramp-down time in seconds.
See **4. How to read the results** for details. | | -i \ | 0 | Publish interval in milliseconds. | | -st \ | 5 | Timeout for receiving messages by subscribers in seconds. | | -et \ | 60 | Maximum execution time for measurement in seconds. | -| -l \ | WARNING | Log level.
Valid values are `SEVERE`/`WARNING`/`INFO`/`ALL`. | +| -l \ | INFO | Log level.
Valid values are `SEVERE`/`WARNING`/`INFO`/`ALL`. | | -n \ | (none) | URL of the NTP server. By setting this, time synchronization is enabled.
Ex. `ntp.nict.jp` | -| -tf \ | (none) | File name to write out the throughput data. By default, file output is disabled. | -| -lf \ | (none) | File name to write out the latency data. By default, file output is disabled. | +| -im \ | (none) | Run MQTTLoader by in-memory mode. By default, MQTTLoader writes out measurement records to a file. | | -h | | Display help. | MQTTLoader starts to terminate when all of the following conditions are met. @@ -99,10 +101,10 @@ MQTTLoader starts to terminate when all of the following conditions are met. - The time specified by the parameter `-st` elapses from the last time subscribers receive a message. MQTTLoader also starts to terminate when the time specified by the parameter `-et` elapses, even if there are in-flight messages. -Thus, `-et` should be long sufficiently. +Thus, if you want to test fixed number of messages, `-et` should be long sufficiently. If you want to do measurement with fixed time period, you can set the measurement time by the parameter `-et`. -Note that you need to set sufficiently large value to the parameter `-m`. +In this case, you need to set sufficiently large value to the parameter `-m`. By setting the parameter `-n`, MQTTLoader obtains the offset time from the specified NTP server and reflects it to calculate throughput and latency. It might be useful for running multiple MQTTLoader on different machines. @@ -116,86 +118,71 @@ MQTTLoader displays results like the following on standard output. Maximum throughput[msg/s]: 18622 Average throughput[msg/s]: 16666.666666666668 Number of published messages: 100000 -Throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 +Per second throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 -----Subscriber----- Maximum throughput[msg/s]: 18620 Average throughput[msg/s]: 16666.666666666668 Number of received messages: 100000 -Throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 +Per second throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 Maximum latency[ms]: 81 Average latency[ms]: 42.23691 ``` -For each publisher, MQTTLoader counts the number of messages sent for each second. +MQTTLoader counts the number of messages sent by publishers. If QoS level is set to 1 or 2, counting is done when receiving PUBACK or PUBCOMP respectively. -After completion, MQTTLoader collects the counted numbers from all publishers and calculates the maximum throughput, the average throughput, and the number of published messages. -`Throughput[msg/s]` is the list of throughputs, which are the sum of each second for all publishers. -Note that these calculation exclude the beginning and trailing seconds that have 0 messages. -Below is an example of calculating throughputs in the case that two publishers, A and B, send messages. - -| Elapsed seconds from starting measurement | # of meessages from A | # of messages from B | Throughputs | -|:-----------|:------------|:------------|:------------| -| 0 | 0 | 0 | Excluded | -| 1 | 3 | 0 | 3 | -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | -| 6 | 2 | 2 | 4 | -| 7 | 0 | 0 | Excluded | -| 8 | 0 | 0 | Excluded | - -By using the parameterse `-ru` and `-rd`, you can further exclude the beginning and trailing data. -If you set `-ru 1 -rd 1` in the above example, the following data is used. - -| Elapsed seconds from starting measurement | # of meessages from A | # of messages from B | Throughputs | -|:-----------|:------------|:------------|:------------| -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | +After completion, MQTTLoader calculates the maximum throughput, the average throughput, and the number of published messages. +`Per second throughput[msg/s]` is the time series of throughputs per second. + +By using the parameterse `-ru` and `-rd`, you can exclude the beginning and trailing data. +If you set `-ru 1 -rd 1` for example, the beginning one second and the trailing one second are excluded. For subscribers, throughputs are calculated as same as the above for the received messages. In addition, the maximum latency and the average latency are calculated. Latency is the required time from sending out by a publisher to receiving by a subscriber. Each message has a timestamp of sending out in its payload and the subscriber receives it calculates the latency. + To calculate the latency accurately, the clocks of pubilshers and subscribers should be the same or synchronized. -Thus, when running multiple MQTTLoader on different machines (e.g., publishers on a machine and subscriber on another), enabling `-n` parameter can improve the calculation of latency. +When running multiple MQTTLoader on different machines (e.g., publishers on a machine and subscriber on another), it is better to use `-n` parameter. +By using `-n` parameter, MQTTLoader acquires time information from the specified NTP server and uses it for timestamps and calculation. -### Data to file -By specifying the file name with `-tf` parameter, you can obtain throughput data like the following. +### Send/Receive record file +By default, MQTTLoader writes out the record of sending/receiving MQTT messages to a file. +As shown below, a file `mqttloader_xxxxxxxx-xxxxxx.csv` is created in `mqttloader` directory. +The file name is generated from the measurement start time. +Note that in the case of running MQTTLoader by Gradle or IDE, the file is created in the current working directory. ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:54.104 JST -SLOT, mqttloaderclient-pub000000, mqttloaderclient-sub000000 -0, 11955, 11218 -1, 16427, 16414 -2, 18430, 18426 -3, 18030, 18026 -4, 18622, 18620 -5, 16536, 17296 +mqttloader/ ++-- bin/ + +-- mqttloader + +-- mqttloader.bat ++-- lib/ ++-- logging.properties ++-- mqttloader_xxxxxxxx-xxxxxx.csv ``` -This indicates the throughput for each second for each publisher. -The data that used to calculate the summary data in the standard output is written out. -By specifying the file name with `-lf` parameter, you can obtain latency data like the following. +The file `mqttloader_xxxxxxxx-xxxxxx.csv` has records like the following: ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:54.104 JST -mqttloaderclient-sub000000, mqttloaderclient-sub000001 -7, 7 -4, 4 -3, 3 -4, 4 -3, 4 -3, 3 -4, 4 -3, 4 +1599643916416,ml-EeiE-p-00001,S, +1599643916416,ml-EeiE-p-00000,S, +1599643916419,ml-EeiE-s-00000,R,3 +1599643916422,ml-EeiE-p-00001,S, + : + : ``` -This indicates the latency for each message for each subscriber. + +Each line, consists of comma-separeted values, indicates the following data. +In the case that the event type is `R`, latency data follows. + +``` +timestamp (Unix time), client ID, event type (S: send, R: receive), latency +``` + +Although MQTTLoader outputs the measurement result to the console, you can use the above .csv file for further analysis. +If you want to avoid the influence of file I/O on the measurement, you can run MQTTLoader with in-memory mode by using the `-im` parameter. +In this case, the above .csv file is not created. --- --- @@ -216,11 +203,10 @@ Clone the MQTTLoader repository from GitHub: `$ git clone git@github.com:dist-sy The structure of the directories/files is as follows: ``` -mqttloader -+-- docs -+-- src +mqttloader/ ++-- doc/ ++-- src/ +-- build.gradle -+-- logging.properties : ``` @@ -237,9 +223,9 @@ If successful, *build* directory is created under *\*. You can find *distributions* directory under the *build* directory. ``` - -+-- build - +-- distributions +/ ++-- build/ + +-- distributions/ +-- mqttloader.tar +-- mqttloader.zip ``` diff --git a/doc/usage_jp.md b/doc/usage_jp.md index f5c1fa9..8c66b91 100644 --- a/doc/usage_jp.md +++ b/doc/usage_jp.md @@ -13,11 +13,12 @@ https://github.com/dist-sys/mqttloader/releases ダウンロードしたファイルを解凍すると、以下のディレクトリ構造が得られます。 ``` -mqttloader -+-- bin +mqttloader/ ++-- bin/ +-- mqttloader +-- mqttloader.bat -+-- lib ++-- lib/ ++-- logging.properties ``` *bin* に入っているのがMQTTLoaderの実行スクリプトです。 @@ -27,10 +28,11 @@ Windowsユーザはmqttloader.bat(バッチファイル)を、Linux等のユ `$ ./mqttloader -h` ``` +MQTTLoader version 0.7.0 usage: mqttloader.Loader -b [-v ] [-p ] [-s ] [-pq ] [-sq ] [-ss] [-r] [-t ] [-d ] [-m ] [-ru ] [-rd ] [-i ] [-st ] [-et ] [-l ] - [-n ] [-tf ] [-lf ] [-h] + [-n ] [-im] [-h] -b,--broker Broker URL. E.g., tcp://127.0.0.1:1883 -v,--version MQTT version ("3" for 3.1.1 or "5" for 5.0). : @@ -49,6 +51,7 @@ MQTTLoaderの動作を確認するだけなら、パブリックブローカを ### 複数台での実行 複数台のマシン上でMQTTLoaderを動かすこともできます。 + 1台のマシン上でpublisherとsubscriberを動かした場合、subscriberの受信負荷によってpublisherの送信スループットが低下する等の可能性があります。 publisherとsubscriberを別マシンで動かすことで、負荷が相互に影響することを避けることができます。 @@ -82,17 +85,16 @@ publisherとsubscriberを別マシンで動かすことで、負荷が相互に | -ss | | Shared subscriptionを有効にするかどうか(デフォルト:無効)。MQTT v5.0でのみ設定可。
有効にすると、各メッセージは全subscriberのうちいずれかひとつに届く。 | | -r | | publisherの送信メッセージにてRetainを有効にするかどうか(デフォルト:無効)。 | | -t \ | mqttloader-test-topic | 測定で用いられるトピック名 | -| -d \ | 20 | publisherが送信するメッセージのデータサイズ(MQTTパケットのペイロードサイズ)。単位はbyte。 | +| -d \ | 20 | publisherが送信するメッセージのデータサイズ(MQTTパケットのペイロードサイズ)。単位はbyte。設定可能な最小値は8。 | | -m \ | 100 | **各**publisherによって送信されるメッセージの数。 | | -ru \ | 0 | ランプアップ時間。単位は秒。
詳細は **4. 測定結果の見方** を参照。 | | -rd \ | 0 | ランプダウン時間。単位は秒。
詳細は **4. 測定結果の見方** を参照。 | | -i \ | 0 | 各publisherがメッセージを送信する間隔。単位はミリ秒。 | | -st \ | 5 | subscriberの受信タイムアウト。単位は秒。 | | -et \ | 60 | 測定の実行時間上限。単位は秒。 | -| -l \ | WARNING | ログレベル。
設定可能な値:`SEVERE`/`WARNING`/`INFO`/`ALL` | +| -l \ | INFO | ログレベル。
設定可能な値:`SEVERE`/`WARNING`/`INFO`/`ALL` | | -n \ | (無し) | NTPサーバのURL。設定すると時刻同期が有効になる(デフォルト:無効)。
例:`ntp.nict.jp`  | -| -tf \ | (無し) | スループットデータを記録するファイル名。デフォルトではファイル出力は無し。 | -| -lf \ | (無し) | レイテンシデータを記録するファイル名。デフォルトではファイル出力は無し。 | +| -im \ | (無し) | MQTTLoaderをメモリ上でのみ動作させる。デフォルトでは、測定レコードはファイルに書き出される。 | | -h | | ヘルプを表示 | MQTTLoaderは、以下の条件をすべて満たすと、クライアントを切断させ終了します。 @@ -100,9 +102,8 @@ MQTTLoaderは、以下の条件をすべて満たすと、クライアントを - 全subscriberのメッセージ受信のうち、最後の受信からパラメータ`-st`で指定した秒数が経過 また、MQTTLoaderは、パラメータ`-et`によって指定される時間が経過すると、メッセージ送受信中であっても、終了します。 -送受信を中断したくない場合は、`-et`は長めに設定しておくと良いでしょう。 - -一定時間の測定を行いたい場合には、`-et`を用いて測定時間を設定し、`-m`で十分に大きな値を設定します。 +**一定数のメッセージ送受信**をテストしたい場合は、`-et`は長めに設定しておくと良いでしょう。 +**一定時間の測定**を行いたい場合には、`-et`を用いて測定時間を設定し、`-m`には十分大きな値を設定します。 パラメータ`-n`を設定すると、MQTTLoaderは指定されたNTPサーバから時刻のオフセット情報(NTPサーバ時刻からのずれ)を取得し、スループットやレイテンシの計算にそれを反映します。 複数のMQTTLoaderを異なるマシン上で実行する場合に、利用を検討してください。 @@ -116,88 +117,72 @@ MQTTLoadは標準出力に以下のような測定結果の情報を出力しま Maximum throughput[msg/s]: 18622 Average throughput[msg/s]: 16666.666666666668 Number of published messages: 100000 -Throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 +Per second throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 -----Subscriber----- Maximum throughput[msg/s]: 18620 Average throughput[msg/s]: 16666.666666666668 Number of received messages: 100000 -Throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 +Per second throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 Maximum latency[ms]: 81 Average latency[ms]: 42.23691 ``` -MQTTLoaderは、各publisherごとに、毎秒の送信メッセージ数をカウントします。 +MQTTLoaderは、各publisherによるメッセージの送信をカウントします。 QoSレベルが1または2の場合は、それぞれ、PUBACKおよびPUBCOMPを受信したタイミングでカウントされます。 -全てのメッセージ送信が完了したら、MQTTLoaderは全publisherからカウントしたメッセージ数の情報を集めて集計し、最大スループット、平均スループット、送信メッセージ数を計算します。 -`Throughput[msg/s]`の項は、スループット値の列挙です。列挙されているそれぞれの値は、各秒における全publisherの送信メッセージ数を足し合わせたものです。 -なお、測定開始時および終了時に送信メッセージ数が0の期間がある場合は、スループットの計算からは除外されます。 -ふたつのpublisher AとBがメッセージを送信する場合の、スループット集計値の例を以下に示します。 - -| 測定開始からの秒数 | Aの送信メッセージ数 | Bの送信メッセージ数 | スループット集計値 | -|:-----------|:------------|:------------|:------------| -| 0 | 0 | 0 | 集計対象外 | -| 1 | 3 | 0 | 3 | -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | -| 6 | 2 | 2 | 4 | -| 7 | 0 | 0 | 集計対象外 | -| 8 | 0 | 0 | 集計対象外 | - -パラメータ`-ru`と`-rd`を用いると、集計対象データからさらに最初と最後の一定秒数分を計算から除外することができます。 -上記の例にて、 `-ru 1 -rd 1` と設定した場合、以下のデータが集計対象として扱われることになります。 - -| 測定開始からの秒数 | Aの送信メッセージ数 | Bの送信メッセージ数 | スループット集計値 | -|:-----------|:------------|:------------|:------------| -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | +測定が終了したら、MQTTLoaderはカウントしたメッセージ数を集計し、最大スループット、平均スループット、送信メッセージ数を計算します。 +`Per second throughput[msg/s]`は、スループット値の時間変化を秒単位で列挙したものです。 + +パラメータ`-ru`と`-rd`を用いると、測定開始直後と終了直前の一定秒数分を、集計対象データから除外することができます。 +例えば、 `-ru 1 -rd 1` と設定した場合、最初と最後の1秒間のデータは集計対象外となります。 subscriberに関しても、上記と同様にして、受信メッセージのスループットが計算されます。 これに加えて、subscriber側では、最大レイテンシと平均レイテンシも計算されます。 レイテンシは、publisherが送信したメッセージがsubscriberに届くまでの時間です。 各メッセージはペイロード部に送信時刻を格納しており、subscriberは受信時にそれを用いてレイテンシの計算をおこないます。 + レイテンシを正確に算出するためには、publisherとsubscriberの時刻が同期されている必要があります。 このため、複数の異なるマシン上でMQTTLoaderを動かす場合(例えば、publisherとsubscriberを別マシンで動かす場合)には、注意が必要です。 -`-n`パラメータを使うことで、レイテンシ計算の正確性を改善できる可能性があります。 +`-n`パラメータを使うと、MQTTLoaderはNTPサーバから時刻情報を取得し、その情報をもとに送受信時刻やレイテンシを計算するため、マシンの時刻がずれていても(ある程度)正確なレイテンシを得られます。 + +### 送受信レコードファイル +デフォルトでは、MQTTLoaderはMQTTメッセージの送受信記録をファイルに出力します。 +以下のように、 `mqttloader` ディレクトリの直下に、csv形式のファイルとして出力されます。 +ファイル名は測定開始日時から生成されます。 +なお、GradleやIDEから実行した場合には、作業ディレクトリにファイルが作成されます。 + +``` +mqttloader/ ++-- bin/ + +-- mqttloader + +-- mqttloader.bat ++-- lib/ ++-- logging.properties ++-- mqttloader_xxxxxxxx-xxxxxx.csv +``` -### ファイル出力 -パラメータ`-tf`でファイル名を指定することで、以下のようなスループットの詳細データをファイルに書き出すことができます。 +このcsvファイルには、以下のようなデータが記録されます。 ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:44.104 JST -SLOT, mqttloaderclient-pub000000, mqttloaderclient-sub000000 -0, 11955, 11218 -1, 16427, 16414 -2, 18430, 18426 -3, 18030, 18026 -4, 18622, 18620 -5, 16536, 17296 +1599643916416,ml-EeiE-p-00001,S, +1599643916416,ml-EeiE-p-00000,S, +1599643916419,ml-EeiE-s-00000,R,3 +1599643916422,ml-EeiE-p-00001,S, + : + : ``` -これは、各秒における、各publisherのスループット(送信メッセージ数)を表しています。 -標準出力のサマリ情報のところで述べた、集計対象となっているデータが、ファイルに出力されます。 -パラメータ`-lf`でファイル名を指定することで、以下のようなレイテンシの詳細データをファイルに書き出すことができます。 +各行は、カンマ区切りで、以下の内容となっています。 +送受信種別が `R` の場合のみ、レイテンシも記載されます。 ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:44.104 JST -mqttloaderclient-sub000000, mqttloaderclient-sub000001 -7, 7 -4, 4 -3, 3 -4, 4 -3, 4 -3, 3 -4, 4 -3, 4 +タイムスタンプ(Unix時間), クライアントID, 送受信種別(S: 送信, R: 受信), レイテンシ ``` -これは、各subscriberが受信した各メッセージのレイテンシを表しています。 + +MQTTLoaderは、測定結果のサマリをコンソールに出力しますが、追加の集計・分析を行いたい場合には上記のファイルを使ってください。 +なお、ファイル出力の負荷を抑えて測定をおこないたい場合には、 `-im` パラメータによりインメモリモードで動作させることができます。 + `-im` パラメータを指定した場合、上記のcsvファイルは作成されません。 --- --- @@ -218,11 +203,10 @@ GitHubからクローンしてください: `$ git clone git@github.com:dist-s リポジトリのディレクトリ構造は下記のようになっています。 ``` -mqttloader -+-- docs -+-- src +mqttloader/ ++-- doc/ ++-- src/ +-- build.gradle -+-- logging.properties : ``` @@ -239,9 +223,9 @@ $ gradle build 成功すると、*\* 配下に *build* ディレクトリが生成されます。 ``` - -+-- build - +-- distributions +/ ++-- build/ + +-- distributions/ +-- mqttloader.tar +-- mqttloader.zip ``` From 7dc68b389f526f15a813622618dff39313ac6c7d Mon Sep 17 00:00:00 2001 From: nabon Date: Wed, 9 Sep 2020 23:48:00 +0900 Subject: [PATCH 13/14] Handling initial connection error. --- src/main/java/mqttloader/client/PublisherV3.java | 1 + src/main/java/mqttloader/client/PublisherV5.java | 1 + src/main/java/mqttloader/client/SubscriberV3.java | 1 + src/main/java/mqttloader/client/SubscriberV5.java | 1 + 4 files changed, 4 insertions(+) diff --git a/src/main/java/mqttloader/client/PublisherV3.java b/src/main/java/mqttloader/client/PublisherV3.java index b2545df..79cafcf 100644 --- a/src/main/java/mqttloader/client/PublisherV3.java +++ b/src/main/java/mqttloader/client/PublisherV3.java @@ -43,6 +43,7 @@ public PublisherV3(int clientNumber, String broker, int qos, boolean retain, Str } catch (MqttException e) { Loader.logger.warning("Publisher failed to connect (" + clientId + ")."); e.printStackTrace(); + System.exit(1); } } diff --git a/src/main/java/mqttloader/client/PublisherV5.java b/src/main/java/mqttloader/client/PublisherV5.java index 947ba08..0ef05b4 100644 --- a/src/main/java/mqttloader/client/PublisherV5.java +++ b/src/main/java/mqttloader/client/PublisherV5.java @@ -42,6 +42,7 @@ public PublisherV5(int clientNumber, String broker, int qos, boolean retain, Str } catch (MqttException e) { Loader.logger.warning("Publisher failed to connect (" + clientId + ")."); e.printStackTrace(); + System.exit(1); } } diff --git a/src/main/java/mqttloader/client/SubscriberV3.java b/src/main/java/mqttloader/client/SubscriberV3.java index 48499c3..62f9f4e 100644 --- a/src/main/java/mqttloader/client/SubscriberV3.java +++ b/src/main/java/mqttloader/client/SubscriberV3.java @@ -44,6 +44,7 @@ public SubscriberV3(int clientNumber, String broker, int qos, String topic) { } catch (MqttException e) { Loader.logger.warning("Subscriber failed to connect (" + clientId + ")."); e.printStackTrace(); + System.exit(1); } } diff --git a/src/main/java/mqttloader/client/SubscriberV5.java b/src/main/java/mqttloader/client/SubscriberV5.java index 89a6862..176bb51 100644 --- a/src/main/java/mqttloader/client/SubscriberV5.java +++ b/src/main/java/mqttloader/client/SubscriberV5.java @@ -50,6 +50,7 @@ public SubscriberV5(int clientNumber, String broker, int qos, boolean shSub, Str } catch (MqttException e) { Loader.logger.warning("Subscriber failed to connect (" + clientId + ")."); e.printStackTrace(); + System.exit(1); } } From 33ad4f31b6c10b0c51cd13e81666780ebdf09e27 Mon Sep 17 00:00:00 2001 From: nabon Date: Thu, 10 Sep 2020 01:17:22 +0900 Subject: [PATCH 14/14] Change subscribers to wait for start time calculation. --- src/main/java/mqttloader/Constants.java | 2 +- src/main/java/mqttloader/Loader.java | 5 +++-- src/main/java/mqttloader/client/AbstractSubscriber.java | 6 ++++++ 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index eb7159e..c70b12c 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -24,7 +24,7 @@ public class Constants { public static final String PUB_CLIENT_ID_PREFIX = "ml-"+HOST_ID+"-p-"; public static final Record STOP_SIGNAL = new Record(); public static final int MILLISECOND_IN_NANO = 1000000; - public static final int SECOND_IN_NANO = 1000000; + public static final int SECOND_IN_NANO = 1000000000; public static final int SECOND_IN_MILLI = 1000; public enum Opt { diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 0f22100..0875e99 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -62,8 +62,8 @@ public class Loader { private CommandLine cmd = null; private ArrayList publishers = new ArrayList<>(); private ArrayList subscribers = new ArrayList<>(); - public static volatile long startTime; - public static volatile long startNanoTime; + public static volatile long startTime = 0; + public static volatile long startNanoTime = 0; private long endTime; public static volatile long lastRecvTime; public static ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000000); @@ -418,6 +418,7 @@ private void paddingTreeMap(TreeMap map) { if(map.size() == 0) { return; } + for(int i=map.firstKey();i