Skip to content

Commit

Permalink
feat: create inline pces wiring (#15785)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <[email protected]>
Co-authored-by: litt <[email protected]>
  • Loading branch information
lpetrovic05 and litt3 authored Oct 8, 2024
1 parent debdc9a commit 14625b6
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.swirlds.platform.network.SocketConfig;
import com.swirlds.platform.system.status.PlatformStatusConfig;
import com.swirlds.platform.uptime.UptimeConfig;
import com.swirlds.platform.wiring.ComponentWiringConfig;
import com.swirlds.platform.wiring.PlatformSchedulersConfig;
import com.swirlds.virtualmap.config.VirtualMapConfig;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -89,7 +90,8 @@ public Set<Class<? extends Record>> getConfigDataTypes() {
UptimeConfig.class,
VirtualMapConfig.class,
WiringConfig.class,
InternalLoggingConfig.class);
InternalLoggingConfig.class,
ComponentWiringConfig.class);
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.event.preconsensus;

import com.swirlds.platform.consensus.EventWindow;
import com.swirlds.platform.event.PlatformEvent;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
* A no-op implementation of {@link InlinePcesWriter} that does nothing, just returns the event it receives.
*/
public class NoOpInlinePcesWriter implements InlinePcesWriter {
@Override
public void beginStreamingNewEvents() {}

@NonNull
@Override
public PlatformEvent writeEvent(@NonNull final PlatformEvent event) {
return event;
}

@Override
public void registerDiscontinuity(@NonNull final Long newOriginRound) {}

@Override
public void updateNonAncientEventBoundary(@NonNull final EventWindow nonAncientBoundary) {}

@Override
public void setMinimumAncientIdentifierToStore(@NonNull final Long minimumAncientIdentifierToStore) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.wiring;

import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;

/**
* Configuration related to how platform components are wired together.
*
* @param inlinePces if true, pre-consensus events will be written to disk before being gossipped, this will ensure that
* a node can never lose an event that it has created due to a crash
*/
@ConfigData("platformWiring")
public record ComponentWiringConfig(@ConfigProperty(defaultValue = "false") boolean inlinePces) {}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.swirlds.platform.event.creation.EventCreationManager;
import com.swirlds.platform.event.deduplication.EventDeduplicator;
import com.swirlds.platform.event.orphan.OrphanBuffer;
import com.swirlds.platform.event.preconsensus.InlinePcesWriter;
import com.swirlds.platform.event.preconsensus.durability.RoundDurabilityBuffer;
import com.swirlds.platform.event.stale.StaleEventDetector;
import com.swirlds.platform.event.stale.StaleEventDetectorOutput;
Expand All @@ -43,6 +44,7 @@
import com.swirlds.platform.wiring.components.GossipWiring;
import com.swirlds.platform.wiring.components.StateAndRound;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -74,6 +76,7 @@ public class PlatformCoordinator {
private final ComponentWiring<StatusStateMachine, PlatformStatus> statusStateMachineWiring;
private final ComponentWiring<BranchDetector, PlatformEvent> branchDetectorWiring;
private final ComponentWiring<BranchReporter, Void> branchReporterWiring;
private final ComponentWiring<InlinePcesWriter, PlatformEvent> pcesInlineWriterWiring;

/**
* Constructor
Expand All @@ -96,6 +99,7 @@ public class PlatformCoordinator {
* @param statusStateMachineWiring the status state machine wiring
* @param branchDetectorWiring the branch detector wiring
* @param branchReporterWiring the branch reporter wiring
* @param pcesInlineWriterWiring the inline PCES writer wiring
*/
public PlatformCoordinator(
@NonNull final Runnable flushTheEventHasher,
Expand All @@ -111,15 +115,16 @@ public PlatformCoordinator(
final ComponentWiring<StateSignatureCollector, List<ReservedSignedState>>
stateSignatureCollectorWiring,
@NonNull final ComponentWiring<TransactionHandler, StateAndRound> transactionHandlerWiring,
@NonNull final ComponentWiring<RoundDurabilityBuffer, List<ConsensusRound>> roundDurabilityBufferWiring,
@Nullable final ComponentWiring<RoundDurabilityBuffer, List<ConsensusRound>> roundDurabilityBufferWiring,
@NonNull final ComponentWiring<StateHasher, StateAndRound> stateHasherWiring,
@NonNull
final ComponentWiring<StaleEventDetector, List<RoutableData<StaleEventDetectorOutput>>>
staleEventDetectorWiring,
@NonNull final ComponentWiring<TransactionPool, Void> transactionPoolWiring,
@NonNull final ComponentWiring<StatusStateMachine, PlatformStatus> statusStateMachineWiring,
@NonNull final ComponentWiring<BranchDetector, PlatformEvent> branchDetectorWiring,
@NonNull final ComponentWiring<BranchReporter, Void> branchReporterWiring) {
@NonNull final ComponentWiring<BranchReporter, Void> branchReporterWiring,
@Nullable final ComponentWiring<InlinePcesWriter, PlatformEvent> pcesInlineWriterWiring) {

this.flushTheEventHasher = Objects.requireNonNull(flushTheEventHasher);
this.internalEventValidatorWiring = Objects.requireNonNull(internalEventValidatorWiring);
Expand All @@ -132,13 +137,14 @@ public PlatformCoordinator(
this.applicationTransactionPrehandlerWiring = Objects.requireNonNull(applicationTransactionPrehandlerWiring);
this.stateSignatureCollectorWiring = Objects.requireNonNull(stateSignatureCollectorWiring);
this.transactionHandlerWiring = Objects.requireNonNull(transactionHandlerWiring);
this.roundDurabilityBufferWiring = Objects.requireNonNull(roundDurabilityBufferWiring);
this.roundDurabilityBufferWiring = roundDurabilityBufferWiring;
this.stateHasherWiring = Objects.requireNonNull(stateHasherWiring);
this.staleEventDetectorWiring = Objects.requireNonNull(staleEventDetectorWiring);
this.transactionPoolWiring = Objects.requireNonNull(transactionPoolWiring);
this.statusStateMachineWiring = Objects.requireNonNull(statusStateMachineWiring);
this.branchDetectorWiring = Objects.requireNonNull(branchDetectorWiring);
this.branchReporterWiring = Objects.requireNonNull(branchReporterWiring);
this.pcesInlineWriterWiring = pcesInlineWriterWiring;
}

/**
Expand All @@ -157,6 +163,9 @@ public void flushIntakePipeline() {
eventDeduplicatorWiring.flush();
eventSignatureValidatorWiring.flush();
orphanBufferWiring.flush();
if (pcesInlineWriterWiring != null) {
pcesInlineWriterWiring.flush();
}
gossipWiring.flush();
consensusEngineWiring.flush();
applicationTransactionPrehandlerWiring.flush();
Expand Down Expand Up @@ -197,7 +206,9 @@ public void clear() {
flushIntakePipeline();
stateHasherWiring.flush();
stateSignatureCollectorWiring.flush();
roundDurabilityBufferWiring.flush();
if (roundDurabilityBufferWiring != null) {
roundDurabilityBufferWiring.flush();
}
transactionHandlerWiring.flush();
staleEventDetectorWiring.flush();
branchDetectorWiring.flush();
Expand All @@ -219,7 +230,11 @@ public void clear() {
.getInputWire(StateSignatureCollector::clear)
.inject(NoInput.getInstance());
eventCreationManagerWiring.getInputWire(EventCreationManager::clear).inject(NoInput.getInstance());
roundDurabilityBufferWiring.getInputWire(RoundDurabilityBuffer::clear).inject(NoInput.getInstance());
if (roundDurabilityBufferWiring != null) {
roundDurabilityBufferWiring
.getInputWire(RoundDurabilityBuffer::clear)
.inject(NoInput.getInstance());
}
staleEventDetectorWiring.getInputWire(StaleEventDetector::clear).inject(NoInput.getInstance());
transactionPoolWiring.getInputWire(TransactionPool::clear).inject(NoInput.getInstance());
branchDetectorWiring.getInputWire(BranchDetector::clear).inject(NoInput.getInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public record PlatformSchedulersConfig(
TaskSchedulerConfiguration stateSigner,
@ConfigProperty(defaultValue = "SEQUENTIAL_THREAD CAPACITY(500) UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration pcesWriter,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC BUSY_FRACTION_METRIC")
TaskSchedulerConfiguration pcesInlineWriter,
@ConfigProperty(defaultValue = "DIRECT") TaskSchedulerConfiguration pcesSequencer,
@ConfigProperty(defaultValue = "CONCURRENT CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration applicationTransactionPrehandler,
Expand Down
Loading

0 comments on commit 14625b6

Please sign in to comment.