Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Move out concrete working implementation and add working modes #307

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private BlockStreamSimulator() {}
public static void main(String[] args)
throws IOException, InterruptedException, BlockSimulatorParsingException {

LOGGER.log(INFO, "Starting Block Stream Simulator");
LOGGER.log(INFO, "Starting Block Stream Simulator!");

ConfigurationBuilder configurationBuilder =
ConfigurationBuilder.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package com.hedera.block.simulator;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.hedera.block.simulator.config.types.SimulatorMode;
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import com.hedera.block.simulator.generator.BlockStreamManager;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.hapi.block.stream.Block;
import com.hedera.block.simulator.mode.CombinedModeHandler;
import com.hedera.block.simulator.mode.ConsumerModeHandler;
import com.hedera.block.simulator.mode.PublisherModeHandler;
import com.hedera.block.simulator.mode.SimulatorModeHandler;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
Expand All @@ -37,10 +40,7 @@ public class BlockStreamSimulatorApp {
private final BlockStreamManager blockStreamManager;
private final PublishStreamGrpcClient publishStreamGrpcClient;
private final BlockStreamConfig blockStreamConfig;
private final StreamingMode streamingMode;

private final int delayBetweenBlockItems;
private final int millisecondsPerBlock;
private final SimulatorModeHandler simulatorModeHandler;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

/**
Expand All @@ -60,9 +60,15 @@ public BlockStreamSimulatorApp(

blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class);

streamingMode = blockStreamConfig.streamingMode();
millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock();
delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems();
SimulatorMode simulatorMode = blockStreamConfig.simulatorMode();
if (simulatorMode == SimulatorMode.PUBLISHER) {
simulatorModeHandler =
new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient);
} else if (simulatorMode == SimulatorMode.CONSUMER) {
simulatorModeHandler = new ConsumerModeHandler(blockStreamConfig);
} else {
simulatorModeHandler = new CombinedModeHandler(blockStreamConfig);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might leverage a switch statement here like:

switch (simulatorMode) {
            case PUBLISHER:
                // code
                break;
            case CONSUMER:
                // code
                break;
            default:
                // code
                break;
        }

}

/**
Expand All @@ -73,77 +79,13 @@ public BlockStreamSimulatorApp(
* @throws IOException if an I/O error occurs
*/
public void start() throws InterruptedException, BlockSimulatorParsingException, IOException {

LOGGER.log(
System.Logger.Level.INFO,
"Block Stream Simulator started initializing components...");
publishStreamGrpcClient.init();
isRunning.set(true);
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started");

if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) {
millisPerBlockStreaming();
} else {
constantRateStreaming();
}

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void millisPerBlockStreaming()
throws IOException, InterruptedException, BlockSimulatorParsingException {

final long secondsPerBlockNanos = millisecondsPerBlock * 1_000_000L;

Block nextBlock = blockStreamManager.getNextBlock();
while (nextBlock != null) {
long startTime = System.nanoTime();
publishStreamGrpcClient.streamBlock(nextBlock);
long elapsedTime = System.nanoTime() - startTime;
long timeToDelay = secondsPerBlockNanos - elapsedTime;
if (timeToDelay > 0) {
Thread.sleep(timeToDelay / 1_000_000, (int) (timeToDelay % 1_000_000));
} else {
LOGGER.log(
System.Logger.Level.WARNING,
"Block Server is running behind. Streaming took: "
+ (elapsedTime / 1_000_000)
+ "ms - Longer than max expected of: "
+ millisecondsPerBlock
+ " milliseconds");
}
nextBlock = blockStreamManager.getNextBlock();
}
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void constantRateStreaming()
throws InterruptedException, IOException, BlockSimulatorParsingException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;
boolean streamBlockItem = true;
int blockItemsStreamed = 0;

while (streamBlockItem) {
// get block
Block block = blockStreamManager.getNextBlock();

if (block == null) {
LOGGER.log(
System.Logger.Level.INFO,
"Block Stream Simulator has reached the end of the block items");
break;
}

publishStreamGrpcClient.streamBlock(block);
blockItemsStreamed += block.items().size();

Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems);

if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) {
LOGGER.log(
System.Logger.Level.INFO,
"Block Stream Simulator has reached the maximum number of block items to"
+ " stream");
streamBlockItem = false;
}
}
simulatorModeHandler.start(this.blockStreamManager);
}

