Skip to content

Commit

Permalink
feat: use FileChannel for PCES (#15604)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <[email protected]>
  • Loading branch information
lpetrovic05 authored Oct 7, 2024
1 parent b1f1e05 commit debdc9a
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.core.jmh;

import com.swirlds.common.io.utility.FileUtils;
import com.swirlds.common.test.fixtures.Randotron;
import com.swirlds.platform.event.AncientMode;
import com.swirlds.platform.event.PlatformEvent;
import com.swirlds.platform.event.preconsensus.PcesFile;
import com.swirlds.platform.event.preconsensus.PcesMutableFile;
import com.swirlds.platform.test.fixtures.event.TestingEventBuilder;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 3, time = 10)
public class PcesWriterBenchmark {

@Param({"true", "false"})
public boolean useFileChannelWriter;

@Param({"true", "false"})
public boolean syncEveryEvent;

private PlatformEvent event;
private Path directory;
private PcesMutableFile mutableFile;

@Setup(Level.Iteration)
public void setup() throws IOException {
final Randotron r = Randotron.create(0);

event = new TestingEventBuilder(r)
.setAppTransactionCount(3)
.setSystemTransactionCount(1)
.setSelfParent(new TestingEventBuilder(r).build())
.setOtherParent(new TestingEventBuilder(r).build())
.build();
directory = Files.createTempDirectory("PcesWriterBenchmark");
final PcesFile file = PcesFile.of(AncientMode.GENERATION_THRESHOLD, r.nextInstant(), 1, 0, 100, 0, directory);

mutableFile = file.getMutableFile(useFileChannelWriter, syncEveryEvent);
}

@TearDown(Level.Iteration)
public void cleanup() throws IOException {
mutableFile.close();
FileUtils.deleteDirectory(directory);
}
/*
Results on a M1 Max MacBook Pro:
Benchmark (syncEveryEvent) (useFileChannelWriter) Mode Cnt Score Error Units
PcesWriterBenchmark.writeEvent true true thrpt 3 12440.268 ± 42680.146 ops/s
PcesWriterBenchmark.writeEvent true false thrpt 3 16244.412 ± 38461.148 ops/s
PcesWriterBenchmark.writeEvent false true thrpt 3 411138.079 ± 110692.138 ops/s
PcesWriterBenchmark.writeEvent false false thrpt 3 643582.781 ± 154393.415 ops/s
*/
@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void writeEvent() throws IOException {
mutableFile.writeEvent(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,27 @@ public Path getPath() {
return path;
}

/**
* Same as {@link #getMutableFile(boolean, boolean)} with both parameters set to false.
*/
@NonNull
public PcesMutableFile getMutableFile() throws IOException {
return new PcesMutableFile(this, false, false);
}

/**
* Get an object that can be used to write events to this file. Throws if there already exists a file on disk with
* the same path.
*
* @param useFileChannelWriter if true, use a {@link java.nio.channels.FileChannel} to write to the file. Otherwise,
* use a {@link java.io.FileOutputStream}.
* @param syncEveryEvent if true, sync the file after every event is written
* @return a writer for this file
*/
@NonNull
public PcesMutableFile getMutableFile() throws IOException {
return new PcesMutableFile(this);
public PcesMutableFile getMutableFile(final boolean useFileChannelWriter, final boolean syncEveryEvent)
throws IOException {
return new PcesMutableFile(this, useFileChannelWriter, syncEveryEvent);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.event.preconsensus;

import com.hedera.hapi.platform.event.GossipEvent;
import com.hedera.pbj.runtime.io.WritableSequentialData;
import com.hedera.pbj.runtime.io.buffer.BufferedData;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
* Writes preconsensus events to a file using a {@link FileChannel}.
*/
public class PcesFileChannelWriter implements PcesFileWriter {
/** The capacity of the ByteBuffer used to write events */
private static final int BUFFER_CAPACITY = 1024 * 1024 * 10;
/** The file channel for writing events */
private final FileChannel channel;
/** The buffer used to hold data being written to the file */
private final ByteBuffer buffer;
/** Wraps a ByteBuffer so that the protobuf codec can write to it */
private final WritableSequentialData writableSequentialData;
/** Tracks the size of the file in bytes */
private int fileSize;

/**
* Create a new writer that writes events to a file using a {@link FileChannel}.
*
* @param filePath the path to the file to write to
* @param syncEveryEvent if true, the file will be synced after every event is written
* @throws IOException if an error occurs while opening the file
*/
public PcesFileChannelWriter(@NonNull final Path filePath, final boolean syncEveryEvent) throws IOException {
if (syncEveryEvent) {
channel = FileChannel.open(
filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.DSYNC);
} else {
channel = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
}
buffer = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
writableSequentialData = BufferedData.wrap(buffer);
}

@Override
public void writeVersion(final int version) throws IOException {
buffer.putInt(version);
flipWriteClear();
}

@Override
public void writeEvent(@NonNull final GossipEvent event) throws IOException {
buffer.putInt(GossipEvent.PROTOBUF.measureRecord(event));
GossipEvent.PROTOBUF.write(event, writableSequentialData);
flipWriteClear();
}

/**
* Writes the data in the buffer to the file. This method expects that the buffer will have data that is written to
* it. The buffer will be flipped so that it can be read from, the data will be written to the file, and the buffer
* will be cleared so that it can be used again.
*/
private void flipWriteClear() throws IOException {
buffer.flip();
final int bytesWritten = channel.write(buffer);
fileSize += bytesWritten;
if (bytesWritten != buffer.limit()) {
throw new IOException(
"Failed to write data to file. Wrote " + bytesWritten + " bytes out of " + buffer.limit());
}
buffer.clear();
}

@Override
public void flush() throws IOException {
// benchmarks show that this has horrible performance
channel.force(false);
}

@Override
public void close() throws IOException {
channel.close();
}

@Override
public long fileSize() {
return fileSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.event.preconsensus;

import com.hedera.hapi.platform.event.GossipEvent;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;

/**
* Interface for writing events to a file.
*/
public interface PcesFileWriter {

/**
* Write the version of the file format to the file.
*
* @param version the version of the file format
*/
void writeVersion(final int version) throws IOException;

/**
* Write an event to the file.
*
* @param event the event to write
*/
void writeEvent(@NonNull final GossipEvent event) throws IOException;

/**
* Flush the file.
*/
void flush() throws IOException;

/**
* Close the file.
*/
void close() throws IOException;

/**
* @return the size of the file in bytes
*/
long fileSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,8 @@

package com.swirlds.platform.event.preconsensus;

import com.hedera.hapi.platform.event.GossipEvent;
import com.swirlds.common.io.extendable.ExtendableOutputStream;
import com.swirlds.common.io.extendable.extensions.CountingStreamExtension;
import com.swirlds.common.io.streams.SerializableDataOutputStream;
import com.swirlds.platform.event.PlatformEvent;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
Expand All @@ -38,40 +32,34 @@ public class PcesMutableFile {
*/
private final PcesFile descriptor;

/**
* Counts the bytes written to the file.
*/
private final CountingStreamExtension counter;
private final PcesFileWriter writer;

/**
* The highest ancient indicator of all events written to the file.
*/
private long highestAncientIdentifierInFile;

/**
* The output stream to write to.
*/
private final SerializableDataOutputStream out;

/**
* Create a new preconsensus event file that can be written to.
*
* @param descriptor a description of the file
* @param descriptor a description of the file
* @param useFileChannelWriter whether to use a FileChannel to write to the file as opposed to an OutputStream
* @param syncEveryEvent whether to sync the file after every event
*/
PcesMutableFile(@NonNull final PcesFile descriptor) throws IOException {
PcesMutableFile(
@NonNull final PcesFile descriptor, final boolean useFileChannelWriter, final boolean syncEveryEvent)
throws IOException {
if (Files.exists(descriptor.getPath())) {
throw new IOException("File " + descriptor.getPath() + " already exists");
}

Files.createDirectories(descriptor.getPath().getParent());

this.descriptor = descriptor;
counter = new CountingStreamExtension(false);
out = new SerializableDataOutputStream(new ExtendableOutputStream(
new BufferedOutputStream(
new FileOutputStream(descriptor.getPath().toFile())),
counter));
out.writeInt(PcesFileVersion.currentVersionNumber());
writer = useFileChannelWriter
? new PcesFileChannelWriter(descriptor.getPath(), syncEveryEvent)
: new PcesOutputStreamFileWriter(descriptor.getPath(), syncEveryEvent);
writer.writeVersion(PcesFileVersion.currentVersionNumber());
highestAncientIdentifierInFile = descriptor.getLowerBound();
}

Expand All @@ -95,7 +83,7 @@ public void writeEvent(final PlatformEvent event) throws IOException {
throw new IllegalStateException("Cannot write event " + event.getHash() + " with ancient indicator "
+ event.getAncientIndicator(descriptor.getFileType()) + " to file " + descriptor);
}
out.writePbjRecord(event.getGossipEvent(), GossipEvent.PROTOBUF);
writer.writeEvent(event.getGossipEvent());
highestAncientIdentifierInFile =
Math.max(highestAncientIdentifierInFile, event.getAncientIndicator(descriptor.getFileType()));
}
Expand Down Expand Up @@ -130,14 +118,14 @@ public PcesFile compressSpan(final long upperBoundInPreviousFile) {
* Flush the file.
*/
public void flush() throws IOException {
out.flush();
writer.flush();
}

/**
* Close the file.
*/
public void close() throws IOException {
out.close();
writer.close();
}

/**
Expand All @@ -146,7 +134,7 @@ public void close() throws IOException {
* @return the size of the file in bytes
*/
public long fileSize() {
return counter.getCount();
return writer.fileSize();
}

/**
Expand Down
Loading

0 comments on commit debdc9a

Please sign in to comment.