Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

ORC-262: [C++] Support async io prefetch for orc c++ lib #2048

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
26 changes: 26 additions & 0 deletions c++/include/orc/OrcFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef ORC_FILE_HH
#define ORC_FILE_HH

#include <future>
#include <string>

#include "orc/Reader.hh"
Expand All @@ -36,6 +37,20 @@ namespace orc {
*/
class InputStream {
public:
using Buffer = DataBuffer<char>;
using BufferPtr = std::shared_ptr<Buffer>;

/**
 * A view of `length` bytes that starts `offset` bytes into a shared buffer.
 * A default-constructed slice has no backing buffer (buffer == nullptr),
 * which callers can use to detect "no data available".
 */
struct BufferSlice {
  BufferSlice() = default;

  BufferSlice(BufferPtr buffer, uint64_t offset, uint64_t length)
      : buffer(std::move(buffer)), offset(offset), length(length) {}

  // Backing storage shared with whoever produced the slice; may be null.
  BufferPtr buffer{nullptr};
  // Position of the first byte of the slice, relative to buffer->data().
  uint64_t offset{0};
  // Number of bytes covered by the slice.
  uint64_t length{0};
};

virtual ~InputStream();

/**
Expand All @@ -58,6 +73,17 @@ namespace orc {
*/
virtual void read(void* buf, uint64_t length, uint64_t offset) = 0;

/**
* Read data asynchronously.
* @param offset the position in the stream to read from.
* @param length the number of bytes to read.
* @return a future that will be set to the buffer when the read is complete.
*/
virtual std::future<BufferPtr> readAsync(uint64_t /*offset*/, uint64_t /*length*/,
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
MemoryPool& /*pool*/) {
throw NotImplementedYet("readAsync not supported yet");
}

/**
* Get the name of the stream for error messages.
*/
Expand Down
22 changes: 22 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ namespace orc {
// classes that hold data members so we can maintain binary compatibility
struct ReaderOptionsPrivate;
struct RowReaderOptionsPrivate;
struct CacheOptions;
class InputStream;

// Forward declarations of proto::Footer and proto::Metadata so that this
// header can refer to them by pointer without their full definitions.
namespace proto {
  class Footer;
  class Metadata;
}  // namespace proto

/**
* Expose the reader metrics including the latency and
Expand Down Expand Up @@ -605,6 +612,21 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;

/**
* Get the input stream for the ORC file.
*/
virtual InputStream* getStream() const = 0;

/**
* Get the metadata (proto::Metadata) of the ORC file.
*/
virtual const proto::Metadata* getMetadata() const = 0;

virtual void preBuffer(const std::vector<int>& stripes, const std::list<uint64_t>& includeTypes,
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
const CacheOptions& options) = 0;

virtual void releaseBuffer(uint64_t boundary) = 0;
};

/**
Expand Down
1 change: 1 addition & 0 deletions c++/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ set(SOURCE_FILES
orc_proto.pb.h
io/InputStream.cc
io/OutputStream.cc
io/Cache.cc
sargs/ExpressionTree.cc
sargs/Literal.cc
sargs/PredicateLeaf.cc
Expand Down
79 changes: 76 additions & 3 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ namespace orc {
buildTypeNameIdMap(contents_->schema.get());
}

RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts)
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts,
std::shared_ptr<ReadRangeCache> readCache)
: localTimezone_(getLocalTimezone()),
contents_(contents),
throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
Expand All @@ -255,7 +256,8 @@ namespace orc {
firstRowOfStripe_(*contents_->pool, 0),
enableEncodedBlock_(opts.getEnableLazyDecoding()),
readerTimezone_(getTimezoneByName(opts.getTimezoneName())),
schemaEvolution_(opts.getReadType(), contents_->schema.get()) {
schemaEvolution_(opts.getReadType(), contents_->schema.get()),
readCache_(std::move(readCache)) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer_->stripes_size());
currentStripe_ = numberOfStripes;
Expand Down Expand Up @@ -838,7 +840,7 @@ namespace orc {
// load stripe statistics for PPD
readMetadata();
}
return std::make_unique<RowReaderImpl>(contents_, opts);
return std::make_unique<RowReaderImpl>(contents_, opts, readCache_);
}

uint64_t maxStreamsForType(const proto::Type& type) {
Expand Down Expand Up @@ -1474,6 +1476,77 @@ namespace orc {
return ret;
}

// Asks the read cache to evict its entries before `boundary`.
// A no-op when no read cache has been created yet.
void ReaderImpl::releaseBuffer(uint64_t boundary) {
  if (readCache_ == nullptr) {
    return;
  }
  readCache_->evictEntriesBefore(boundary);
}

void ReaderImpl::preBuffer(const std::vector<int>& stripes,
const std::list<uint64_t>& includeTypes, const CacheOptions& options) {
if (stripes.empty() || includeTypes.empty()) {
return;
}

orc::RowReaderOptions row_reader_options;
row_reader_options.includeTypes(includeTypes);
ColumnSelector column_selector(contents_.get());
std::vector<bool> selected_columns;
column_selector.updateSelected(selected_columns, row_reader_options);

std::vector<ReadRange> ranges;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
ranges.reserve(includeTypes.size());
for (auto stripe : stripes) {
// get stripe information
const auto& stripe_info = footer_->stripes(stripe);
uint64_t stripe_footer_start =
stripe_info.offset() + stripe_info.index_length() + stripe_info.data_length();
uint64_t stripe_footer_length = stripe_info.footer_length();

// get stripe footer
std::unique_ptr<SeekableInputStream> pb_stream = createDecompressor(
contents_->compression,
std::make_unique<SeekableFileInputStream>(contents_->stream.get(), stripe_footer_start,
stripe_footer_length, *contents_->pool),
contents_->blockSize, *contents_->pool, contents_->readerMetrics);
proto::StripeFooter stripe_footer;
if (!stripe_footer.ParseFromZeroCopyStream(pb_stream.get())) {
throw ParseError(std::string("bad StripeFooter from ") + pb_stream->getName());
}

// traverse all streams in stripe footer, choose selected streams to prebuffer
uint64_t offset = stripe_info.offset();
for (int i = 0; i < stripe_footer.streams_size(); i++) {
const proto::Stream& stream = stripe_footer.streams(i);
if (offset + stream.length() > stripe_footer_start) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe " << stripe
<< ": streamOffset=" << offset << ", streamLength=" << stream.length()
<< ", stripeOffset=" << stripe_info.offset()
<< ", stripeIndexLength=" << stripe_info.index_length()
<< ", stripeDataLength=" << stripe_info.data_length();
throw ParseError(msg.str());
}

if (stream.has_kind() && selected_columns[stream.column()]) {
const auto& kind = stream.kind();
if (kind == proto::Stream_Kind_DATA || kind == proto::Stream_Kind_DICTIONARY_DATA ||
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
kind == proto::Stream_Kind_PRESENT || kind == proto::Stream_Kind_LENGTH ||
kind == proto::Stream_Kind_SECONDARY) {
ranges.emplace_back(offset, stream.length());
}
}

offset += stream.length();
}

if (!readCache_)
readCache_ = std::make_shared<ReadRangeCache>(getStream(), options, contents_->pool);

readCache_->cache(std::move(ranges));
}
}

RowReader::~RowReader() {
// PASS
}
Expand Down
27 changes: 21 additions & 6 deletions c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "RLE.hh"
#include "SchemaEvolution.hh"
#include "TypeImpl.hh"
#include "io/Cache.hh"
#include "sargs/SargsApplier.hh"

namespace orc {
Expand Down Expand Up @@ -176,6 +177,8 @@ namespace orc {
// match read and file types
SchemaEvolution schemaEvolution_;

std::shared_ptr<ReadRangeCache> readCache_;

// load stripe index if not done so
void loadStripeIndex();

Expand Down Expand Up @@ -218,7 +221,8 @@ namespace orc {
* @param contents of the file
* @param options options for reading
*/
RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& options);
RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& options,
std::shared_ptr<ReadRangeCache> readCache = {});

