Skip to content

Commit

Permalink
metrics server modify
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod committed Aug 30, 2023
1 parent 801bfb4 commit 0051972
Show file tree
Hide file tree
Showing 34 changed files with 1,702 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.optimism.batcher.publisher.ChannelDataPublisher;
import io.optimism.batcher.publisher.PublisherConfig;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockRef;
import io.optimism.utilities.derive.stages.Frame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,8 +51,6 @@ public class BatcherSubmitter extends AbstractExecutionThreadService {

private volatile boolean isShutdownTriggered = false;

private L1BlockRef lastL1Tip;

/**
* Constructor of BatcherSubmitter.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* specific language governing permissions and limitations under the License.
*/

package io.optimism;
package io.optimism.batcher;

import io.optimism.batcher.cli.Cli;
import picocli.CommandLine;

/**
* Batcher main method.
Expand All @@ -33,9 +36,7 @@ public HildrBatcher() {}
* @param args Starts arguments
*/
public static void main(String[] args) {
// todo start batcherSubmitter
// todo start server
// todo start metrics server
// todo listen close signal
int exitCode = new CommandLine(new Cli()).execute(args);
System.exit(exitCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.optimism.batcher.channel;

import io.optimism.batcher.config.Config;
import io.optimism.batcher.telemetry.BatcherMetrics;

/**
* ChannelConfig class.
Expand All @@ -27,6 +28,7 @@
* @param maxFrameSize The maximum byte-size a frame can have.
* @param seqWindowSize The maximum byte-size a frame can have.
* @param subSafetyMargin The maximum byte-size a frame can have.
* @param metrics Batcher metrics
* @author thinkAfCod
* @since 0.1.1
*/
Expand All @@ -35,7 +37,8 @@ public record ChannelConfig(
long maxChannelDuration,
int maxFrameSize,
long seqWindowSize,
long subSafetyMargin) {
long subSafetyMargin,
BatcherMetrics metrics) {

/**
* Create a ChannelConfig instance from Config instance.
Expand All @@ -44,6 +47,6 @@ public record ChannelConfig(
* @return ChannelConfig instance
*/
public static ChannelConfig from(Config config) {
return new ChannelConfig(30000, 0, 120_000, 3600, 10);
return new ChannelConfig(30000, 0, 120_000, 3600, 10, config.metrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.optimism.batcher.compressor.Compressor;
import io.optimism.batcher.exception.UnsupportedException;
import io.optimism.batcher.telemetry.BatcherMetrics;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockInfo;
import io.optimism.utilities.derive.stages.Batch;
Expand Down Expand Up @@ -58,25 +59,27 @@ public class ChannelImpl implements Channel {

private final ChannelConfig chConfig;

private final BatcherMetrics metrics;

private final BigInteger seqWindowTimeout;

private final BigInteger id;

private AtomicInteger frameNumber;
private final AtomicInteger frameNumber;

private List<Frame> outputFrames;
private final List<Frame> outputFrames;

private Map<String, Frame> pendingTxs;
private final Map<String, Frame> pendingTxs;

private Map<String, BlockId> confirmedTxs;
private final Map<String, BlockId> confirmedTxs;

private List<EthBlock.Block> blocks;
private final List<EthBlock.Block> blocks;

private BigInteger timeoutBlock;

private Compressor compressor;
private final Compressor compressor;

private AtomicInteger rlpLength;
private final AtomicInteger rlpLength;

private volatile boolean isFull;

Expand All @@ -90,6 +93,7 @@ public class ChannelImpl implements Channel {
*/
public ChannelImpl(ChannelConfig chConfig, Compressor compressor) {
this.chConfig = chConfig;
this.metrics = chConfig.metrics();
this.seqWindowTimeout =
BigInteger.valueOf(chConfig.seqWindowSize() - chConfig.subSafetyMargin());
this.compressor = compressor;
Expand Down Expand Up @@ -175,7 +179,7 @@ public boolean hasFrame() {

@Override
public void txFailed(Frame tx) {
// todo metrics record batch tx failed.
this.metrics.recordBatchTxFailed();
var code = tx.code();
if (!this.pendingTxs.containsKey(code)) {
LOGGER.warn(
Expand All @@ -190,7 +194,7 @@ public void txFailed(Frame tx) {

@Override
public List<EthBlock.Block> txConfirmed(Frame tx, BlockId inclusionBlock) {
// todo metrics RecordBatchTxSubmitted
this.metrics.recordBatchTxSubmitted();
LOGGER.debug(
"marked tx as confirmed: chId: {}; frameNum: {}; block: {}",
tx.channelId(),
Expand All @@ -214,12 +218,12 @@ public List<EthBlock.Block> txConfirmed(Frame tx, BlockId inclusionBlock) {
.subtract(BigInteger.valueOf(chConfig.subSafetyMargin()));
this.updateTimeout(timeout);
if (this.isTimeout()) {
// todo metrics recordChannelTimeout
this.metrics.recordChannelTimedOut(tx);
LOGGER.warn("Channel timeout: chId:{}", tx.channelId());
return this.blocks;
}
if (this.isFullySubmitted()) {
// todo metrics RecordChannelFullySubmitted
this.metrics.recordChannelFullySubmitted(tx);
LOGGER.info("Channel is fully submitted: chId:{}", tx.channelId());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.optimism.batcher.compressor.CompressorConfig;
import io.optimism.batcher.compressor.Compressors;
import io.optimism.batcher.telemetry.BatcherMetrics;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockInfo;
import io.optimism.type.L2BlockRef;
Expand Down Expand Up @@ -45,6 +46,8 @@ public class ChannelManager {

private final ChannelConfig chConfig;

private final BatcherMetrics metrics;

private final CompressorConfig compressorConfig;

private List<EthBlock.Block> blocks;
Expand All @@ -67,6 +70,7 @@ public class ChannelManager {
*/
public ChannelManager(final ChannelConfig chConfig, final CompressorConfig compressorConfig) {
this.chConfig = chConfig;
this.metrics = chConfig.metrics();
this.compressorConfig = compressorConfig;
this.blocks = new ArrayList<>(256);
this.channels = new ArrayList<>(256);
Expand All @@ -85,9 +89,9 @@ public void addL2Block(EthBlock.Block block) {
if (!StringUtils.isEmpty(latestBlockHash) && !latestBlockHash.equals(block.getParentHash())) {
throw new ReorgException("block does not extend existing chain");
}
// todo metrics pending block
this.blocks.add(block);
this.latestBlockHash = block.getHash();
this.metrics.recordL2BlockInPendingQueue(block);
}

/**
Expand Down Expand Up @@ -155,7 +159,7 @@ public void txFailed(final Frame tx) {
* @param inclusionBlock inclusion block id
*/
public void txConfirmed(final Frame tx, final BlockId inclusionBlock) {
// todo metrics RecordBatchTxSubmitted
this.metrics.recordBatchTxSubmitted();
LOGGER.debug(
"marked transaction as confirmed: chId: {}; frameNum: {};block: {}",
tx.channelId(),
Expand Down Expand Up @@ -222,21 +226,21 @@ private Channel openChannel(final BlockId l1Head) {
Channel ch = new ChannelImpl(this.chConfig, Compressors.create(this.compressorConfig));
LOGGER.info(
"Created a channel: id:{}, l1Head: {}, blocksPending:{}", ch, l1Head, this.blocks.size());
// todo metrics record opened channel
this.metrics.recordChannelOpened(null, this.blocks.size());
return ch;
}

private void pushBlocks(final Channel lastChannel) {
int blocksAdded = 0;
L2BlockRef unused = null;
L2BlockRef l2Ref = null;
try {
for (final EthBlock.Block block : this.blocks) {
final L1BlockInfo l1Info = lastChannel.addBlock(block);
unused = L2BlockRef.fromBlockAndL1Info(block, l1Info);
// todo metrics recordL2BlockInChannel
l2Ref = L2BlockRef.fromBlockAndL1Info(block, l1Info);
if (latestChannel.isFull()) {
break;
}
this.metrics.recordL2BlockInChannel(block);
blocksAdded += 1;
}
} catch (ChannelException e) {
Expand All @@ -252,7 +256,12 @@ private void pushBlocks(final Channel lastChannel) {
this.blocks = this.blocks.stream().skip(blocksAdded).collect(Collectors.toList());
}

// todo metrics RecordL2BlocksAdded
this.metrics.recordL2BlocksAdded(
l2Ref,
blocksAdded,
this.blocks.size(),
this.latestChannel.inputBytesLength(),
this.latestChannel.readyBytesLength());

LOGGER.debug(
"Added blocks to channel:"
Expand Down
140 changes: 140 additions & 0 deletions hildr-batcher/src/main/java/io/optimism/batcher/cli/Cli.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2023 [email protected]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.optimism.batcher.cli;

import io.micrometer.tracing.Tracer;
import io.optimism.batcher.BatcherSubmitter;
import io.optimism.batcher.config.Config;
import io.optimism.batcher.exception.BatcherExecutionException;
import io.optimism.batcher.telemetry.BatcherMetricsServer;
import io.optimism.batcher.telemetry.BatcherPrometheusMetrics;
import io.optimism.utilities.telemetry.Logging;
import io.optimism.utilities.telemetry.TracerTaskWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import sun.misc.Signal;

/**
* CLI handler.
*
* @author thinkAfCod
* @since 2023.05
*/
@Command(name = "hildr", mixinStandardHelpOptions = true, description = "")
public class Cli implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);

@Option(names = "--l1-rpc-url", required = true, description = "The base chain RPC URL")
String l1RpcUrl;

@Option(names = "--l2-rpc-url", required = true, description = "The L2 engine RPC URL")
String l2RpcUrl;

@Option(names = "--rollup-rpc-url", required = true, description = "The rollup node RPC URL")
String rollupRpcUrl;

@Option(names = "--l1-signer", required = true, description = "The base chain private key")
String l1Signer;

@Option(
names = "--batch-inbox-address",
required = true,
description = "The address of batch inbox contract")
String batchInboxAddress;

@Option(names = "--sub-safety-margin", required = true, description = "")
Long subSafetyMargin;

@Option(names = "--pull-interval", required = true, description = "")
Long pollInterval;

@Option(names = "--max-l1-tx-size", required = true, description = "")
Long maxL1TxSize;

@Option(names = "--target-frame-size", required = true, description = "")
Integer targetFrameSize;

@Option(names = "--target-num-frames", required = true, description = "")
Integer targetNumFrames;

@Option(names = "--approx-compr-ratio", required = true, description = "")
String approxComprRatio;

@Option(
names = "--enable-metrics",
description = "If not contains this option, will not open metrics server")
boolean enableMetrics;

@Option(
names = "--metrics-port",
defaultValue = "9200",
required = true,
description = "The port of metrics server ")
Integer metricsPort;

/** the Cli constructor. */
public Cli() {}

@Override
public void run() {
TracerTaskWrapper.setTracerSupplier(Logging.INSTANCE::getTracer);

// listen close signal
Signal.handle(new Signal("INT"), sig -> System.exit(0));
Signal.handle(new Signal("TERM"), sig -> System.exit(0));

Tracer tracer = Logging.INSTANCE.getTracer("hildr-batcher-cli");
var span = tracer.nextSpan().name("batcher-submitter").start();
try (var unused = tracer.withSpan(span)) {
// start metrics server
if (this.enableMetrics) {
BatcherMetricsServer.start(this.metricsPort);
}
// start batcher submitter
BatcherSubmitter submitter = new BatcherSubmitter(this.optionToConfig());
submitter.startAsync().awaitTerminated();
} catch (Exception e) {
LOGGER.error("hildr batcher: ", e);
throw new BatcherExecutionException(e);
} finally {
if (this.enableMetrics) {
LOGGER.info("stop metrics");
BatcherMetricsServer.stop();
}
span.end();
}
}

private Config optionToConfig() {
return new Config(
this.l1RpcUrl,
this.l2RpcUrl,
this.rollupRpcUrl,
this.l1Signer,
this.batchInboxAddress,
this.subSafetyMargin,
this.pollInterval,
this.maxL1TxSize,
this.targetFrameSize,
this.targetNumFrames,
this.approxComprRatio,
new BatcherPrometheusMetrics(BatcherMetricsServer.getRegistry(), "hildr_batcher"));
}
}
Loading

0 comments on commit 0051972

Please sign in to comment.