Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Arch changes to address performance bottlenecks #171

Merged
merged 63 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
ae74a4b
fix: initial commit
mattp-swirldslabs Sep 4, 2024
97608c9
wip: dep cycle - broken
mattp-swirldslabs Sep 5, 2024
43f56fc
wip: added new interface, added builders
mattp-swirldslabs Sep 5, 2024
86cd8b8
fix: tests compiling
mattp-swirldslabs Sep 5, 2024
e45b83e
fix: moved the BlocksPersisted metric
mattp-swirldslabs Sep 5, 2024
8cd088b
feat: responses and dashboards working
mattp-swirldslabs Sep 5, 2024
7c4e857
fix: Fixed consumer gauge name
mattp-swirldslabs Sep 5, 2024
64602c4
wip: trying to set up unsubscribe operations
mattp-swirldslabs Sep 5, 2024
43de7bf
fix: producer timeouts working
mattp-swirldslabs Sep 5, 2024
e6d7ae5
fix: fixed tests, made the mediator ringBufferSize configurable disab…
mattp-swirldslabs Sep 6, 2024
b133c3e
fix: created a common subscription base class to be used by the media…
mattp-swirldslabs Sep 6, 2024
20f786e
fix: changed T to V for the generics in the base to align with the in…
mattp-swirldslabs Sep 6, 2024
57059d7
fix: added interface and check for expiry to remove stale subscribers
mattp-swirldslabs Sep 6, 2024
178f3b5
fix: passing config params from the concrete impls to the base for th…
mattp-swirldslabs Sep 9, 2024
dcdbcc3
fix: added first cut at exception handling path. repaired tests
mattp-swirldslabs Sep 9, 2024
9af0e7f
fix: removed tests that no longer apply. will move that coverage to I…
mattp-swirldslabs Sep 9, 2024
c93379a
fix: added config tests
mattp-swirldslabs Sep 9, 2024
70c9ad7
fix: added Notifier test coverage
mattp-swirldslabs Sep 9, 2024
101a563
wip: added another test
mattp-swirldslabs Sep 10, 2024
9e886e6
wip: properties file testing
mattp-swirldslabs Sep 10, 2024
df49217
wip: adding servicestate
mattp-swirldslabs Sep 10, 2024
d79ff1f
refactor: created LivenessCalculator and moved classes to events
mattp-swirldslabs Sep 10, 2024
5e8f4c0
clean up
mattp-swirldslabs Sep 10, 2024
ebf7f79
fix: enable streamvalidator to trigger serverstatus
mattp-swirldslabs Sep 10, 2024
f93dd97
fix: added additional call to test
mattp-swirldslabs Sep 10, 2024
2b8f2e8
fix: added test coverage
mattp-swirldslabs Sep 10, 2024
c21e8f2
fix: added test coverage
mattp-swirldslabs Sep 10, 2024
46e7e87
fix: added test coverage
mattp-swirldslabs Sep 10, 2024
c8753a8
fix: added test coverage
mattp-swirldslabs Sep 10, 2024
fc7c7c9
fix: moved ServiceStatus to a new package, adjusted one of the method…
mattp-swirldslabs Sep 11, 2024
e8cbb16
fix: added ServiceConfig to allow ServiceStatus to receive config pro…
mattp-swirldslabs Sep 11, 2024
77ccaac
fix: added test coverage for ServiceConfig
mattp-swirldslabs Sep 11, 2024
28991e0
wip: expanding tests
mattp-swirldslabs Sep 11, 2024
5791cb4
wip: bigger test almost working
mattp-swirldslabs Sep 11, 2024
7dd2b89
fix: enabled additional parts of a large test
mattp-swirldslabs Sep 11, 2024
33100ad
fix: spotless clean up
mattp-swirldslabs Sep 11, 2024
f8a5049
fix: enabled another test
mattp-swirldslabs Sep 11, 2024
7f62368
fix: refactor package from validator to verifier
mattp-swirldslabs Sep 12, 2024
a6974ee
Added metric for the StreamVerifier
mattp-swirldslabs Sep 12, 2024
598a665
fix: refactored BlockStreamService to remove Notifiable interface and…
mattp-swirldslabs Sep 12, 2024
4c4b75a
fix: making one of the tests more deterministic
mattp-swirldslabs Sep 12, 2024
2dba315
fix: added tests
mattp-swirldslabs Sep 12, 2024
8901af4
fix: added test coverage for StreamVerifier
mattp-swirldslabs Sep 12, 2024
57b25a4
fix: reduced access to builder only. javadoc.
mattp-swirldslabs Sep 12, 2024
01592e4
wip: docs
mattp-swirldslabs Sep 12, 2024
e8af235
fix: refactor StreamVerifierBuilder to make notifier mandatory
mattp-swirldslabs Sep 16, 2024
51d7692
fix: added test
mattp-swirldslabs Sep 16, 2024
0c6ca56
fix: removed stream verifier builder
mattp-swirldslabs Sep 16, 2024
156249a
fix: rename
mattp-swirldslabs Sep 16, 2024
7561e21
fix: refactor to the persistence package
mattp-swirldslabs Sep 16, 2024
54269b4
fix: adjust test properties
mattp-swirldslabs Sep 16, 2024
6573840
fix: added javadoc
mattp-swirldslabs Sep 16, 2024
2d9a194
fix: another attempt at providing app.properties for testing
mattp-swirldslabs Sep 16, 2024
dbe0560
fix: attempting to fix the tests with config
mattp-swirldslabs Sep 16, 2024
f60e588
fix: fixed metrics
mattp-swirldslabs Sep 17, 2024
2e64882
fix: rollback previous fix. changed the default to work with the test…
mattp-swirldslabs Sep 17, 2024
1af6030
fix: using LOGGER to print out the config for all config records
mattp-swirldslabs Sep 17, 2024
022cbae
fix: added power of 2 checks and tests
mattp-swirldslabs Sep 17, 2024
099b213
fix: added subscription test
mattp-swirldslabs Sep 17, 2024
1b3ad49
fix: javadoc
mattp-swirldslabs Sep 17, 2024
d7396b3
fix: removed NotifierBuilder and set up dagger to use NotifierImpl
mattp-swirldslabs Sep 18, 2024
d437561
fix: Adjusted the mediator dagger interface to correctly return singl…
mattp-swirldslabs Sep 19, 2024
33eb36b
fix: removed top-level app.properties and added production values for…
mattp-swirldslabs Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
684 changes: 626 additions & 58 deletions server/docker/metrics/dashboards/block-node-server.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.lang.System.Logger.Level.INFO;