// Select the columns from the options object
const std::vector<bool> getSelectedColumns() const override;
Expand All @@ -245,6 +249,10 @@ namespace orc {
const SchemaEvolution* getSchemaEvolution() const {
return &schemaEvolution_;
}

// Returns the read cache this row reader was constructed with; the pointer
// is empty when no cache was supplied.
std::shared_ptr<ReadRangeCache> getReadCache() const { return readCache_; }
};

class ReaderImpl : public Reader {
Expand All @@ -260,6 +268,9 @@ namespace orc {
// footer
proto::Footer* footer_;
uint64_t numberOfStripes_;

// Cached I/O ranges; only set after preBuffer has been invoked.
std::shared_ptr<ReadRangeCache> readCache_;
uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);

// internal methods
Expand Down Expand Up @@ -352,18 +363,18 @@ namespace orc {
return contents_->blockSize;
}

// Non-owning pointer to the footer held by the file contents.
const proto::Footer* getFooter() const {
  const auto& footer = contents_->footer;
  return footer.get();
}

// Non-owning pointer to the schema held by the file contents.
const Type* getSchema() const {
  const auto& schema = contents_->schema;
  return schema.get();
}

InputStream* getStream() const {
// Non-owning pointer to the input stream held by the file contents.
InputStream* getStream() const override {
  const auto& stream = contents_->stream;
  return stream.get();
}

// Non-owning pointer to the metadata held by the file contents.
const proto::Metadata* getMetadata() const override {
  const auto& metadata = contents_->metadata;
  return metadata.get();
}

uint64_t getMemoryUse(int stripeIx = -1) override;

uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx = -1) override;
Expand All @@ -374,6 +385,10 @@ namespace orc {

std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const override;

void preBuffer(const std::vector<int>& stripes, const std::list<uint64_t>& includeTypes,
const CacheOptions& options) override;
void releaseBuffer(uint64_t boundary) override;
};
} // namespace orc

