Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-262: [C++] Support async io prefetch for orc c++ lib #2048

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
28 changes: 27 additions & 1 deletion c++/include/orc/OrcFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef ORC_FILE_HH
#define ORC_FILE_HH

#include <future>
#include <string>

#include "orc/Reader.hh"
Expand All @@ -36,6 +37,20 @@ namespace orc {
*/
class InputStream {
public:
using Buffer = DataBuffer<char>;
using BufferPtr = std::shared_ptr<Buffer>;

struct BufferSlice {
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
BufferSlice() : buffer(nullptr), offset(0), length(0) {}

BufferSlice(BufferPtr buffer, uint64_t offset, uint64_t length)
: buffer(std::move(buffer)), offset(offset), length(length) {}

BufferPtr buffer;
uint64_t offset;
uint64_t length;
};

virtual ~InputStream();

/**
Expand All @@ -58,6 +73,17 @@ namespace orc {
*/
virtual void read(void* buf, uint64_t length, uint64_t offset) = 0;

/**
* Read data asynchronously.
*
* This default implementation performs no I/O: it ignores all of its
* arguments and throws NotImplementedYet.
* NOTE(review): streams that support asynchronous reads are presumably
* expected to override it -- confirm.
* @param offset the position in the stream to read from.
* @param length the number of bytes to read.
* @param pool memory pool argument (unused here). NOTE(review): presumably
*        used to allocate the returned buffer -- confirm against overrides.
* @return a future that will be set to the buffer when the read is complete.
* @throws NotImplementedYet always, in this default implementation.
*/
virtual std::future<BufferPtr> readAsync(uint64_t /*offset*/, uint64_t /*length*/,
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
MemoryPool& /*pool*/) {
throw NotImplementedYet("readAsync not supported yet");
}

/**
* Get the name of the stream for error messages.
*/
Expand Down Expand Up @@ -153,4 +179,4 @@ namespace orc {
const WriterOptions& options);
} // namespace orc

#endif
#endif
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
27 changes: 27 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ namespace orc {
// classes that hold data members so we can maintain binary compatibility
struct ReaderOptionsPrivate;
struct RowReaderOptionsPrivate;
struct CacheOptions;
class InputStream;

// Forward declarations of the proto message types used by the Reader
// interface, so this header does not need their full definitions.
namespace proto {
  class Footer;
  class Metadata;
}  // namespace proto

/**
* Expose the reader metrics including the latency and
Expand Down Expand Up @@ -605,6 +612,26 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;

/**
* Get the input stream for the ORC file.
* @return a raw pointer to the InputStream.
*         NOTE(review): presumably non-owning and valid only while the
*         reader is alive -- confirm against the implementation.
*/
virtual InputStream* getStream() const = 0;

/**
* Get the footer of the ORC file.
* @return a pointer to the file's proto::Footer message.
*         NOTE(review): presumably owned by the reader -- confirm lifetime.
*/
virtual const proto::Footer* getFooter() const = 0;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get the metadata of the ORC file.
* @return a pointer to the file's proto::Metadata message.
*         NOTE(review): may be null if the metadata section has not been
*         loaded yet -- confirm against the implementation.
*/
virtual const proto::Metadata* getMetadata() const = 0;

/**
* Prefetch data of the given stripes and types ahead of reading.
* NOTE(review): the details below describe ReaderImpl::preBuffer; confirm
* they are the intended contract for every implementation.
* @param stripes indices of the stripes to prefetch.
* @param includeTypes type ids selecting which columns' streams to prefetch.
* @param options options passed to the read cache when it is created.
*/
virtual void preBuffer(const std::vector<int>& stripes, const std::list<uint64_t>& includeTypes,
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
const CacheOptions& options) = 0;

/**
* Release buffers held by the prefetch cache.
* @param boundary NOTE(review): ReaderImpl forwards this value to
*        ReadRangeCache::evictEntriesBefore(); presumably a file offset
*        before which cached entries are dropped -- confirm.
*/
virtual void releaseBuffer(uint64_t boundary) = 0;
};

/**
Expand Down
1 change: 1 addition & 0 deletions c++/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ set(SOURCE_FILES
orc_proto.pb.h
io/InputStream.cc
io/OutputStream.cc
io/Cache.cc
sargs/ExpressionTree.cc
sargs/Literal.cc
sargs/PredicateLeaf.cc
Expand Down
34 changes: 0 additions & 34 deletions c++/src/MemoryPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ namespace orc {
for (uint64_t i = currentSize_; i > newSize; --i) {
(buf_ + i - 1)->~T();
}
} else if (newSize > currentSize_) {
for (uint64_t i = currentSize_; i < newSize; ++i) {
new (buf_ + i) T();
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
}
}
currentSize_ = newSize;
}
Expand Down Expand Up @@ -134,9 +130,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<char>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount);
  }
  currentSize_ = newSize;
}

Expand All @@ -152,9 +145,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<char*>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(char*));
  }
  currentSize_ = newSize;
}

Expand All @@ -170,9 +160,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<double>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(double));
  }
  currentSize_ = newSize;
}

Expand All @@ -188,9 +175,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<float>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(float));
  }
  currentSize_ = newSize;
}

Expand All @@ -206,9 +190,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<int64_t>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(int64_t));
  }
  currentSize_ = newSize;
}

Expand All @@ -224,9 +205,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<int32_t>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(int32_t));
  }
  currentSize_ = newSize;
}

Expand All @@ -242,9 +220,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<int16_t>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(int16_t));
  }
  currentSize_ = newSize;
}

Expand All @@ -260,9 +235,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<int8_t>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(int8_t));
  }
  currentSize_ = newSize;
}

Expand All @@ -278,9 +250,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<uint64_t>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount * sizeof(uint64_t));
  }
  currentSize_ = newSize;
}

Expand All @@ -296,9 +265,6 @@ namespace orc {
// Resize to newSize elements; when growing, the newly added tail is zero-filled.
template <>
void DataBuffer<unsigned char>::resize(uint64_t newSize) {
  reserve(newSize);
  if (currentSize_ < newSize) {
    const uint64_t addedCount = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedCount);
  }
  currentSize_ = newSize;
}

Expand Down
79 changes: 76 additions & 3 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ namespace orc {
buildTypeNameIdMap(contents_->schema.get());
}

RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts)
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts,
std::shared_ptr<ReadRangeCache> cachedSource)
: localTimezone_(getLocalTimezone()),
contents_(contents),
throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
Expand All @@ -255,7 +256,8 @@ namespace orc {
firstRowOfStripe_(*contents_->pool, 0),
enableEncodedBlock_(opts.getEnableLazyDecoding()),
readerTimezone_(getTimezoneByName(opts.getTimezoneName())),
schemaEvolution_(opts.getReadType(), contents_->schema.get()) {
schemaEvolution_(opts.getReadType(), contents_->schema.get()),
cachedSource_(std::move(cachedSource)) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer_->stripes_size());
currentStripe_ = numberOfStripes;
Expand Down Expand Up @@ -838,7 +840,7 @@ namespace orc {
// load stripe statistics for PPD
readMetadata();
}
return std::make_unique<RowReaderImpl>(contents_, opts);
return std::make_unique<RowReaderImpl>(contents_, opts, cachedSource_);
}

uint64_t maxStreamsForType(const proto::Type& type) {
Expand Down Expand Up @@ -1474,6 +1476,77 @@ namespace orc {
return ret;
}

// Forwards `boundary` to the read-range cache's evictEntriesBefore();
// does nothing when no cache has been created.
void ReaderImpl::releaseBuffer(uint64_t boundary) {
  if (!cachedSource_) {
    return;
  }
  cachedSource_->evictEntriesBefore(boundary);
}

void ReaderImpl::preBuffer(const std::vector<int>& stripes,
const std::list<uint64_t>& includeTypes, const CacheOptions& options) {
if (stripes.empty() || includeTypes.empty()) {
return;
}

orc::RowReaderOptions row_reader_options;
row_reader_options.includeTypes(includeTypes);
ColumnSelector column_selector(contents_.get());
std::vector<bool> selected_columns;
column_selector.updateSelected(selected_columns, row_reader_options);

std::vector<ReadRange> ranges;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
ranges.reserve(includeTypes.size());
for (auto stripe : stripes) {
// get stripe information
const auto& stripe_info = footer_->stripes(stripe);
uint64_t stripe_footer_start =
stripe_info.offset() + stripe_info.index_length() + stripe_info.data_length();
uint64_t stripe_footer_length = stripe_info.footer_length();

// get stripe footer
std::unique_ptr<SeekableInputStream> pb_stream = createDecompressor(
contents_->compression,
std::make_unique<SeekableFileInputStream>(contents_->stream.get(), stripe_footer_start,
stripe_footer_length, *contents_->pool),
contents_->blockSize, *contents_->pool, contents_->readerMetrics);
proto::StripeFooter stripe_footer;
if (!stripe_footer.ParseFromZeroCopyStream(pb_stream.get())) {
throw ParseError(std::string("bad StripeFooter from ") + pb_stream->getName());
}

// traverse all streams in stripe footer, choose selected streams to prebuffer
uint64_t offset = stripe_info.offset();
for (int i = 0; i < stripe_footer.streams_size(); i++) {
const proto::Stream& stream = stripe_footer.streams(i);
if (offset + stream.length() > stripe_footer_start) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe " << stripe
<< ": streamOffset=" << offset << ", streamLength=" << stream.length()
<< ", stripeOffset=" << stripe_info.offset()
<< ", stripeIndexLength=" << stripe_info.index_length()
<< ", stripeDataLength=" << stripe_info.data_length();
throw ParseError(msg.str());
}

if (stream.has_kind() && selected_columns[stream.column()]) {
const auto& kind = stream.kind();
if (kind == proto::Stream_Kind_DATA || kind == proto::Stream_Kind_DICTIONARY_DATA ||
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
kind == proto::Stream_Kind_PRESENT || kind == proto::Stream_Kind_LENGTH ||
kind == proto::Stream_Kind_SECONDARY) {
ranges.emplace_back(offset, stream.length());
}
}

offset += stream.length();
}

if (!cachedSource_)
cachedSource_ = std::make_shared<ReadRangeCache>(getStream(), options, contents_->pool);

cachedSource_->cache(std::move(ranges));
}
}

// Out-of-line destructor definition; there is nothing to clean up here.
RowReader::~RowReader() = default;
Expand Down
Loading
Loading