import com.hedera.block.server.health.HealthService;
import com.hedera.block.server.service.ServiceStatus;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.WebServerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.hedera.block.server.health.HealthInjectionModule;
import com.hedera.block.server.mediator.MediatorInjectionModule;
import com.hedera.block.server.metrics.MetricsInjectionModule;
import com.hedera.block.server.notifier.NotifierInjectionModule;
import com.hedera.block.server.persistence.PersistenceInjectionModule;
import com.hedera.block.server.service.ServiceInjectionModule;
import com.swirlds.config.api.Configuration;
import dagger.BindsInstance;
import dagger.Component;
Expand All @@ -30,6 +32,8 @@
@Singleton
@Component(
modules = {
NotifierInjectionModule.class,
ServiceInjectionModule.class,
BlockNodeAppInjectionModule.class,
HealthInjectionModule.class,
PersistenceInjectionModule.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,6 @@
@Module
public interface BlockNodeAppInjectionModule {

/**
* Binds the service status to the service status implementation.
*
* @param serviceStatus needs a service status implementation
* @return the service status implementation
*/
@Singleton
@Binds
ServiceStatus bindServiceStatus(ServiceStatusImpl serviceStatus);

/**
* Provides a block node context singleton.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.consumer.ConsumerStreamResponseObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediator;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SingleBlockRequest;
import com.hedera.hapi.block.SingleBlockResponse;
import com.hedera.hapi.block.SingleBlockResponseCode;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.protoc.BlockService;
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.ParseException;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.stub.StreamObserver;
Expand All @@ -63,12 +65,15 @@ public class BlockStreamService implements GrpcService {

private final Logger LOGGER = System.getLogger(getClass().getName());

private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final LiveStreamMediator streamMediator;
private final ServiceStatus serviceStatus;
private final BlockReader<Block> blockReader;

private final BlockNodeContext blockNodeContext;
private final MetricsService metricsService;

private final Notifier notifier;

/**
* Constructor for the BlockStreamService class. It initializes the BlockStreamService with the
* given parameters.
Expand All @@ -82,17 +87,22 @@ public class BlockStreamService implements GrpcService {
*/
@Inject
BlockStreamService(
@NonNull
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
streamMediator,
@NonNull final LiveStreamMediator streamMediator,
@NonNull final BlockReader<Block> blockReader,
@NonNull final ServiceStatus serviceStatus,
@NonNull
final BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>>
streamPersistenceHandler,
@NonNull final Notifier notifier,
@NonNull final BlockNodeContext blockNodeContext) {
this.streamMediator = streamMediator;
this.blockReader = blockReader;
this.serviceStatus = serviceStatus;
this.notifier = notifier;
this.blockNodeContext = blockNodeContext;
this.metricsService = blockNodeContext.metricsService();

streamMediator.subscribe(streamPersistenceHandler);
this.streamMediator = streamMediator;
}

/**
Expand Down Expand Up @@ -139,8 +149,23 @@ StreamObserver<com.hedera.hapi.block.protoc.PublishStreamRequest> protocPublishB
publishStreamResponseObserver) {
LOGGER.log(DEBUG, "Executing bidirectional publishBlockStream gRPC method");

return new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, blockNodeContext, serviceStatus);
// Unsubscribe any expired notifiers
notifier.unsubscribeAllExpired();

final var producerBlockItemObserver =
new ProducerBlockItemObserver(
Clock.systemDefaultZone(),
streamMediator,
notifier,
publishStreamResponseObserver,
blockNodeContext,
serviceStatus);

// Register the producer observer with the notifier to publish responses back to the
// producer
notifier.subscribe(producerBlockItemObserver);

return producerBlockItemObserver;
}

void protocSubscribeBlockStream(
Expand All @@ -152,16 +177,18 @@ void protocSubscribeBlockStream(
subscribeStreamResponseObserver) {
LOGGER.log(DEBUG, "Executing Server Streaming subscribeBlockStream gRPC method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
if (serviceStatus.isRunning()) {
final var streamObserver =
// Unsubscribe any expired notifiers
streamMediator.unsubscribeAllExpired();

final var consumerStreamResponseObserver =
new ConsumerStreamResponseObserver(
blockNodeContext,
Clock.systemDefaultZone(),
streamMediator,
subscribeStreamResponseObserver);
subscribeStreamResponseObserver,
blockNodeContext);

streamMediator.subscribe(streamObserver);
streamMediator.subscribe(consumerStreamResponseObserver);
} else {
LOGGER.log(
ERROR,
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static void main(final String[] args) throws IOException {
Config.global(config);

// Init BlockNode Configuration
Configuration configuration =
final Configuration configuration =
ConfigurationBuilder.create()
.withSource(SystemEnvironmentConfigSource.getInstance())
.withSource(SystemPropertiesConfigSource.getInstance())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

import com.google.auto.service.AutoService;
import com.hedera.block.server.consumer.ConsumerConfig;
import com.hedera.block.server.mediator.MediatorConfig;
import com.hedera.block.server.notifier.NotifierConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.service.ServiceConfig;
import com.swirlds.common.metrics.config.MetricsConfig;
import com.swirlds.common.metrics.platform.prometheus.PrometheusConfig;
import com.swirlds.config.api.ConfigurationExtension;
Expand All @@ -43,6 +46,9 @@ public BlockNodeConfigExtension() {
@Override
public Set<Class<? extends Record>> getConfigDataTypes() {
return Set.of(
ServiceConfig.class,
MediatorConfig.class,
NotifierConfig.class,
MetricsConfig.class,
PrometheusConfig.class,
ConsumerConfig.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.hedera.block.server.config;

import com.hedera.block.server.consumer.ConsumerConfig;
import com.hedera.block.server.mediator.MediatorConfig;
import com.hedera.block.server.notifier.NotifierConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.swirlds.common.metrics.config.MetricsConfig;
import com.swirlds.common.metrics.platform.prometheus.PrometheusConfig;
Expand Down Expand Up @@ -79,4 +81,28 @@ static PrometheusConfig providePrometheusConfig(Configuration configuration) {
static ConsumerConfig provideConsumerConfig(Configuration configuration) {
return configuration.getConfigData(ConsumerConfig.class);
}

/**
* Provides a mediator configuration singleton using the configuration.
*
* @param configuration is the configuration singleton
* @return a mediator configuration singleton
*/
@Singleton
@Provides
static MediatorConfig provideMediatorConfig(Configuration configuration) {
return configuration.getConfigData(MediatorConfig.class);
}

/**
* Provides a notifier configuration singleton using the configuration.
*
* @param configuration is the configuration singleton
* @return a notifier configuration singleton
*/
@Singleton
@Provides
static NotifierConfig provideNotifierConfig(Configuration configuration) {
return configuration.getConfigData(NotifierConfig.class);
}
AlfredoG87 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,21 @@
* timed out and will be disconnected
*/
@ConfigData("consumer")
public record ConsumerConfig(@ConfigProperty(defaultValue = "1500") long timeoutThresholdMillis) {}
public record ConsumerConfig(@ConfigProperty(defaultValue = "1500") long timeoutThresholdMillis) {
private static final System.Logger LOGGER = System.getLogger(ConsumerConfig.class.getName());

/**
* Validate the configuration.
*
* @throws IllegalArgumentException if the configuration is invalid
*/
public ConsumerConfig {
if (timeoutThresholdMillis <= 0) {
throw new IllegalArgumentException("Timeout threshold must be greater than 0");
}

LOGGER.log(
System.Logger.Level.INFO,
"Consumer configuration timeoutThresholdMillis: " + timeoutThresholdMillis);
}
}
Loading
Loading