diff --git a/doc/usage_en.md b/doc/usage_en.md index 1e6c796..edb4004 100644 --- a/doc/usage_en.md +++ b/doc/usage_en.md @@ -12,11 +12,12 @@ Download the archive file (zip or tar) from: https://github.com/dist-sys/mqttloa By extracting it, you can get the following files. ``` -mqttloader -+-- bin +mqttloader/ ++-- bin/ +-- mqttloader +-- mqttloader.bat -+-- lib ++-- lib/ ++-- logging.properties ``` Scripts for executing MQTTLoader is in *bin* directory. @@ -26,10 +27,11 @@ You can display the help by: `$ ./mqttloader -h` ``` +MQTTLoader version 0.7.0 usage: mqttloader.Loader -b [-v ] [-p ] [-s ] [-pq ] [-sq ] [-ss] [-r] [-t ] [-d ] [-m ] [-ru ] [-rd ] [-i ] [-st ] [-et ] [-l ] - [-n ] [-tf ] [-lf ] [-h] + [-n ] [-im] [-h] -b,--broker Broker URL. E.g., tcp://127.0.0.1:1883 -v,--version MQTT version ("3" for 3.1.1 or "5" for 5.0). : @@ -48,6 +50,7 @@ For example, the following command uses a public MQTT broker provided by HiveMQ. ### Run on multiple machines You can run MQTTLoader on multiple machines. + Running both publishers and subscribers on a single machine may cause mutual influence, e.g., the subscribers' receiving load lowers the publishers' throughput. By running publishers and subscribers separately on different machines, you can avoid such mutual influence. @@ -81,17 +84,16 @@ Please refer to **3. Parameteres of MQTTLoader** for more details of each parame | -ss | | Enable shared subscription. By default, it is disabled. Valid for only MQTT v5.0.
If it is enabled, a message is delivered to one of the subscribers. | | -r | | Enable retain for the messages sent by publishers. By default, it is disabled. | | -t \ | mqttloader-test-topic | Topic name to be used. | -| -d \ | 20 | The size of data (payload of messages to be published) in bytes. | +| -d \ | 20 | The size of data (payload of messages to be published) in bytes. It must be equal to or larger than 8. | | -m \ | 100 | Number of messages sent by **each** publisher. | | -ru \ | 0 | Ramp-up time in seconds.
See **4. How to read the results** for details. | | -rd \ | 0 | Ramp-down time in seconds.
See **4. How to read the results** for details. | | -i \ | 0 | Publish interval in milliseconds. | | -st \ | 5 | Timeout for receiving messages by subscribers in seconds. | | -et \ | 60 | Maximum execution time for measurement in seconds. | -| -l \ | WARNING | Log level.
Valid values are `SEVERE`/`WARNING`/`INFO`/`ALL`. | +| -l \ | INFO | Log level.
Valid values are `SEVERE`/`WARNING`/`INFO`/`ALL`. | | -n \ | (none) | URL of the NTP server. By setting this, time synchronization is enabled.
Ex. `ntp.nict.jp` | -| -tf \ | (none) | File name to write out the throughput data. By default, file output is disabled. | -| -lf \ | (none) | File name to write out the latency data. By default, file output is disabled. | +| -im \ | (none) | Run MQTTLoader by in-memory mode. By default, MQTTLoader writes out measurement records to a file. | | -h | | Display help. | MQTTLoader starts to terminate when all of the following conditions are met. @@ -99,10 +101,10 @@ MQTTLoader starts to terminate when all of the following conditions are met. - The time specified by the parameter `-st` elapses from the last time subscribers receive a message. MQTTLoader also starts to terminate when the time specified by the parameter `-et` elapses, even if there are in-flight messages. -Thus, `-et` should be long sufficiently. +Thus, if you want to test fixed number of messages, `-et` should be long sufficiently. If you want to do measurement with fixed time period, you can set the measurement time by the parameter `-et`. -Note that you need to set sufficiently large value to the parameter `-m`. +In this case, you need to set sufficiently large value to the parameter `-m`. By setting the parameter `-n`, MQTTLoader obtains the offset time from the specified NTP server and reflects it to calculate throughput and latency. It might be useful for running multiple MQTTLoader on different machines. @@ -116,86 +118,71 @@ MQTTLoader displays results like the following on standard output. Maximum throughput[msg/s]: 18622 Average throughput[msg/s]: 16666.666666666668 Number of published messages: 100000 -Throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 +Per second throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 -----Subscriber----- Maximum throughput[msg/s]: 18620 Average throughput[msg/s]: 16666.666666666668 Number of received messages: 100000 -Throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 +Per second throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 Maximum latency[ms]: 81 Average latency[ms]: 42.23691 ``` -For each publisher, MQTTLoader counts the number of messages sent for each second. +MQTTLoader counts the number of messages sent by publishers. If QoS level is set to 1 or 2, counting is done when receiving PUBACK or PUBCOMP respectively. -After completion, MQTTLoader collects the counted numbers from all publishers and calculates the maximum throughput, the average throughput, and the number of published messages. -`Throughput[msg/s]` is the list of throughputs, which are the sum of each second for all publishers. -Note that these calculation exclude the beginning and trailing seconds that have 0 messages. -Below is an example of calculating throughputs in the case that two publishers, A and B, send messages. - -| Elapsed seconds from starting measurement | # of meessages from A | # of messages from B | Throughputs | -|:-----------|:------------|:------------|:------------| -| 0 | 0 | 0 | Excluded | -| 1 | 3 | 0 | 3 | -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | -| 6 | 2 | 2 | 4 | -| 7 | 0 | 0 | Excluded | -| 8 | 0 | 0 | Excluded | - -By using the parameterse `-ru` and `-rd`, you can further exclude the beginning and trailing data. -If you set `-ru 1 -rd 1` in the above example, the following data is used. - -| Elapsed seconds from starting measurement | # of meessages from A | # of messages from B | Throughputs | -|:-----------|:------------|:------------|:------------| -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | +After completion, MQTTLoader calculates the maximum throughput, the average throughput, and the number of published messages. +`Per second throughput[msg/s]` is the time series of throughputs per second. + +By using the parameterse `-ru` and `-rd`, you can exclude the beginning and trailing data. +If you set `-ru 1 -rd 1` for example, the beginning one second and the trailing one second are excluded. For subscribers, throughputs are calculated as same as the above for the received messages. In addition, the maximum latency and the average latency are calculated. Latency is the required time from sending out by a publisher to receiving by a subscriber. Each message has a timestamp of sending out in its payload and the subscriber receives it calculates the latency. + To calculate the latency accurately, the clocks of pubilshers and subscribers should be the same or synchronized. -Thus, when running multiple MQTTLoader on different machines (e.g., publishers on a machine and subscriber on another), enabling `-n` parameter can improve the calculation of latency. +When running multiple MQTTLoader on different machines (e.g., publishers on a machine and subscriber on another), it is better to use `-n` parameter. +By using `-n` parameter, MQTTLoader acquires time information from the specified NTP server and uses it for timestamps and calculation. -### Data to file -By specifying the file name with `-tf` parameter, you can obtain throughput data like the following. +### Send/Receive record file +By default, MQTTLoader writes out the record of sending/receiving MQTT messages to a file. +As shown below, a file `mqttloader_xxxxxxxx-xxxxxx.csv` is created in `mqttloader` directory. +The file name is generated from the measurement start time. +Note that in the case of running MQTTLoader by Gradle or IDE, the file is created in the current working directory. ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:54.104 JST -SLOT, mqttloaderclient-pub000000, mqttloaderclient-sub000000 -0, 11955, 11218 -1, 16427, 16414 -2, 18430, 18426 -3, 18030, 18026 -4, 18622, 18620 -5, 16536, 17296 +mqttloader/ ++-- bin/ + +-- mqttloader + +-- mqttloader.bat ++-- lib/ ++-- logging.properties ++-- mqttloader_xxxxxxxx-xxxxxx.csv ``` -This indicates the throughput for each second for each publisher. -The data that used to calculate the summary data in the standard output is written out. -By specifying the file name with `-lf` parameter, you can obtain latency data like the following. +The file `mqttloader_xxxxxxxx-xxxxxx.csv` has records like the following: ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:54.104 JST -mqttloaderclient-sub000000, mqttloaderclient-sub000001 -7, 7 -4, 4 -3, 3 -4, 4 -3, 4 -3, 3 -4, 4 -3, 4 +1599643916416,ml-EeiE-p-00001,S, +1599643916416,ml-EeiE-p-00000,S, +1599643916419,ml-EeiE-s-00000,R,3 +1599643916422,ml-EeiE-p-00001,S, + : + : ``` -This indicates the latency for each message for each subscriber. + +Each line, consists of comma-separeted values, indicates the following data. +In the case that the event type is `R`, latency data follows. + +``` +timestamp (Unix time), client ID, event type (S: send, R: receive), latency +``` + +Although MQTTLoader outputs the measurement result to the console, you can use the above .csv file for further analysis. +If you want to avoid the influence of file I/O on the measurement, you can run MQTTLoader with in-memory mode by using the `-im` parameter. +In this case, the above .csv file is not created. --- --- @@ -216,11 +203,10 @@ Clone the MQTTLoader repository from GitHub: `$ git clone git@github.com:dist-sy The structure of the directories/files is as follows: ``` -mqttloader -+-- docs -+-- src +mqttloader/ ++-- doc/ ++-- src/ +-- build.gradle -+-- logging.properties : ``` @@ -237,9 +223,9 @@ If successful, *build* directory is created under *\*. You can find *distributions* directory under the *build* directory. ``` - -+-- build - +-- distributions +/ ++-- build/ + +-- distributions/ +-- mqttloader.tar +-- mqttloader.zip ``` diff --git a/doc/usage_jp.md b/doc/usage_jp.md index f5c1fa9..8c66b91 100644 --- a/doc/usage_jp.md +++ b/doc/usage_jp.md @@ -13,11 +13,12 @@ https://github.com/dist-sys/mqttloader/releases ダウンロードしたファイルを解凍すると、以下のディレクトリ構造が得られます。 ``` -mqttloader -+-- bin +mqttloader/ ++-- bin/ +-- mqttloader +-- mqttloader.bat -+-- lib ++-- lib/ ++-- logging.properties ``` *bin* に入っているのがMQTTLoaderの実行スクリプトです。 @@ -27,10 +28,11 @@ Windowsユーザはmqttloader.bat(バッチファイル)を、Linux等のユ `$ ./mqttloader -h` ``` +MQTTLoader version 0.7.0 usage: mqttloader.Loader -b [-v ] [-p ] [-s ] [-pq ] [-sq ] [-ss] [-r] [-t ] [-d ] [-m ] [-ru ] [-rd ] [-i ] [-st ] [-et ] [-l ] - [-n ] [-tf ] [-lf ] [-h] + [-n ] [-im] [-h] -b,--broker Broker URL. E.g., tcp://127.0.0.1:1883 -v,--version MQTT version ("3" for 3.1.1 or "5" for 5.0). : @@ -49,6 +51,7 @@ MQTTLoaderの動作を確認するだけなら、パブリックブローカを ### 複数台での実行 複数台のマシン上でMQTTLoaderを動かすこともできます。 + 1台のマシン上でpublisherとsubscriberを動かした場合、subscriberの受信負荷によってpublisherの送信スループットが低下する等の可能性があります。 publisherとsubscriberを別マシンで動かすことで、負荷が相互に影響することを避けることができます。 @@ -82,17 +85,16 @@ publisherとsubscriberを別マシンで動かすことで、負荷が相互に | -ss | | Shared subscriptionを有効にするかどうか(デフォルト:無効)。MQTT v5.0でのみ設定可。
有効にすると、各メッセージは全subscriberのうちいずれかひとつに届く。 | | -r | | publisherの送信メッセージにてRetainを有効にするかどうか(デフォルト:無効)。 | | -t \ | mqttloader-test-topic | 測定で用いられるトピック名 | -| -d \ | 20 | publisherが送信するメッセージのデータサイズ(MQTTパケットのペイロードサイズ)。単位はbyte。 | +| -d \ | 20 | publisherが送信するメッセージのデータサイズ(MQTTパケットのペイロードサイズ)。単位はbyte。設定可能な最小値は8。 | | -m \ | 100 | **各**publisherによって送信されるメッセージの数。 | | -ru \ | 0 | ランプアップ時間。単位は秒。
詳細は **4. 測定結果の見方** を参照。 | | -rd \ | 0 | ランプダウン時間。単位は秒。
詳細は **4. 測定結果の見方** を参照。 | | -i \ | 0 | 各publisherがメッセージを送信する間隔。単位はミリ秒。 | | -st \ | 5 | subscriberの受信タイムアウト。単位は秒。 | | -et \ | 60 | 測定の実行時間上限。単位は秒。 | -| -l \ | WARNING | ログレベル。
設定可能な値:`SEVERE`/`WARNING`/`INFO`/`ALL` | +| -l \ | INFO | ログレベル。
設定可能な値:`SEVERE`/`WARNING`/`INFO`/`ALL` | | -n \ | (無し) | NTPサーバのURL。設定すると時刻同期が有効になる(デフォルト:無効)。
例:`ntp.nict.jp`  | -| -tf \ | (無し) | スループットデータを記録するファイル名。デフォルトではファイル出力は無し。 | -| -lf \ | (無し) | レイテンシデータを記録するファイル名。デフォルトではファイル出力は無し。 | +| -im \ | (無し) | MQTTLoaderをメモリ上でのみ動作させる。デフォルトでは、測定レコードはファイルに書き出される。 | | -h | | ヘルプを表示 | MQTTLoaderは、以下の条件をすべて満たすと、クライアントを切断させ終了します。 @@ -100,9 +102,8 @@ MQTTLoaderは、以下の条件をすべて満たすと、クライアントを - 全subscriberのメッセージ受信のうち、最後の受信からパラメータ`-st`で指定した秒数が経過 また、MQTTLoaderは、パラメータ`-et`によって指定される時間が経過すると、メッセージ送受信中であっても、終了します。 -送受信を中断したくない場合は、`-et`は長めに設定しておくと良いでしょう。 - -一定時間の測定を行いたい場合には、`-et`を用いて測定時間を設定し、`-m`で十分に大きな値を設定します。 +**一定数のメッセージ送受信**をテストしたい場合は、`-et`は長めに設定しておくと良いでしょう。 +**一定時間の測定**を行いたい場合には、`-et`を用いて測定時間を設定し、`-m`には十分大きな値を設定します。 パラメータ`-n`を設定すると、MQTTLoaderは指定されたNTPサーバから時刻のオフセット情報(NTPサーバ時刻からのずれ)を取得し、スループットやレイテンシの計算にそれを反映します。 複数のMQTTLoaderを異なるマシン上で実行する場合に、利用を検討してください。 @@ -116,88 +117,72 @@ MQTTLoadは標準出力に以下のような測定結果の情報を出力しま Maximum throughput[msg/s]: 18622 Average throughput[msg/s]: 16666.666666666668 Number of published messages: 100000 -Throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 +Per second throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536 -----Subscriber----- Maximum throughput[msg/s]: 18620 Average throughput[msg/s]: 16666.666666666668 Number of received messages: 100000 -Throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 +Per second throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296 Maximum latency[ms]: 81 Average latency[ms]: 42.23691 ``` -MQTTLoaderは、各publisherごとに、毎秒の送信メッセージ数をカウントします。 +MQTTLoaderは、各publisherによるメッセージの送信をカウントします。 QoSレベルが1または2の場合は、それぞれ、PUBACKおよびPUBCOMPを受信したタイミングでカウントされます。 -全てのメッセージ送信が完了したら、MQTTLoaderは全publisherからカウントしたメッセージ数の情報を集めて集計し、最大スループット、平均スループット、送信メッセージ数を計算します。 -`Throughput[msg/s]`の項は、スループット値の列挙です。列挙されているそれぞれの値は、各秒における全publisherの送信メッセージ数を足し合わせたものです。 -なお、測定開始時および終了時に送信メッセージ数が0の期間がある場合は、スループットの計算からは除外されます。 -ふたつのpublisher AとBがメッセージを送信する場合の、スループット集計値の例を以下に示します。 - -| 測定開始からの秒数 | Aの送信メッセージ数 | Bの送信メッセージ数 | スループット集計値 | -|:-----------|:------------|:------------|:------------| -| 0 | 0 | 0 | 集計対象外 | -| 1 | 3 | 0 | 3 | -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | -| 6 | 2 | 2 | 4 | -| 7 | 0 | 0 | 集計対象外 | -| 8 | 0 | 0 | 集計対象外 | - -パラメータ`-ru`と`-rd`を用いると、集計対象データからさらに最初と最後の一定秒数分を計算から除外することができます。 -上記の例にて、 `-ru 1 -rd 1` と設定した場合、以下のデータが集計対象として扱われることになります。 - -| 測定開始からの秒数 | Aの送信メッセージ数 | Bの送信メッセージ数 | スループット集計値 | -|:-----------|:------------|:------------|:------------| -| 2 | 4 | 3 | 7 | -| 3 | 5 | 5 | 10 | -| 4 | 0 | 0 | 0 | -| 5 | 3 | 4 | 7 | +測定が終了したら、MQTTLoaderはカウントしたメッセージ数を集計し、最大スループット、平均スループット、送信メッセージ数を計算します。 +`Per second throughput[msg/s]`は、スループット値の時間変化を秒単位で列挙したものです。 + +パラメータ`-ru`と`-rd`を用いると、測定開始直後と終了直前の一定秒数分を、集計対象データから除外することができます。 +例えば、 `-ru 1 -rd 1` と設定した場合、最初と最後の1秒間のデータは集計対象外となります。 subscriberに関しても、上記と同様にして、受信メッセージのスループットが計算されます。 これに加えて、subscriber側では、最大レイテンシと平均レイテンシも計算されます。 レイテンシは、publisherが送信したメッセージがsubscriberに届くまでの時間です。 各メッセージはペイロード部に送信時刻を格納しており、subscriberは受信時にそれを用いてレイテンシの計算をおこないます。 + レイテンシを正確に算出するためには、publisherとsubscriberの時刻が同期されている必要があります。 このため、複数の異なるマシン上でMQTTLoaderを動かす場合(例えば、publisherとsubscriberを別マシンで動かす場合)には、注意が必要です。 -`-n`パラメータを使うことで、レイテンシ計算の正確性を改善できる可能性があります。 +`-n`パラメータを使うと、MQTTLoaderはNTPサーバから時刻情報を取得し、その情報をもとに送受信時刻やレイテンシを計算するため、マシンの時刻がずれていても(ある程度)正確なレイテンシを得られます。 + +### 送受信レコードファイル +デフォルトでは、MQTTLoaderはMQTTメッセージの送受信記録をファイルに出力します。 +以下のように、 `mqttloader` ディレクトリの直下に、csv形式のファイルとして出力されます。 +ファイル名は測定開始日時から生成されます。 +なお、GradleやIDEから実行した場合には、作業ディレクトリにファイルが作成されます。 + +``` +mqttloader/ ++-- bin/ + +-- mqttloader + +-- mqttloader.bat ++-- lib/ ++-- logging.properties ++-- mqttloader_xxxxxxxx-xxxxxx.csv +``` -### ファイル出力 -パラメータ`-tf`でファイル名を指定することで、以下のようなスループットの詳細データをファイルに書き出すことができます。 +このcsvファイルには、以下のようなデータが記録されます。 ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:44.104 JST -SLOT, mqttloaderclient-pub000000, mqttloaderclient-sub000000 -0, 11955, 11218 -1, 16427, 16414 -2, 18430, 18426 -3, 18030, 18026 -4, 18622, 18620 -5, 16536, 17296 +1599643916416,ml-EeiE-p-00001,S, +1599643916416,ml-EeiE-p-00000,S, +1599643916419,ml-EeiE-s-00000,R,3 +1599643916422,ml-EeiE-p-00001,S, + : + : ``` -これは、各秒における、各publisherのスループット(送信メッセージ数)を表しています。 -標準出力のサマリ情報のところで述べた、集計対象となっているデータが、ファイルに出力されます。 -パラメータ`-lf`でファイル名を指定することで、以下のようなレイテンシの詳細データをファイルに書き出すことができます。 +各行は、カンマ区切りで、以下の内容となっています。 +送受信種別が `R` の場合のみ、レイテンシも記載されます。 ``` -Measurement start time: 2020-09-01 18:33:38.122 JST -Measurement end time: 2020-09-01 18:33:44.104 JST -mqttloaderclient-sub000000, mqttloaderclient-sub000001 -7, 7 -4, 4 -3, 3 -4, 4 -3, 4 -3, 3 -4, 4 -3, 4 +タイムスタンプ(Unix時間), クライアントID, 送受信種別(S: 送信, R: 受信), レイテンシ ``` -これは、各subscriberが受信した各メッセージのレイテンシを表しています。 + +MQTTLoaderは、測定結果のサマリをコンソールに出力しますが、追加の集計・分析を行いたい場合には上記のファイルを使ってください。 +なお、ファイル出力の負荷を抑えて測定をおこないたい場合には、 `-im` パラメータによりインメモリモードで動作させることができます。 + `-im` パラメータを指定した場合、上記のcsvファイルは作成されません。 --- --- @@ -218,11 +203,10 @@ GitHubからクローンしてください: `$ git clone git@github.com:dist-s リポジトリのディレクトリ構造は下記のようになっています。 ``` -mqttloader -+-- docs -+-- src +mqttloader/ ++-- doc/ ++-- src/ +-- build.gradle -+-- logging.properties : ``` @@ -239,9 +223,9 @@ $ gradle build 成功すると、*\* 配下に *build* ディレクトリが生成されます。 ``` - -+-- build - +-- distributions +/ ++-- build/ + +-- distributions/ +-- mqttloader.tar +-- mqttloader.zip ``` diff --git a/src/dist/logging.properties b/src/dist/logging.properties index 753a897..413968a 100644 --- a/src/dist/logging.properties +++ b/src/dist/logging.properties @@ -1,8 +1,8 @@ mqttloader.handlers=java.util.logging.MemoryHandler -mqttloader.level=WARNING +mqttloader.level=INFO #mqttloader.level=SEVERE -#mqttloader.level=INFO +#mqttloader.level=WARNING #mqttloader.level=FINEST org.eclipse.paho.mqttv5.client.handlers=java.util.logging.MemoryHandler diff --git a/src/main/java/mqttloader/Constants.java b/src/main/java/mqttloader/Constants.java index 0e498db..c70b12c 100644 --- a/src/main/java/mqttloader/Constants.java +++ b/src/main/java/mqttloader/Constants.java @@ -17,12 +17,19 @@ package mqttloader; public class Constants { - public static final String SUB_CLIENT_ID_PREFIX = "mqttloaderclient-sub"; - public static final String PUB_CLIENT_ID_PREFIX = "mqttloaderclient-pub"; + public static final String VERSION = "0.7.0"; + public static final String FILE_NAME_PREFIX = "mqttloader_"; + private static final String HOST_ID = Util.genRandomChars(4); + public static final String SUB_CLIENT_ID_PREFIX = "ml-"+HOST_ID+"-s-"; + public static final String PUB_CLIENT_ID_PREFIX = "ml-"+HOST_ID+"-p-"; + public static final Record STOP_SIGNAL = new Record(); + public static final int MILLISECOND_IN_NANO = 1000000; + public static final int SECOND_IN_NANO = 1000000000; + public static final int SECOND_IN_MILLI = 1000; public enum Opt { BROKER("b", "broker", true, "Broker URL. E.g., tcp://127.0.0.1:1883", null, true), - VERSION("v", "version", true, "MQTT version (\"3\" for 3.1.1 or \"5\" for 5.0).", "5"), + MQTT_VERSION("v", "version", true, "MQTT version (\"3\" for 3.1.1 or \"5\" for 5.0).", "5"), NUM_PUB("p", "npub", true, "Number of publishers.", "1"), NUM_SUB("s", "nsub", true, "Number of subscribers.", "1"), PUB_QOS("pq", "pubqos", true, "QoS level of publishers (0/1/2).", "0"), @@ -37,10 +44,9 @@ public enum Opt { INTERVAL("i", "interval", true, "Publish interval in milliseconds.", "0"), SUB_TIMEOUT("st", "subtimeout", true, "Subscribers' timeout in seconds.", "5"), EXEC_TIME("et", "exectime", true, "Execution time in seconds.", "60"), - LOG_LEVEL("l", "log", true, "Log level (SEVERE/WARNING/INFO/ALL).", "WARNING"), + LOG_LEVEL("l", "log", true, "Log level (SEVERE/WARNING/INFO/ALL).", "INFO"), NTP("n", "ntp", true, "NTP server. E.g., ntp.nict.jp", null), - TH_FILE("tf", "thfile", true, "File name for throughput data.", null), - LT_FILE("lf", "ltfile", true, "File name for latency data.", null), + IN_MEMORY("im", "inmemory", false, "Enable in-memory mode", null), HELP("h", "help", false, "Display help.", null); private String name; diff --git a/src/main/java/mqttloader/Loader.java b/src/main/java/mqttloader/Loader.java index 4c6d7e0..0875e99 100644 --- a/src/main/java/mqttloader/Loader.java +++ b/src/main/java/mqttloader/Loader.java @@ -16,30 +16,39 @@ package mqttloader; +import static java.lang.System.exit; import static mqttloader.Constants.Opt; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.net.InetAddress; +import java.net.MalformedURLException; import java.net.SocketException; +import java.net.URISyntaxException; +import java.net.URL; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; +import java.util.StringTokenizer; import java.util.Timer; import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import mqttloader.client.IClient; -import mqttloader.client.Publisher; +import mqttloader.client.AbstractClient; +import mqttloader.client.AbstractPublisher; +import mqttloader.client.PublisherV5; import mqttloader.client.PublisherV3; -import mqttloader.client.Subscriber; +import mqttloader.client.SubscriberV5; import mqttloader.client.SubscriberV3; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -51,12 +60,15 @@ public class Loader { private CommandLine cmd = null; - private ArrayList publishers = new ArrayList<>(); - private ArrayList subscribers = new ArrayList<>(); - public static volatile long startTime; + private ArrayList publishers = new ArrayList<>(); + private ArrayList subscribers = new ArrayList<>(); + public static volatile long startTime = 0; + public static volatile long startNanoTime = 0; private long endTime; - public static volatile long offset = 0; public static volatile long lastRecvTime; + public static ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000000); + private File file; + private Recorder recorder; public static CountDownLatch countDownLatch; public static Logger logger = Logger.getLogger(Loader.class.getName()); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z"); @@ -64,6 +76,10 @@ public class Loader { public Loader(String[] args) { setOptions(args); + String logLevel = cmd.getOptionValue(Opt.LOG_LEVEL.getName(), Opt.LOG_LEVEL.getDefaultValue()); + logger.setLevel(Level.parse(logLevel)); + logger.info("MQTTLoader version " + Constants.VERSION + " starting."); + int numPub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_PUB.getName(), Opt.NUM_PUB.getDefaultValue())); int numSub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_SUB.getName(), Opt.NUM_SUB.getDefaultValue())); if (numSub > 0) { @@ -72,18 +88,17 @@ public Loader(String[] args) { countDownLatch = new CountDownLatch(numPub); } - String logLevel = cmd.getOptionValue(Opt.LOG_LEVEL.getName(), Opt.LOG_LEVEL.getDefaultValue()); - logger.setLevel(Level.parse(logLevel)); - - logger.info("Starting mqttloader tool."); logger.info("Preparing clients."); prepareClients(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); + boolean inMemory = cmd.hasOption(Opt.IN_MEMORY.getName()); + if(!inMemory) { + file = getFile(); + logger.info("Output file placed at: "+file.getAbsolutePath()); } + recorder = new Recorder(file, inMemory); + Thread fileThread = new Thread(recorder); + fileThread.start(); logger.info("Starting measurement."); startMeasurement(); @@ -95,8 +110,8 @@ public Loader(String[] args) { } int execTime = Integer.valueOf(cmd.getOptionValue(Opt.EXEC_TIME.getName(), Opt.EXEC_TIME.getDefaultValue())); - long holdTime = startTime - Util.getTime(); - if(holdTime > 0) execTime += (int)holdTime; + long holdNanoTime = Util.getElapsedNanoTime(); + if(holdNanoTime > 0) execTime += (int)(holdNanoTime/Constants.MILLISECOND_IN_NANO); try { countDownLatch.await(execTime, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -114,20 +129,17 @@ public Loader(String[] args) { logger.info("Terminating clients."); disconnectClients(); - endTime = Util.getTime(); - - logger.info("Printing results."); - dataCleansing(); + endTime = Util.getCurrentTimeMillis(); - printThroughput(true); - System.out.println(); - printThroughput(false); - printLatency(); + queue.offer(Constants.STOP_SIGNAL); + try { + fileThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } - String thFile = cmd.getOptionValue(Opt.TH_FILE.getName(), Opt.TH_FILE.getDefaultValue()); - String ltFile = cmd.getOptionValue(Opt.LT_FILE.getName(), Opt.LT_FILE.getDefaultValue()); - if(thFile!=null) thToFile(); - if(ltFile!=null) ltToFile(); + logger.info("Calculating results."); + calcResult(); } private void setOptions(String[] args) { @@ -143,7 +155,7 @@ private void setOptions(String[] args) { for(String arg: args){ if(arg.equals("-"+Opt.HELP.getName()) || arg.equals("--"+options.getOption(Opt.HELP.getName()).getLongOpt())){ printHelp(options); - System.exit(0); + exit(0); } } @@ -153,19 +165,72 @@ private void setOptions(String[] args) { } catch (ParseException e) { logger.severe("Failed to parse options."); printHelp(options); - System.exit(1); + exit(1); + } + + // Validate arguments. + int version = Integer.valueOf(cmd.getOptionValue(Opt.MQTT_VERSION.getName(), Opt.MQTT_VERSION.getDefaultValue())); + if(version != 3 && version != 5) { + logger.warning("\"-v\" parameter value must be 3 or 5."); + exit(1); + } + int pubqos = Integer.valueOf(cmd.getOptionValue(Opt.PUB_QOS.getName(), Opt.PUB_QOS.getDefaultValue())); + if(pubqos != 0 && pubqos != 1 && pubqos != 2) { + logger.warning("\"-pq\" parameter value must be 0 or 1 or 2."); + exit(1); + } + int subqos = Integer.valueOf(cmd.getOptionValue(Opt.SUB_QOS.getName(), Opt.SUB_QOS.getDefaultValue())); + if(subqos != 0 && subqos != 1 && subqos != 2) { + logger.warning("\"-sq\" parameter value must be 0 or 1 or 2."); + exit(1); + } + if(Integer.valueOf(cmd.getOptionValue(Opt.PAYLOAD.getName(), Opt.PAYLOAD.getDefaultValue())) < 8) { + logger.warning("\"-d\" parameter value must be equal to or larger than 8."); + exit(1); } } private void printHelp(Options options) { + System.out.println("MQTTLoader version " + Constants.VERSION); HelpFormatter help = new HelpFormatter(); help.setOptionComparator(null); help.printHelp(Loader.class.getName(), options, true); } + private File getFile() { + File file; + try { + URL url = Loader.class.getProtectionDomain().getCodeSource().getLocation(); + file = new File(new URL(url.toString()).toURI()); + if(file.getParentFile().getName().equals("lib")){ + file = file.getParentFile().getParentFile(); + } else { + file = new File("").getAbsoluteFile(); + } + } catch (SecurityException | NullPointerException | URISyntaxException | MalformedURLException e) { + file = new File("").getAbsoluteFile(); + } + String date = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date(System.currentTimeMillis()+getOffsetFromNtpServer())); + file = new File(file, Constants.FILE_NAME_PREFIX+date+".csv"); + + if(file.exists()) { + file.delete(); + } + try { + file.createNewFile(); + } catch (IOException e) { + e.printStackTrace(); + } + + return file; + } + private void prepareClients() { String broker = cmd.getOptionValue(Opt.BROKER.getName(), Opt.BROKER.getDefaultValue()); - int version = Integer.valueOf(cmd.getOptionValue(Opt.VERSION.getName(), Opt.VERSION.getDefaultValue())); + if(!broker.startsWith("tcp://") && !broker.startsWith("ssl://")) { + broker = "tcp://"+broker; + } + int version = Integer.valueOf(cmd.getOptionValue(Opt.MQTT_VERSION.getName(), Opt.MQTT_VERSION.getDefaultValue())); int numPub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_PUB.getName(), Opt.NUM_PUB.getDefaultValue())); int numSub = Integer.valueOf(cmd.getOptionValue(Opt.NUM_SUB.getName(), Opt.NUM_SUB.getDefaultValue())); int pubQos = Integer.valueOf(cmd.getOptionValue(Opt.PUB_QOS.getName(), Opt.PUB_QOS.getDefaultValue())); @@ -178,24 +243,16 @@ private void prepareClients() { int pubInterval = Integer.valueOf(cmd.getOptionValue(Opt.INTERVAL.getName(), Opt.INTERVAL.getDefaultValue())); for(int i=0;i list = pub.getThroughputs(); - if(!list.isEmpty()) { - int first = list.get(0).getSlot(); - int last = list.get(list.size()-1).getSlot(); - if(first < pubFirstSlot) pubFirstSlot = first; - if(last > pubLastSlot) pubLastSlot = last; - } - } + private void calcResult() { + if(!cmd.hasOption(Opt.IN_MEMORY.getName())) { + FileInputStream fis = null; + InputStreamReader isr = null; + BufferedReader br = null; + try{ + fis = new FileInputStream(file); + isr = new InputStreamReader(fis); + br = new BufferedReader(isr); + + String str; + while ((str = br.readLine()) != null) { + StringTokenizer st = new StringTokenizer(str, ","); + long timestamp = Long.valueOf(st.nextToken()); + String clientId = st.nextToken(); //client ID + boolean isSend = st.nextToken().equals("S") ? true : false; + int latency = -1; + if (st.hasMoreTokens()) { + latency = Integer.valueOf(st.nextToken()); + } - for(IClient pub: publishers) { - Iterator itr = pub.getThroughputs().iterator(); - while(itr.hasNext()){ - Throughput th = itr.next(); - if(th.getSlot() < rampup+pubFirstSlot) { - itr.remove(); - }else if(th.getSlot() > pubLastSlot-rampdown){ - itr.remove(); + recorder.recordInMemory(new Record(timestamp, clientId, isSend, latency)); } - } - } - int subFirstSlot = Integer.MAX_VALUE; - int subLastSlot = 0; - for(IClient sub: subscribers){ - ArrayList list = sub.getThroughputs(); - if(!list.isEmpty()) { - int first = list.get(0).getSlot(); - int last = list.get(list.size()-1).getSlot(); - if(first < subFirstSlot) subFirstSlot = first; - if(last > subLastSlot) subLastSlot = last; + br.close(); + isr.close(); + fis.close(); + } catch(IOException e){ + e.printStackTrace(); + } finally { + try { + if(br != null) br.close(); + if(isr != null) isr.close(); + if(fis != null) fis.close(); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } } } - for(IClient sub: subscribers){ - Iterator itrTh = sub.getThroughputs().iterator(); - while(itrTh.hasNext()){ - Throughput th = itrTh.next(); - if(th.getSlot() < rampup+subFirstSlot) { - itrTh.remove(); - }else if(th.getSlot() > subLastSlot-rampdown){ - itrTh.remove(); - } - } + TreeMap sendThroughputs = recorder.getSendThroughputs(); + TreeMap recvThroughputs = recorder.getRecvThroughputs(); + TreeMap latencySums = recorder.getLatencySums(); + TreeMap latencyMaxs = recorder.getLatencyMaxs(); - Iterator itrLt = sub.getLatencies().iterator(); - while(itrLt.hasNext()){ - Latency lt = itrLt.next(); - if(lt.getSlot() < rampup+subFirstSlot) { - itrLt.remove(); - }else if(lt.getSlot() > subLastSlot-rampdown){ - itrLt.remove(); - } + int rampup = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_UP.getName(), Opt.RAMP_UP.getDefaultValue())); + int rampdown = Integer.valueOf(cmd.getOptionValue(Opt.RAMP_DOWN.getName(), Opt.RAMP_DOWN.getDefaultValue())); + + trimTreeMap(sendThroughputs, rampup, rampdown); + trimTreeMap(recvThroughputs, rampup, rampdown); + trimTreeMap(latencySums, rampup, rampdown); + trimTreeMap(latencyMaxs, rampup, rampdown); + + paddingTreeMap(sendThroughputs); + paddingTreeMap(recvThroughputs); + + System.out.println("-----Publisher-----"); + printThroughput(sendThroughputs, true); + System.out.println(); + System.out.println("-----Subscriber-----"); + printThroughput(recvThroughputs, false); + + int maxLt = 0; + double aveLt = 0; + long numMsg = 0; + for(int elapsedSecond: latencySums.keySet()) { + if(latencyMaxs.get(elapsedSecond) > maxLt) { + maxLt = latencyMaxs.get(elapsedSecond); } + int numInSec = recvThroughputs.get(elapsedSecond); + numMsg += numInSec; + double aveInSec = (double)latencySums.get(elapsedSecond)/numInSec; + aveLt = aveLt + ((aveInSec-aveLt)*numInSec)/numMsg; } + + System.out.println("Maximum latency[ms]: "+maxLt); + System.out.println("Average latency[ms]: "+aveLt); } - private void printThroughput(boolean forPub) { - TreeMap thTotal = new TreeMap<>(); - ArrayList clients; - if(forPub){ - clients = publishers; - }else{ - clients = subscribers; + private void trimTreeMap(TreeMap map, int rampup, int rampdown) { + if(map.size() == 0) { + return; + } + int firstTime = map.firstKey(); + int lastTime = map.lastKey(); + Iterator itr = map.keySet().iterator(); + while(itr.hasNext()){ + int time = itr.next(); + if(time < rampup+firstTime) { + itr.remove(); + }else if(time > lastTime-rampdown){ + itr.remove(); + } } + } - for(IClient client: clients){ - ArrayList ths = client.getThroughputs(); - for(Throughput th : ths) { - if(thTotal.containsKey(th.getSlot())){ - thTotal.put(th.getSlot(), thTotal.get(th.getSlot())+th.getCount()); - }else{ - thTotal.put(th.getSlot(), th.getCount()); - } - } + private void paddingTreeMap(TreeMap map) { + if(map.size() == 0) { + return; } - if(thTotal.size()>0){ - for(int i=thTotal.firstKey(); i<=thTotal.lastKey(); i++){ - if(!thTotal.containsKey(i)){ - thTotal.put(i, 0); - } + for(int i=map.firstKey();i throughputs, boolean forPublisher) { int maxTh = 0; int sumMsg = 0; - for(int slot: thTotal.keySet()){ - int th = thTotal.get(slot); + for(int elapsedSecond: throughputs.keySet()){ + int th = throughputs.get(elapsedSecond); if(th > maxTh) { maxTh = th; } sumMsg += th; } - double aveTh = thTotal.size()>0 ? (double)sumMsg/thTotal.size() : 0; - if(forPub){ - System.out.println("-----Publisher-----"); - }else{ - System.out.println("-----Subscriber-----"); - } + double aveTh = throughputs.size()>0 ? (double)sumMsg/throughputs.size() : 0; System.out.println("Maximum throughput[msg/s]: "+maxTh); System.out.println("Average throughput[msg/s]: "+aveTh); - if(forPub){ + if(forPublisher){ System.out.println("Number of published messages: "+sumMsg); }else{ System.out.println("Number of received messages: "+sumMsg); } - System.out.print("Throughput[msg/s]: "); - for(int slot: thTotal.keySet()){ - System.out.print(thTotal.get(slot)); - if(slot maxLt) maxLt = lt; - sumLt += lt; - count++; - } - } - double aveLt = count>0 ? (double)sumLt/count : 0; - - System.out.println("Maximum latency[ms]: "+maxLt); - System.out.println("Average latency[ms]: "+aveLt); - } - - private void thToFile(){ - StringBuilder sb = new StringBuilder(); - - String sTime = sdf.format(new Date(startTime)); - String eTime = sdf.format(new Date(endTime)); - sb.append("Measurement start time: "+sTime+"\n"); - sb.append("Measurement end time: "+eTime+"\n"); - - sb.append("SLOT"); - for(int i=0;i - TreeMap> thAggregate = new TreeMap<>(); - for(int i=0;i pubth = publishers.get(i).getThroughputs(); - for(Throughput th: pubth) { - if(!thAggregate.containsKey(th.getSlot())){ - TreeMap map = new TreeMap<>(); - thAggregate.put(th.getSlot(), map); - } - thAggregate.get(th.getSlot()).put(i, th.getCount()); - } - } - for(int i=0;i subth = subscribers.get(i).getThroughputs(); - for(Throughput th: subth) { - if(!thAggregate.containsKey(th.getSlot())){ - TreeMap map = new TreeMap<>(); - thAggregate.put(th.getSlot(), map); - } - thAggregate.get(th.getSlot()).put(i+publishers.size(), th.getCount()); - } - } - - int numClients = publishers.size()+subscribers.size(); - if(thAggregate.size()>0){ - for(int slot=thAggregate.firstKey();slot0) sb.append(", "); - sb.append(subscribers.get(i).getClientId()); - } - sb.append("\n"); - - int index = 0; - while(true) { - StringBuilder lineSb = new StringBuilder(); - boolean hasNext = false; - for(int i=0;iindex){ - lt = subscribers.get(i).getLatencies().get(index).getLatency(); - hasNext = true; - } - if(i>0) lineSb.append(", "); - lineSb.append(lt); - } - lineSb.append("\n"); - if(hasNext){ - index++; - sb.append(lineSb); - }else{ - break; - } - } - - String ltFile = cmd.getOptionValue(Opt.LT_FILE.getName(), Opt.LT_FILE.getDefaultValue()); - Util.output(ltFile, sb.toString(), false); - } - public static void main(String[] args){ new Loader(args); } diff --git a/src/main/java/mqttloader/LogFormatter.java b/src/main/java/mqttloader/LogFormatter.java index e0e5a1d..64317a4 100644 --- a/src/main/java/mqttloader/LogFormatter.java +++ b/src/main/java/mqttloader/LogFormatter.java @@ -25,11 +25,12 @@ public class LogFormatter extends Formatter { @Override public String format(LogRecord record) { return String.format( - "[%s] %s %s#%s %s%n", +// "[%s] %s %s#%s %s%n", + "[%s] %s %s%n", record.getLevel().getName(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z").format(record.getMillis()), - record.getSourceClassName(), - record.getSourceMethodName(), +// record.getSourceClassName(), +// record.getSourceMethodName(), record.getMessage() ); } diff --git a/src/main/java/mqttloader/Record.java b/src/main/java/mqttloader/Record.java new file mode 100644 index 0000000..8161476 --- /dev/null +++ b/src/main/java/mqttloader/Record.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader; + +public class Record { + private long timestamp; + private String clientId; + private boolean isSend; + private int latency; + + private boolean isStopSignal = false; + + public Record(long timestamp, String clientId, boolean isSend, int latency) { + this.timestamp = timestamp; + this.clientId = clientId; + this.isSend = isSend; + this.latency = latency; + } + + public Record(long timestamp, String clientId, boolean isSend) { + this(timestamp, clientId, isSend, -1); + } + + public Record() { + this.isStopSignal = true; + } + + public long getTimestamp() { + return timestamp; + } + + public String getClientId() { + return clientId; + } + + public boolean isSend() { + return isSend; + } + + public int getLatency() { + return latency; + } + + public boolean isStopSignal() { + return isStopSignal; + } +} diff --git a/src/main/java/mqttloader/Recorder.java b/src/main/java/mqttloader/Recorder.java new file mode 100644 index 0000000..e5f50c0 --- /dev/null +++ b/src/main/java/mqttloader/Recorder.java @@ -0,0 +1,158 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.TreeMap; + +public class Recorder implements Runnable { + private final boolean inMemory; + + private File file; + private FileOutputStream fos = null; + private OutputStreamWriter osw = null; + private BufferedWriter bw = null; + + private TreeMap sendThroughputs = new TreeMap<>(); + private TreeMap recvThroughputs = new TreeMap<>(); + private TreeMap latencySums = new TreeMap<>(); + private TreeMap latencyMaxs = new TreeMap<>(); + + public Recorder(File file, boolean inMemory) { + this.inMemory = inMemory; + if(!inMemory) { + try { + fos = new FileOutputStream(file, true); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + osw = new OutputStreamWriter(fos); + bw = new BufferedWriter(osw); + } + } + + @Override + public void run() { + Record record = null; + while (true) { + try { + record = Loader.queue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if(record != null) { + if(record.isStopSignal()) { + break; + } + + if(inMemory) { + recordInMemory(record); + } else { + StringBuilder sb = new StringBuilder(); + sb.append(record.getTimestamp()); + sb.append(","); + sb.append(record.getClientId()); + if(record.isSend()) { + sb.append(",S,"); + } else { + sb.append(",R,"); + sb.append(record.getLatency()); + } + + try { + bw.write(new String(sb)); + bw.newLine(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + if(!inMemory) { + try{ + bw.flush(); + bw.close(); + osw.close(); + fos.close(); + } catch(IOException e){ + e.printStackTrace(); + } finally { + try { + if(bw != null) bw.close(); + if(osw != null) osw.close(); + if(fos != null) fos.close(); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } + } + } + } + + public void recordInMemory(Record record) { + int elapsedSecond = (int)((record.getTimestamp()-Loader.startTime)/1000); + if(record.isSend()) { + if(sendThroughputs.containsKey(elapsedSecond)) { + sendThroughputs.put(elapsedSecond, sendThroughputs.get(elapsedSecond)+1); + } else { + sendThroughputs.put(elapsedSecond, 1); + } + } else { + if(recvThroughputs.containsKey(elapsedSecond)) { + recvThroughputs.put(elapsedSecond, recvThroughputs.get(elapsedSecond)+1); + } else { + recvThroughputs.put(elapsedSecond, 1); + } + + if(latencySums.containsKey(elapsedSecond)) { + latencySums.put(elapsedSecond, latencySums.get(elapsedSecond)+(long)record.getLatency()); + } else { + latencySums.put(elapsedSecond, (long)record.getLatency()); + } + + if(latencyMaxs.containsKey(elapsedSecond)) { + if(latencyMaxs.get(elapsedSecond) < record.getLatency()) { + latencyMaxs.put(elapsedSecond, record.getLatency()); + } + } else { + latencyMaxs.put(elapsedSecond, record.getLatency()); + } + } + } + + public TreeMap getSendThroughputs() { + return sendThroughputs; + } + + public TreeMap getRecvThroughputs() { + return recvThroughputs; + } + + public TreeMap getLatencySums() { + return latencySums; + } + + public TreeMap getLatencyMaxs() { + return latencyMaxs; + } +} diff --git a/src/main/java/mqttloader/RecvTimeoutTask.java b/src/main/java/mqttloader/RecvTimeoutTask.java index 2817379..74dc064 100644 --- a/src/main/java/mqttloader/RecvTimeoutTask.java +++ b/src/main/java/mqttloader/RecvTimeoutTask.java @@ -26,6 +26,11 @@ public class RecvTimeoutTask extends TimerTask { private Timer timer; private int subTimeout; + /** + * + * @param timer Timer instance for this task. + * @param subTimeout Timeout value in second. + */ public RecvTimeoutTask(Timer timer, int subTimeout) { this.timer = timer; this.subTimeout = subTimeout; @@ -33,9 +38,9 @@ public RecvTimeoutTask(Timer timer, int subTimeout) { @Override public void run() { - long remainingTime = subTimeout*1000 - (Util.getTime() - lastRecvTime); // - + long remainingTime = subTimeout*Constants.SECOND_IN_MILLI - (Util.getCurrentTimeMillis() - lastRecvTime); // - if (remainingTime <= 0) { - Loader.logger.info("Receiving messages on subscribers timed out."); + Loader.logger.info("Subscribers timed out."); countDownLatch.countDown(); } else { timer.schedule(new RecvTimeoutTask(timer, subTimeout), remainingTime); diff --git a/src/main/java/mqttloader/Util.java b/src/main/java/mqttloader/Util.java index 6c485a9..c940181 100644 --- a/src/main/java/mqttloader/Util.java +++ b/src/main/java/mqttloader/Util.java @@ -16,58 +16,34 @@ package mqttloader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; import java.nio.ByteBuffer; +import java.util.Random; public class Util { - public static void output(String filename, String str, boolean append){ - File file = new File(filename); + private static final String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; + private static Random random = new Random(); - if(!file.exists() || file == null){ - try { - file.createNewFile(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - FileOutputStream fos = null; - OutputStreamWriter osw = null; - BufferedWriter bw = null; - try{ - fos = new FileOutputStream(file, append); - osw = new OutputStreamWriter(fos); - bw = new BufferedWriter(osw); - - bw.write(str); - - bw.close(); - osw.close(); - fos.close(); - } catch(IOException e){ - e.printStackTrace(); - } finally { - try { - if(bw != null) bw.close(); - if(osw != null) osw.close(); - if(fos != null) fos.close(); - } catch (IOException e) { - e.printStackTrace(); - System.exit(1); - } + public static String genRandomChars(int length) { + StringBuilder sb = new StringBuilder(); + for(int i=0;i getThroughputs(); - ArrayList getLatencies(); + public abstract void disconnect(); + + public String getClientId() { + return clientId; + } } diff --git a/src/main/java/mqttloader/client/AbstractPublisher.java b/src/main/java/mqttloader/client/AbstractPublisher.java new file mode 100644 index 0000000..6c29077 --- /dev/null +++ b/src/main/java/mqttloader/client/AbstractPublisher.java @@ -0,0 +1,121 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader.client; + +import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import mqttloader.Loader; +import mqttloader.Record; +import mqttloader.Util; + +public abstract class AbstractPublisher extends AbstractClient implements Runnable { + protected final String topic; + protected final int payloadSize; + protected int numMessage; + protected final int pubInterval; + + protected ScheduledExecutorService service; + protected ScheduledFuture future; + + protected volatile boolean cancelled = false; + + public AbstractPublisher(int clientNumber, String topic, int payloadSize, int numMessage, int pubInterval) { + super(PUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber)); + this.topic = topic; + this.payloadSize = payloadSize; + this.numMessage = numMessage; + this.pubInterval = pubInterval; + } + + public void start(long delay) { + service = Executors.newSingleThreadScheduledExecutor(); + if(pubInterval==0){ + future = service.schedule(this, delay, TimeUnit.MILLISECONDS); + }else{ + future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS); + } + } + + @Override + public void run() { + if(pubInterval==0){ + continuousRun(); + }else{ + periodicalRun(); + } + } + + private void continuousRun() { + for(int i=0;i 0) { + if(isConnected()) { + publish(); + } else { + Loader.logger.warning("Failed to publish (" + clientId + ")."); + } + + numMessage--; + if(numMessage==0){ + Loader.logger.info("Completed to publish (" + clientId + ")."); + Loader.countDownLatch.countDown(); + } + } + } + + protected void recordSend(long currentTime) { + Loader.queue.offer(new Record(currentTime, clientId, true)); + Loader.logger.fine("Published a message to topic \"" + topic + "\" (" + clientId + ")."); + } + + protected void terminateTasks() { + if(!future.isDone()) { + cancelled = true; + future.cancel(false); + } + + service.shutdown(); + try { + service.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + protected abstract void publish(); + protected abstract boolean isConnected(); +} diff --git a/src/main/java/mqttloader/client/AbstractSubscriber.java b/src/main/java/mqttloader/client/AbstractSubscriber.java new file mode 100644 index 0000000..2f7a3cd --- /dev/null +++ b/src/main/java/mqttloader/client/AbstractSubscriber.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader.client; + +import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; + +import java.nio.ByteBuffer; + +import mqttloader.Loader; +import mqttloader.Record; +import mqttloader.Util; + +public abstract class AbstractSubscriber extends AbstractClient { + public AbstractSubscriber(int clientNumber) { + super(SUB_CLIENT_ID_PREFIX + String.format("%05d", clientNumber)); + } + + protected void recordReceive(String topic, byte[] payload) { + // Skip if preparation has not been completed yet. + // Time calculation methods in Util class, such as Util.getCurrentTimeMillis(), need startTime and startNanoTime have already been set. + if(Loader.startTime==0 || Loader.startNanoTime==0) { + return; + } + + long currentTime = Util.getCurrentTimeMillis(); + long pubTime = ByteBuffer.wrap(payload).getLong(); + + int latency = (int)(currentTime - pubTime); + if (latency < 0) { + // If running MQTTLoader on multiple machines, a slight time error may cause a negative value of latency. + latency = 0; + Loader.logger.fine("Negative value of latency is converted to zero."); + } + + Loader.queue.offer(new Record(currentTime, clientId, false, latency)); + Loader.lastRecvTime = currentTime; + Loader.logger.fine("Received a message on topic \"" + topic + "\" (" + clientId + ")."); + } +} diff --git a/src/main/java/mqttloader/client/Publisher.java b/src/main/java/mqttloader/client/Publisher.java deleted file mode 100644 index e4bc578..0000000 --- a/src/main/java/mqttloader/client/Publisher.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright 2020 Distributed Systems Group - * - *

Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package mqttloader.client; - -import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; - -import java.util.ArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import mqttloader.Loader; -import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; -import org.eclipse.paho.mqttv5.client.MqttClient; -import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; -import org.eclipse.paho.mqttv5.common.MqttException; -import org.eclipse.paho.mqttv5.common.MqttMessage; - -public class Publisher implements Runnable, IClient { - private MqttClient client; - private final String clientId; - private final String topic; - private final int payloadSize; - private int numMessage; - private final int pubInterval; - private MqttMessage message = new MqttMessage(); - - // If change publisher to be multi-threaded, throughputs (and others) should be thread-safe. - private ArrayList throughputs = new ArrayList<>(); - - private ScheduledExecutorService service; - private ScheduledFuture future; - - private volatile boolean cancelled = false; - - public Publisher(int clientNumber, String broker, int qos, boolean retain, String topic, int payloadSize, int numMessage, int pubInterval) { - message.setQos(qos); - message.setRetained(retain); - this.topic = topic; - this.payloadSize = payloadSize; - this.numMessage = numMessage; - this.pubInterval = pubInterval; - - clientId = PUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); - MqttConnectionOptions options = new MqttConnectionOptions(); - try { - client = new MqttClient(broker, clientId); - client.connect(options); - Loader.logger.info("Publisher client is connected: "+clientId); - } catch (MqttException e) { - Loader.logger.warning("Publisher client fails to connect: "+clientId); - e.printStackTrace(); - } - } - - @Override - public void start(long delay) { - service = Executors.newSingleThreadScheduledExecutor(); - if(pubInterval==0){ - future = service.schedule(this, delay, TimeUnit.MILLISECONDS); - }else{ - future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS); - } - } - - @Override - public void run() { - if(pubInterval==0){ - continuousRun(); - }else{ - periodicalRun(); - } - } - - public void continuousRun() { - for(int i=0;i 0) { - if(client.isConnected()) { - publish(); - } else { - Loader.logger.warning("On sending publish, client was not connected: "+clientId); - } - - numMessage--; - if(numMessage==0){ - Loader.logger.info("Publisher finishes to send publish: "+clientId); - Loader.countDownLatch.countDown(); - } - } - } - - public void publish() { - message.setPayload(Util.genPayloads(payloadSize)); - try{ - client.publish(topic, message); - } catch(MqttException me) { - Loader.logger.warning("On sending publish, MqttException occurred: "+clientId); - me.printStackTrace(); - } - - int slot = (int)((Util.getTime()-Loader.startTime)/1000); - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } - - Loader.logger.fine("Published a message (" + topic + "): "+clientId); - } - - @Override - public void disconnect() { - if(!future.isDone()) { - cancelled = true; - future.cancel(false); - } - - service.shutdown(); - try { - service.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - if (client.isConnected()) { - try { - client.disconnect(); - Loader.logger.info("Publisher client is disconnected: "+clientId); - } catch (MqttException e) { - e.printStackTrace(); - } - } - } - - @Override - public String getClientId() { - return clientId; - } - - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies(){ - return null; - } -} diff --git a/src/main/java/mqttloader/client/PublisherV3.java b/src/main/java/mqttloader/client/PublisherV3.java index bcd821c..79cafcf 100644 --- a/src/main/java/mqttloader/client/PublisherV3.java +++ b/src/main/java/mqttloader/client/PublisherV3.java @@ -16,173 +16,66 @@ package mqttloader.client; -import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX; - -import java.util.ArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - import mqttloader.Loader; import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -public class PublisherV3 implements Runnable, IClient { +public class PublisherV3 extends AbstractPublisher { private MqttClient client; - private final String clientId; - private final String topic; - private final int payloadSize; - private int numMessage; - private final int pubInterval; private MqttMessage message = new MqttMessage(); - // If change publisher to be multi-threaded, throughputs (and others) should be thread-safe. - private ArrayList throughputs = new ArrayList<>(); - - private ScheduledExecutorService service; - private ScheduledFuture future; - - private volatile boolean cancelled = false; - public PublisherV3(int clientNumber, String broker, int qos, boolean retain, String topic, int payloadSize, int numMessage, int pubInterval) { + super(clientNumber, topic, payloadSize, numMessage, pubInterval); message.setQos(qos); message.setRetained(retain); - this.topic = topic; - this.payloadSize = payloadSize; - this.numMessage = numMessage; - this.pubInterval = pubInterval; - clientId = PUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); + options.setCleanSession(true); try { - client = new MqttClient(broker, clientId); + client = new MqttClient(broker, clientId, new MemoryPersistence()); client.connect(options); - Loader.logger.info("Publisher client is connected: "+clientId); + Loader.logger.info("Publisher " + clientId + " connected."); } catch (MqttException e) { - Loader.logger.warning("Publisher client fails to connect: "+clientId); + Loader.logger.warning("Publisher failed to connect (" + clientId + ")."); e.printStackTrace(); + System.exit(1); } } @Override - public void start(long delay) { - service = Executors.newSingleThreadScheduledExecutor(); - if(pubInterval==0){ - future = service.schedule(this, delay, TimeUnit.MILLISECONDS); - }else{ - future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS); - } - } - - @Override - public void run() { - if(pubInterval==0){ - continuousRun(); - }else{ - periodicalRun(); - } - } - - public void continuousRun() { - for(int i=0;i 0) { - if (client.isConnected()) { - publish(); - } else { - Loader.logger.warning("On sending publish, client was not connected: "+clientId); - } - - numMessage--; - if(numMessage==0){ - Loader.logger.info("Publisher finishes to send publish: "+clientId); - Loader.countDownLatch.countDown(); - } - } - } - - public void publish() { - message.setPayload(Util.genPayloads(payloadSize)); + protected void publish() { + long currentTime = Util.getCurrentTimeMillis(); + message.setPayload(Util.genPayloads(payloadSize, currentTime)); try { client.publish(topic, message); - } catch (MqttException e) { - Loader.logger.warning("On sending publish, MqttException occurred: "+clientId); - e.printStackTrace(); + } catch (MqttException me) { + me.printStackTrace(); } - int slot = (int)((Util.getTime()-Loader.startTime)/1000); - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } + recordSend(currentTime); + } - Loader.logger.fine("Published a message (" + topic + "): "+clientId); + @Override + protected boolean isConnected() { + return client.isConnected(); } @Override public void disconnect() { - if(!future.isDone()) { - cancelled = true; - future.cancel(false); - } - - service.shutdown(); - try { - service.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } + terminateTasks(); if (client.isConnected()) { try { client.disconnect(); - Loader.logger.info("Publisher client is disconnected: "+clientId); + Loader.logger.info("Publisher " + clientId + " disconnected."); } catch (MqttException e) { e.printStackTrace(); } } } - - @Override - public String getClientId() { - return clientId; - } - - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies(){ - return null; - } } diff --git a/src/main/java/mqttloader/client/PublisherV5.java b/src/main/java/mqttloader/client/PublisherV5.java new file mode 100644 index 0000000..0ef05b4 --- /dev/null +++ b/src/main/java/mqttloader/client/PublisherV5.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Distributed Systems Group + * + *

Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mqttloader.client; + +import mqttloader.Loader; +import mqttloader.Util; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; + +public class PublisherV5 extends AbstractPublisher { + private MqttClient client; + private MqttMessage message = new MqttMessage(); + + public PublisherV5(int clientNumber, String broker, int qos, boolean retain, String topic, int payloadSize, int numMessage, int pubInterval) { + super(clientNumber, topic, payloadSize, numMessage, pubInterval); + message.setQos(qos); + message.setRetained(retain); + + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setCleanStart(true); + try { + client = new MqttClient(broker, clientId, new MemoryPersistence()); + client.connect(options); + Loader.logger.info("Publisher " + clientId + " connected."); + } catch (MqttException e) { + Loader.logger.warning("Publisher failed to connect (" + clientId + ")."); + e.printStackTrace(); + System.exit(1); + } + } + + @Override + protected void publish() { + long currentTime = Util.getCurrentTimeMillis(); + message.setPayload(Util.genPayloads(payloadSize, currentTime)); + try { + client.publish(topic, message); + } catch (MqttException me) { + me.printStackTrace(); + } + + recordSend(currentTime); + } + + @Override + protected boolean isConnected() { + return client.isConnected(); + } + + @Override + public void disconnect() { + terminateTasks(); + + if (client.isConnected()) { + try { + client.disconnect(); + Loader.logger.info("Publisher " + clientId + " disconnected."); + } catch (MqttException e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/main/java/mqttloader/client/SubscriberV3.java b/src/main/java/mqttloader/client/SubscriberV3.java index 59d2fff..62f9f4e 100644 --- a/src/main/java/mqttloader/client/SubscriberV3.java +++ b/src/main/java/mqttloader/client/SubscriberV3.java @@ -16,106 +16,56 @@ package mqttloader.client; -import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; - -import java.nio.ByteBuffer; -import java.util.ArrayList; - import mqttloader.Loader; -import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -public class SubscriberV3 implements MqttCallback, IClient { +public class SubscriberV3 extends AbstractSubscriber implements MqttCallback { private MqttClient client; - private final String clientId; - - private ArrayList throughputs = new ArrayList<>(); - private ArrayList latencies = new ArrayList<>(); public SubscriberV3(int clientNumber, String broker, int qos, String topic) { - clientId = SUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); + super(clientNumber); MqttConnectOptions options = new MqttConnectOptions(); options.setMqttVersion(4); + options.setCleanSession(true); try { - client = new MqttClient(broker, clientId); + client = new MqttClient(broker, clientId, new MemoryPersistence()); client.setCallback(this); client.connect(options); - Loader.logger.info("Subscriber client is connected: "+clientId); + Loader.logger.info("Subscriber " + clientId + " connected."); client.subscribe(topic, qos); - Loader.logger.info("Subscribed (" + topic + ", QoS:" + qos + "): " + clientId); + Loader.logger.info("Subscribed to topic \"" + topic + "\" with QoS " + qos + " (" + clientId + ")."); } catch (MqttException e) { - Loader.logger.warning("Subscriber client fails to connect: "+clientId); + Loader.logger.warning("Subscriber failed to connect (" + clientId + ")."); e.printStackTrace(); + System.exit(1); } } - @Override - public void start(long delay){ - } - @Override public void disconnect() { if (client.isConnected()) { try { client.disconnect(); - Loader.logger.info("Subscriber client is disconnected: "+clientId); + Loader.logger.info("Subscriber " + clientId + " disconnected."); } catch (MqttException e) { e.printStackTrace(); } } } - @Override - public String getClientId() { - return clientId; - } - - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies() { - return latencies; - } - @Override public void connectionLost(Throwable cause) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - long time = Util.getTime(); - int slot = (int)((time-Loader.startTime)/1000); - synchronized (throughputs) { - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } - } - - long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); - synchronized (latencies) { - latencies.add(new Latency(slot, (int)(time-pubTime))); - } - - Loader.lastRecvTime = time; - - Loader.logger.fine("Received a message (" + topic + "): "+clientId); + recordReceive(topic, message.getPayload()); } @Override diff --git a/src/main/java/mqttloader/client/Subscriber.java b/src/main/java/mqttloader/client/SubscriberV5.java similarity index 50% rename from src/main/java/mqttloader/client/Subscriber.java rename to src/main/java/mqttloader/client/SubscriberV5.java index 62b202e..176bb51 100644 --- a/src/main/java/mqttloader/client/Subscriber.java +++ b/src/main/java/mqttloader/client/SubscriberV5.java @@ -16,39 +16,29 @@ package mqttloader.client; -import static mqttloader.Constants.SUB_CLIENT_ID_PREFIX; - -import java.nio.ByteBuffer; -import java.util.ArrayList; - import mqttloader.Loader; -import mqttloader.Util; -import mqttloader.record.Latency; -import mqttloader.record.Throughput; import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttCallback; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; -public class Subscriber implements MqttCallback, IClient { +public class SubscriberV5 extends AbstractSubscriber implements MqttCallback { private MqttClient client; - private final String clientId; - - private ArrayList throughputs = new ArrayList<>(); - private ArrayList latencies = new ArrayList<>(); - public Subscriber(int clientNumber, String broker, int qos, boolean shSub, String topic) { - clientId = SUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber); + public SubscriberV5(int clientNumber, String broker, int qos, boolean shSub, String topic) { + super(clientNumber); MqttConnectionOptions options = new MqttConnectionOptions(); + options.setCleanStart(true); try { - client = new MqttClient(broker, clientId); + client = new MqttClient(broker, clientId, new MemoryPersistence()); client.setCallback(this); client.connect(options); - Loader.logger.info("Subscriber client is connected: "+clientId); + Loader.logger.info("Subscriber " + clientId + " connected."); String t; if(shSub){ t = "$share/mqttload/"+topic; @@ -56,44 +46,26 @@ public Subscriber(int clientNumber, String broker, int qos, boolean shSub, Strin t = topic; } client.subscribe(t, qos); - Loader.logger.info("Subscribed (" + t + ", QoS:" + qos + "): " + clientId); + Loader.logger.info("Subscribed to topic \"" + t + "\" with QoS " + qos + " (" + clientId + ")."); } catch (MqttException e) { - Loader.logger.warning("Subscriber client fails to connect: "+clientId); + Loader.logger.warning("Subscriber failed to connect (" + clientId + ")."); e.printStackTrace(); + System.exit(1); } } - @Override - public void start(long delay){ - } - @Override public void disconnect() { if (client.isConnected()) { try { client.disconnect(); - Loader.logger.info("Subscriber client is disconnected: "+clientId); + Loader.logger.info("Subscriber " + clientId + " disconnected."); } catch (MqttException e) { e.printStackTrace(); } } } - @Override - public String getClientId() { - return clientId; - } - - @Override - public ArrayList getThroughputs() { - return throughputs; - } - - @Override - public ArrayList getLatencies() { - return latencies; - } - @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {} @@ -102,29 +74,7 @@ public void mqttErrorOccurred(MqttException exception) {} @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - long time = Util.getTime(); - int slot = (int)((time-Loader.startTime)/1000); - synchronized (throughputs) { - if(throughputs.size()>0){ - Throughput lastTh = throughputs.get(throughputs.size()-1); - if(lastTh.getSlot() == slot) { - lastTh.setCount(lastTh.getCount()+1); - }else{ - throughputs.add(new Throughput(slot, 1)); - } - }else{ - throughputs.add(new Throughput(slot, 1)); - } - } - - long pubTime = ByteBuffer.wrap(message.getPayload()).getLong(); - synchronized (latencies) { - latencies.add(new Latency(slot, (int)(time-pubTime))); - } - - Loader.lastRecvTime = time; - - Loader.logger.fine("Received a message (" + topic + "): "+clientId); + recordReceive(topic, message.getPayload()); } @Override diff --git a/src/main/java/mqttloader/record/Latency.java b/src/main/java/mqttloader/record/Latency.java deleted file mode 100644 index 00bf181..0000000 --- a/src/main/java/mqttloader/record/Latency.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2020 Distributed Systems Group - * - *

Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package mqttloader.record; - -public class Latency { - private final int slot; - private final int latency; - - public Latency(int slot, int latency) { - this.slot = slot; - this.latency = latency; - } - - public int getSlot() { - return slot; - } - - public int getLatency() { - return latency; - } -} diff --git a/src/main/java/mqttloader/record/Throughput.java b/src/main/java/mqttloader/record/Throughput.java deleted file mode 100644 index f6af83b..0000000 --- a/src/main/java/mqttloader/record/Throughput.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2020 Distributed Systems Group - * - *

Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package mqttloader.record; - -public class Throughput { - private final int slot; - private int count; - - public Throughput(int slot, int count) { - this.slot = slot; - this.count = count; - } - - public int getSlot() { - return slot; - } - - public int getCount() { - return count; - } - - public void setCount(int count) { - this.count = count; - } -}