Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added MILLIS_PER_BLOCK Streaming mode to the simulator #248

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions simulator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ There are 2 configuration sets:
### BlockStreamConfig
Uses the prefix `blockStream` so all properties should start with `blockStream.`

| Key | Description | Default Value |
|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` |
| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` |
| `delayBetweenBlockItems` | The delay between each block item in nanoseconds | `1_500_000` |
| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` |
| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` |
| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 |
| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` |
| Key | Description | Default Value |
|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` |
| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` |
| `delayBetweenBlockItems` | The delay between each block item in nanoseconds, only applicable when streamingMode=CONSTANT_RATE | `1_500_000` |
| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` |
| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` |
| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 |
| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` |
| `streamingMode` | can either be `CONSTANT_RATE` or `MILLIS_PER_BLOCK`, if `CONSTANT_RATE` | `CONSTANT_RATE` |
| `millisecondsPerBlock` | if streamingMode is `MILLIS_PER_BLOCK` this will be the time to wait between blocks in milliseconds | `1_000` |

### GrpcConfig
Uses the prefix `grpc` so all properties should start with `grpc.`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package com.hedera.block.simulator;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import com.hedera.block.simulator.generator.BlockStreamManager;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -33,13 +35,13 @@ public class BlockStreamSimulatorApp {
private static final System.Logger LOGGER =
System.getLogger(BlockStreamSimulatorApp.class.getName());

Configuration configuration;
BlockStreamManager blockStreamManager;
PublishStreamGrpcClient publishStreamGrpcClient;
BlockStreamConfig blockStreamConfig;
private final BlockStreamManager blockStreamManager;
private final PublishStreamGrpcClient publishStreamGrpcClient;
private final BlockStreamConfig blockStreamConfig;
private final StreamingMode streamingMode;

private final int delayBetweenBlockItems;

private final int millisecondsPerBlock;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

/**
Expand All @@ -54,12 +56,13 @@ public BlockStreamSimulatorApp(
@NonNull Configuration configuration,
@NonNull BlockStreamManager blockStreamManager,
@NonNull PublishStreamGrpcClient publishStreamGrpcClient) {
this.configuration = configuration;
this.blockStreamManager = blockStreamManager;
this.publishStreamGrpcClient = publishStreamGrpcClient;

blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class);

streamingMode = blockStreamConfig.streamingMode();
millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock();
delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems();
}

Expand All @@ -71,12 +74,48 @@ public BlockStreamSimulatorApp(
* @throws IOException if an I/O error occurs
*/
public void start() throws InterruptedException, BlockSimulatorParsingException, IOException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;

isRunning.set(true);
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started");

if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) {
millisPerBlockStreaming();
} else {
constantRateStreaming();
}

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void millisPerBlockStreaming()
throws IOException, InterruptedException, BlockSimulatorParsingException {

final long secondsPerBlockNanos = millisecondsPerBlock * 1_000_000L;

Block nextBlock = blockStreamManager.getNextBlock();
while (nextBlock != null) {
long startTime = System.nanoTime();
publishStreamGrpcClient.streamBlock(nextBlock);
long elapsedTime = System.nanoTime() - startTime;
long timeToDelay = secondsPerBlockNanos - elapsedTime;
if (timeToDelay > 0) {
Thread.sleep(timeToDelay / 1_000_000, (int) (timeToDelay % 1_000_000));
} else {
LOGGER.log(
System.Logger.Level.WARNING,
"Block Server is running behind, Streaming took longer than max expected: "
+ millisecondsPerBlock
+ " milliseconds");
}
nextBlock = blockStreamManager.getNextBlock();
}
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void constantRateStreaming()
throws InterruptedException, IOException, BlockSimulatorParsingException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;
boolean streamBlockItem = true;
int blockItemsStreamed = 0;

Expand Down Expand Up @@ -104,8 +143,6 @@ public void start() throws InterruptedException, BlockSimulatorParsingException,
streamBlockItem = false;
}
}

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package com.hedera.block.simulator;

/** The Constants class defines the constants for the block simulator. */
public class Constants {

// The file extension for block files.
/** Constructor to prevent instantiation. this is only a utility class */
private Constants() {}

/** The file extension for block files. */
public static final String RECORD_EXTENSION = "blk";
// postfix for gzipped files

/** postfix for gzip files */
public static final String GZ_EXTENSION = ".gz";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.block.simulator.config.data;

import com.hedera.block.simulator.config.types.GenerationMode;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import java.nio.file.Files;
Expand All @@ -33,6 +34,8 @@
* @param maxBlockItemsToStream the maximum number of block items to stream
* @param paddedLength the padded length of 0 the block file format
* @param fileExtension the file extension of the block file format
* @param streamingMode the mode of streaming for the block stream
* @param millisecondsPerBlock the milliseconds per block
*/
@ConfigData("blockStream")
public record BlockStreamConfig(
Expand All @@ -43,7 +46,9 @@ public record BlockStreamConfig(
String managerImplementation,
@ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "36") int paddedLength,
@ConfigProperty(defaultValue = ".blk.gz") String fileExtension) {
@ConfigProperty(defaultValue = ".blk.gz") String fileExtension,
@ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
@ConfigProperty(defaultValue = "1000") int millisecondsPerBlock) {

/**
* Constructor to set the default root path if not provided, it will be set to the data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.config.types;

/** The StreamingMode enum defines the different modes for streaming blocks. */
public enum StreamingMode {

/** It will wait X Nanos between each block. */
CONSTANT_RATE,

/** It will attempt to send a block each X Millis. */
MILLIS_PER_BLOCK;

/**
* Converts a string to a StreamingMode.
*
* @param mode the string to convert
* @return the StreamingMode
*/
public static StreamingMode fromString(String mode) {
return switch (mode) {
case "CONSTANT_RATE" -> CONSTANT_RATE;
case "MILLIS_PER_BLOCK" -> MILLIS_PER_BLOCK;
default -> throw new IllegalArgumentException("Invalid mode: " + mode);
};
}
}
Loading
Loading