Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Reader async #223

Open
wants to merge 7 commits into
base: ape
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file added oap-ape/ape-native/build1202/lib/libparse.so
Binary file not shown.
Binary file not shown.
Binary file added oap-ape/ape-native/buildwithqpl/lib/libparse.so
Binary file not shown.
640 changes: 554 additions & 86 deletions oap-ape/ape-native/src/reader.cc

Large diffs are not rendered by default.

42 changes: 29 additions & 13 deletions oap-ape/ape-native/src/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@
#include "src/utils/GroupByUtils.h"
#include "src/utils/DumpUtils.h"

struct readReady
{
std::vector<int64_t>* buffersPtr;
int64_t* buffersPtr_;
std::vector<int64_t>* nullsPtr;
int64_t* nullsPtr_;
int totalRowGroups ;
int totalRowGroupsRead ;
int totalColumns ;
int64_t totalRows ;
int firstRowGroupIndex ;
int currentRowGroup ;
int64_t totalRowsRead ;
int64_t totalRowsLoadedSoFar ;
int rowsToRead;

int currentBufferedRowGroup ;

int currentBatchSize = 0;
int initRequiredColumnCount = 0;
int initPlusFilterRequiredColumnCount = 0;
int32_t dumpAggCursor = 0;
};

namespace ape {
class Reader {
public:
Expand Down Expand Up @@ -99,6 +123,7 @@ class Reader {
int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector<Key>& keys,
const std::vector<DecimalVector>& results, int64_t* oriBufferPtr,
int64_t* oriNullsPtr, int32_t offset, int32_t length);
//void copyStruct(struct readReady *readPart, struct readReady *filterPart);

arrow::Result<std::shared_ptr<arrow::fs::HadoopFileSystem>> fsResult;
arrow::fs::HdfsOptions* options;
Expand All @@ -116,15 +141,9 @@ class Reader {
std::vector<std::shared_ptr<parquet::ColumnReader>> columnReaders;
std::vector<int> requiredRowGroupId;

int totalRowGroups = 0;
int totalRowGroupsRead = 0;
int totalColumns = 0;
int64_t totalRows = 0;
int firstRowGroupIndex = 0;
struct readReady* readPart;


int currentRowGroup = 0;
int64_t totalRowsRead = 0;
int64_t totalRowsLoadedSoFar = 0;

std::shared_ptr<RootFilterExpression> filterExpression;
std::chrono::duration<double> filterTime = std::chrono::nanoseconds::zero();
Expand All @@ -133,13 +152,10 @@ class Reader {
std::vector<char*> extraByteArrayBuffers;

bool filterReset = false;
int currentBatchSize = 0;
int initRequiredColumnCount = 0;
std::vector<std::string> filterColumnNames;
std::vector<char*> filterDataBuffers;
std::vector<char*> filterNullBuffers;

int initPlusFilterRequiredColumnCount = 0;
bool aggReset = false;
std::vector<std::string> aggColumnNames;
std::vector<char*> aggDataBuffers;
Expand All @@ -154,7 +170,7 @@ class Reader {
std::shared_ptr<sw::redis::ConnectionOptions> redisConnectionOptions;

bool preBufferEnabled = false;
int currentBufferedRowGroup = -1;


std::vector<int> usedInitBufferIndex;
std::vector<parquet::Type::type> typeVector = std::vector<parquet::Type::type>();
Expand All @@ -163,6 +179,6 @@ class Reader {
std::vector<Key> keys = std::vector<Key>();
ApeHashMap map;

int32_t dumpAggCursor = 0;

};
} // namespace ape