/**
Expand All @@ -155,8 +97,9 @@ public boolean isRunning() {
return isRunning.get();
}

/** Stops the block stream simulator. */
/** Stops the Block Stream Simulator and closes off all grpc channels. */
public void stop() {
publishStreamGrpcClient.shutdown();
isRunning.set(false);
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package com.hedera.block.simulator.config.data;

import com.hedera.block.simulator.config.types.SimulatorMode;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;

/**
* Defines the configuration data for the block stream in the Hedera Block Simulator.
*
* @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both
* @param delayBetweenBlockItems the delay in microseconds between streaming each block item
* @param maxBlockItemsToStream the maximum number of block items to stream before stopping
* @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based)
Expand All @@ -31,6 +33,7 @@
*/
@ConfigData("blockStream")
public record BlockStreamConfig(
@ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
Expand All @@ -50,6 +53,7 @@ public static Builder builder() {
* A builder for creating instances of {@link BlockStreamConfig}.
*/
public static class Builder {
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER;
private int delayBetweenBlockItems = 1_500_000;
private int maxBlockItemsToStream = 10_000;
private StreamingMode streamingMode = StreamingMode.MILLIS_PER_BLOCK;
Expand All @@ -63,6 +67,17 @@ public Builder() {
// Default constructor
}

/**
* Sets the simulator mode for the block stream.
*
* @param simulatorMode the {@link SimulatorMode} to use
* @return this {@code Builder} instance
*/
public Builder simulatorMode(SimulatorMode simulatorMode) {
this.simulatorMode = simulatorMode;
return this;
}

/**
* Sets the delay between streaming each block item.
*
Expand Down Expand Up @@ -125,6 +140,7 @@ public Builder blockItemsBatchSize(int blockItemsBatchSize) {
*/
public BlockStreamConfig build() {
return new BlockStreamConfig(
simulatorMode,
delayBetweenBlockItems,
maxBlockItemsToStream,
streamingMode,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.config.types;

/** The SimulatorMode enum defines the work modes of the block stream simulator. */
public enum SimulatorMode {
/**
* Indicates a work mode in which the simulator is working as both consumer and publisher.
*/
BOTH,
/**
* Indicates a work mode in which the simulator is working in consumer mode.
*/
CONSUMER,
/**
* Indicates a work mode in which the simulator is working in publisher mode.
*/
PUBLISHER
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
* The PublishStreamGrpcClient interface provides the methods to stream the block and block item.
*/
public interface PublishStreamGrpcClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new init() and shutdown() methods 👍

/**
* Initialize, opens a gRPC channel and creates the needed stubs with the passed configuration.
*/
void init();

/**
* Streams the block item.
*
Expand All @@ -39,4 +44,9 @@ public interface PublishStreamGrpcClient {
* @return true if the block is streamed successfully, false otherwise
*/
boolean streamBlock(Block block);

/**
* Shutdowns the channel.
*/
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
*/
public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {

private final BlockStreamServiceGrpc.BlockStreamServiceStub stub;
private final StreamObserver<PublishStreamRequest> requestStreamObserver;
private BlockStreamServiceGrpc.BlockStreamServiceStub stub;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can move this declaration of stub to the init() method (doesn't need to be a member variable).

private StreamObserver<PublishStreamRequest> requestStreamObserver;
private final BlockStreamConfig blockStreamConfig;
private final GrpcConfig grpcConfig;
private ManagedChannel channel;

/**
* Creates a new PublishStreamGrpcClientImpl instance.
Expand All @@ -50,14 +52,22 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
@Inject
public PublishStreamGrpcClientImpl(
@NonNull GrpcConfig grpcConfig, @NonNull BlockStreamConfig blockStreamConfig) {
ManagedChannel channel =
this.grpcConfig = grpcConfig;
this.blockStreamConfig = blockStreamConfig;
}

/**
* Initialize the channel and stub for publishBlockStream with the desired configuration.
*/
@Override
public void init() {
channel =
ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port())
.usePlaintext()
.build();
stub = BlockStreamServiceGrpc.newStub(channel);
PublishStreamObserver publishStreamObserver = new PublishStreamObserver();
requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
this.blockStreamConfig = blockStreamConfig;
}

/**
Expand Down Expand Up @@ -99,4 +109,9 @@ public boolean streamBlock(Block block) {

return true;
}

@Override
public void shutdown() {
channel.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.mode;

import static java.util.Objects.requireNonNull;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.generator.BlockStreamManager;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
* The {@code CombinedModeHandler} class implements the {@link SimulatorModeHandler} interface
* and provides the behavior for a mode where both consuming and publishing of block data
* occur simultaneously.
*
* <p>This mode handles dual operations in the block streaming process, utilizing the
* {@link BlockStreamConfig} for configuration settings. It is designed for scenarios where
* the simulator needs to handle both the consumption and publication of blocks in parallel.
*
* <p>For now, the actual start behavior is not implemented, as indicated by the
* {@link UnsupportedOperationException}.
*/
public class CombinedModeHandler implements SimulatorModeHandler {
private final BlockStreamConfig blockStreamConfig;

/**
* Constructs a new {@code CombinedModeHandler} with the specified block stream configuration.
*
* @param blockStreamConfig the configuration data for managing block streams
*/
public CombinedModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) {
requireNonNull(blockStreamConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does @NonNull in the parameter section give you the same safeguards as requireNonNull()? You might be able to remove that line ^^

this.blockStreamConfig = blockStreamConfig;
}

/**
* Starts the simulator in combined mode, handling both consumption and publication
* of block stream. However, this method is currently not implemented, and will throw
* an {@link UnsupportedOperationException}.
*
* @param blockStreamManager the {@link BlockStreamManager} responsible for managing block streams
* @throws UnsupportedOperationException as the method is not yet implemented
*/
@Override
public void start(@NonNull BlockStreamManager blockStreamManager) {
throw new UnsupportedOperationException();
}
}
Loading
Loading