Expand Down
25 changes: 20 additions & 5 deletions c++/src/StripeStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "StripeStream.hh"
#include "RLE.hh"
#include "Reader.hh"
#include "io/Cache.hh"
#include "orc/Exceptions.hh"

#include "wrap/coded-stream-wrapper.h"
Expand All @@ -37,7 +38,8 @@ namespace orc {
stripeStart_(stripeStart),
input_(input),
writerTimezone_(writerTimezone),
readerTimezone_(readerTimezone) {
readerTimezone_(readerTimezone),
readCache_(reader.getReadCache()) {
// PASS
}

Expand Down Expand Up @@ -89,7 +91,6 @@ namespace orc {
if (stream.has_kind() && stream.kind() == kind &&
stream.column() == static_cast<uint64_t>(columnId)) {
uint64_t streamLength = stream.length();
uint64_t myBlock = shouldStream ? input_.getNaturalReadSize() : streamLength;
if (offset + streamLength > dataEnd) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex_
Expand All @@ -99,9 +100,23 @@ namespace orc {
<< ", stripeDataLength=" << stripeInfo_.data_length();
throw ParseError(msg.str());
}
return createDecompressor(reader_.getCompression(),
std::make_unique<SeekableFileInputStream>(
&input_, offset, stream.length(), *pool, myBlock),

InputStream::BufferSlice slice;
if (readCache_) {
ReadRange range{offset, streamLength};
slice = readCache_->read(range);
}

uint64_t myBlock = shouldStream ? input_.getNaturalReadSize() : streamLength;
std::unique_ptr<SeekableInputStream> seekableInput;
if (slice.buffer) {
seekableInput = std::make_unique<SeekableArrayInputStream>(
slice.buffer->data() + slice.offset, slice.length);
} else {
seekableInput = std::make_unique<SeekableFileInputStream>(&input_, offset, streamLength,
*pool, myBlock);
}
return createDecompressor(reader_.getCompression(), std::move(seekableInput),
reader_.getCompressionSize(), *pool,
reader_.getFileContents().readerMetrics);
}
Expand Down
2 changes: 2 additions & 0 deletions c++/src/StripeStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
namespace orc {

class RowReaderImpl;
class ReadRangeCache;

/**
* StripeStream Implementation
Expand All @@ -45,6 +46,7 @@ namespace orc {
InputStream& input_;
const Timezone& writerTimezone_;
const Timezone& readerTimezone_;
std::shared_ptr<ReadRangeCache> readCache_;

public:
StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
Expand Down
Loading
Loading