This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Ape base #222

Open · wants to merge 2 commits into base: ape

2 changes: 1 addition & 1 deletion oap-ape/ape-native/src/CMakeLists.txt
@@ -58,7 +58,7 @@ if(NOT REDIS_PLUS_PLUS_LIB)
message(FATAL_ERROR "redis++ library not found")
endif()

-set(PARSE_SRC utils/FilterExpression.cc utils/JsonConvertor.cc utils/Type.cc utils/UnaryFilter.cc)
+set(PARSE_SRC utils/FilterExpression.cc utils/JsonConvertor.cc utils/Type.cc utils/UnaryFilter.cc utils/PredicateExpression.cc)
set(DECIMAL_SRC utils/DecimalUtil.cc utils/DecimalConvertor.cc)
set(AGG_SRC utils/AggExpression.cc)

51 changes: 49 additions & 2 deletions oap-ape/ape-native/src/reader.cc
@@ -35,7 +35,7 @@ void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort,
ARROW_LOG(DEBUG) << "hdfsHost " << hdfsHost << " port " << hdfsPort;

options->ConfigureEndPoint(hdfsHost, hdfsPort);
-  // todo: if we delete `options`, it will core dump, seems like free twice.
+  // TODO: if we delete `options`, it will core dump, seems like free twice.
auto result = arrow::fs::HadoopFileSystem::Make(*options);
if (!result.ok()) {
ARROW_LOG(WARNING) << "HadoopFileSystem Make failed! err msg:"
@@ -68,6 +68,7 @@ void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort,

fileMetaData = parquetReader->metadata();

+  this->useRowGroupFilter = false;
this->firstRowGroupIndex = firstRowGroup;
this->totalRowGroups = rowGroupToRead;

@@ -457,7 +458,7 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector<int64_t>& buffersPtr
allocateFilterBuffers(batchSize);
}

-  if (aggExprs.size()) {  // todo: group by agg size
+  if (aggExprs.size()) {  // TODO: group by agg size
allocateAggBuffers(batchSize);
}

@@ -564,6 +565,18 @@ bool Reader::checkEndOfRowGroup() {
// if a splitFile contains rowGroup [2,5], currentRowGroup is 2
// rowGroupReaders index starts from 0
ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << totalRowsLoadedSoFar;
+  // Find the next row group that passes the predicate (min/max) filter.
+  // doPredicateFilter() returns true once the index runs past the last
+  // assigned row group, so this loop always terminates.
+  if (useRowGroupFilter) {
+    while (!doPredicateFilter(currentRowGroup)) {
+      currentRowGroup++;
+      totalRowGroupsRead++;
+    }
+    if (currentRowGroup > firstRowGroupIndex + totalRowGroups - 1) {
+      return true;
+    }
+  }

rowGroupReader = rowGroupReaders[currentRowGroup - firstRowGroupIndex];
currentRowGroup++;
totalRowGroupsRead++;
@@ -593,6 +606,15 @@ void Reader::setFilter(std::string filterJsonStr) {
filterExpression = std::make_shared<RootFilterExpression>(
"root", std::dynamic_pointer_cast<FilterExpression>(tmpExpression));

+  // Build the row-group predicate filter from the same filter JSON.
+  if (useRowGroupFilter) {
+    std::shared_ptr<PredicateExpression> tmpExpressionP =
+        JsonConvertor::parseToPredicateExpression(filterJsonStr);
+
+    predicateExpression = std::make_shared<RootPredicateExpression>("root", tmpExpressionP);
+  }

// get column names from expression
filterColumnNames.clear();
setFilterColumnNames(tmpExpression);
@@ -879,4 +901,29 @@ bool Reader::isNativeEnabled() {
arrow::internal::CpuInfo::Vendor::Intel;
}

+bool Reader::doPredicateFilter(int rowGroupIndex) {
+  int8_t res;
+
+  // No predicate was built, or the index is already past the last assigned
+  // row group: report "pass" so the caller's skip loop stops advancing.
+  if (!predicateExpression || rowGroupIndex > firstRowGroupIndex + totalRowGroups - 1) {
+    return true;
+  }
+
+  std::unique_ptr<parquet::RowGroupMetaData> urgMetaData = fileMetaData->RowGroup(rowGroupIndex);
+  std::shared_ptr<parquet::RowGroupMetaData> rgMetaData = std::move(urgMetaData);
+
+  predicateExpression->setSchema(schema);
+  predicateExpression->setStatistic(rgMetaData);
+  predicateExpression->PredicateWithParam(res);
+
+  if (res > 0) {
+    ARROW_LOG(DEBUG) << "predicate pass.";
+    return true;
+  } else {
+    ARROW_LOG(DEBUG) << "predicate not pass.";
+    return false;
+  }
+}

} // namespace ape
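Note that this patch initializes useRowGroupFilter to false and nothing in the diff ever sets it to true, so the pruning path stays dormant as merged. A minimal sketch of the kind of opt-in setter that could enable it, named by analogy with the existing setPreBufferEnabled() (hypothetical, not part of this PR):

// Hypothetical opt-in switch, not in this diff: call before reading the
// first batch so checkEndOfRowGroup() can skip row groups whose min/max
// statistics cannot satisfy the filter.
void Reader::setRowGroupFilterEnabled(bool isEnabled) {
  useRowGroupFilter = isEnabled;
}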
5 changes: 5 additions & 0 deletions oap-ape/ape-native/src/reader.h
@@ -30,6 +30,7 @@

#include "utils/AggExpression.h"
#include "utils/FilterExpression.h"
#include "utils/PredicateExpression.h"
#include "utils/PlasmaCacheManager.h"
#include "utils/JsonConvertor.h"
#include "utils/Type.h"
@@ -66,6 +67,8 @@ class Reader {

void setPreBufferEnabled(bool isEnabled);

+  bool doPredicateFilter(int rowGroupIndex);

static bool isNativeEnabled();

private:
@@ -127,11 +130,13 @@
int64_t totalRowsLoadedSoFar = 0;

std::shared_ptr<RootFilterExpression> filterExpression;
+  std::shared_ptr<RootPredicateExpression> predicateExpression;
std::chrono::duration<double> filterTime = std::chrono::nanoseconds::zero();
std::chrono::duration<double> aggTime = std::chrono::nanoseconds::zero();

std::vector<char*> extraByteArrayBuffers;

+  bool useRowGroupFilter = false;
bool filterReset = false;
int currentBatchSize = 0;
int initRequiredColumnCount = 0;
6 changes: 5 additions & 1 deletion oap-ape/ape-native/src/test/CMakeLists.txt
@@ -51,7 +51,7 @@ set(ARROW_STATIC -Wl,-Bstatic; -Wl,--whole-archive; arrow_static; -Wl,--no-whole
set(THREAD -Wl,-Bdynamic;-Wl,--as-needed;Threads::Threads;-Wl,--no-as-needed)

add_executable(convertorTest convertorTest.cc)
-target_link_libraries(convertorTest gtest_main parse ${ARROW_STATIC} ${THREAD})
+target_link_libraries(convertorTest gtest_main parquet_jni ${ARROW_STATIC} ${THREAD})
add_test(NAME convertorTest COMMAND ${EXECUTABLE_OUTPUT_PATH}/convertorTest)

add_executable(decimalTest decimalTest.cc)
@@ -76,6 +76,10 @@ target_link_libraries(parquetHdfsTest gtest_main parquet_jni ${ARROW_STATIC} ${T
add_executable(plasmaTest plasmaTest.cc)
target_link_libraries(plasmaTest gtest_main parquet_jni ${ARROW_STATIC} ${THREAD})

+add_executable(predicateFilterTest predicateFilterTest.cc)
+target_link_libraries(predicateFilterTest gtest_main parquet_jni ${ARROW_STATIC} ${THREAD})
+add_test(NAME predicateFilterTest COMMAND ${EXECUTABLE_OUTPUT_PATH}/predicateFilterTest)

if(APE_CI)
# Will not run these test
else()
18 changes: 13 additions & 5 deletions oap-ape/ape-native/src/test/parquetHdfsTest.cc
@@ -25,11 +25,13 @@
#include <parquet/api/reader.h>
#include <gtest/gtest.h>

+#include <nlohmann/json.hpp>

TEST(ParquetHdfsTest, ReadTest) {
arrow::fs::HdfsOptions options_;

std::string hdfs_host = "sr585";
int hdfs_port = 9000;
std::string hdfs_host = "clx06-AEP";
int hdfs_port = 8020;
// std::string hdfs_user = "kunshang";

options_.ConfigureEndPoint(hdfs_host, hdfs_port);
@@ -42,8 +44,8 @@ TEST(ParquetHdfsTest, ReadTest) {
std::make_shared<arrow::fs::SubTreeFileSystem>("", *result);

std::string file_name =
"/tpcds_10g/store_sales/"
"part-00000-74feb3b4-1954-4be7-802d-a50912793bea-c000.snappy.parquet";
"/user/hive/warehouse/tpcds_hdfs_parquet_10.db/store_sales/ss_sold_date_sk=2450816/"
"part-00009-0828d1ab-ef1f-4b55-bf94-c071fb76c353.c000.snappy.parquet";

auto file_result = fs_->OpenInputFile(file_name);
EXPECT_TRUE(file_result.ok()) << "Open hdfs file failed";
@@ -84,7 +86,7 @@ TEST(ParquetHdfsTest, ReadTest) {
column_reader = row_group_reader->Column(1);
parquet::Int32Reader* int32_reader =
static_cast<parquet::Int32Reader*>(column_reader.get());
-  int batch_size = 10000;
+  int batch_size = 1000;
int32_t* values_rb = (int32_t*)std::malloc(batch_size * sizeof(int32_t));
int64_t values_read = 0;
int64_t rows_read = 0;
@@ -105,10 +107,15 @@ TEST(ParquetHdfsTest, ReadTest) {

// ReadBatchSpaced will record a null bitmap.
std::cout << std::endl << "test ReadBatchSpaced API" << std::endl;
std::cout<<"malloc values_rbs"<<std::endl;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these logs?

int32_t* values_rbs = (int32_t*)std::malloc(batch_size * sizeof(int32_t));
std::cout<<"malloc def_levels"<<std::endl;
int16_t* def_levels = (int16_t*)std::malloc(batch_size * sizeof(int16_t));
std::cout<<"malloc rep_devels"<<std::endl;
int16_t* rep_levels = (int16_t*)std::malloc(batch_size * sizeof(int16_t));
std::cout<<"malloc valid_bits"<<std::endl;
uint8_t* valid_bits = (uint8_t*)std::malloc(batch_size * sizeof(uint8_t));
std::cout<<"***********************"<<std::endl;
int64_t levels_read = 0;
int64_t null_count = 0;

@@ -132,4 +139,5 @@ TEST(ParquetHdfsTest, ReadTest) {
std::free(def_levels);
std::free(rep_levels);
std::free(valid_bits);
std::cout<<"parquetHdfsTest done!"<<std::endl;
}
170 changes: 170 additions & 0 deletions oap-ape/ape-native/src/test/predicateFilterTest.cc
@@ -0,0 +1,170 @@
#include <stdlib.h>

Review comment (Collaborator): add license

#include <iostream>
#include <memory>

#include <arrow/api.h>
#include <arrow/filesystem/api.h>
#include <parquet/api/reader.h>
#include <gtest/gtest.h>
#include <vector>

#include "src/reader.h"
#include "src/utils/PredicateExpression.h"

TEST(predicateFilterTest, minMaxTest) {
  arrow::fs::HdfsOptions options_;

Review comment (Collaborator): format this file?

std::string hdfs_host = "clx06-AEP";
int hdfs_port = 8020;

options_.ConfigureEndPoint(hdfs_host, hdfs_port);

auto result = arrow::fs::HadoopFileSystem::Make(options_);
EXPECT_TRUE(result.ok()) << "HadoopFileSystem Make failed";

std::shared_ptr<arrow::fs::FileSystem> fs_ =
std::make_shared<arrow::fs::SubTreeFileSystem>("", *result);

std::string file_name =
"/user/hive/warehouse/tpcds_hdfs_parquet_10.db/store_sales/ss_sold_date_sk=2450817/"
"part-00013-0828d1ab-ef1f-4b55-bf94-c071fb76c353.c000.snappy.parquet";

auto file_result = fs_->OpenInputFile(file_name);
EXPECT_TRUE(file_result.ok()) << "Open hdfs file failed";

std::shared_ptr<arrow::io::RandomAccessFile> file = file_result.ValueOrDie();
std::cout << "file size is " << file->GetSize().ValueOrDie() << std::endl;

parquet::ReaderProperties properties;
// std::shared_ptr<parquet::FileMetaData> metadata;
std::unique_ptr<parquet::ParquetFileReader> parquetReader =
parquet::ParquetFileReader::Open(file, properties, NULLPTR);

std::shared_ptr<parquet::FileMetaData> fileMetaData = parquetReader->metadata();

int numRows = fileMetaData->num_rows();
int numCols = fileMetaData->num_columns();
int numRowGroups = fileMetaData->num_row_groups();
std::string schema = fileMetaData->schema()->ToString();

std::cout<<"parquet file has "<<numRows<<" rows"<<std::endl
<<numCols<<" cols"<<std::endl
<<numRowGroups<<" row groups"<<std::endl;

std::cout<<"schema: "<<schema<<std::endl;

  if (numRowGroups < 1) {
    std::cout << "not enough row groups for test" << std::endl;
    return;
  }

  int rowGroupIndex = 0;
  std::unique_ptr<parquet::RowGroupMetaData> urgMetaData = fileMetaData->RowGroup(rowGroupIndex);
  std::shared_ptr<parquet::RowGroupMetaData> rgMetaData = std::move(urgMetaData);

  std::unique_ptr<parquet::ColumnChunkMetaData> columnChunkMeta;

  for (int i = 0; i < numCols; i++) {
    columnChunkMeta = rgMetaData->ColumnChunk(i);
    std::string column_name = fileMetaData->schema()->Column(i)->name();
    parquet::Type::type column_type = fileMetaData->schema()->Column(i)->physical_type();
    std::cout << "current column[" << i << "]: " << column_name
              << " type " << column_type << std::endl;

    std::shared_ptr<parquet::Statistics> statistic = columnChunkMeta->statistics();
    if (!statistic->HasMinMax()) {
      std::cout << "This column does not have valid min max value." << std::endl;
      continue;
    }
    switch (column_type) {
      case parquet::Type::BOOLEAN: {
        auto boolStatistic = std::static_pointer_cast<parquet::BoolStatistics>(statistic);
        std::cout << "min: " << boolStatistic->min() << " max: " << boolStatistic->max() << std::endl;
        break;
      }
      case parquet::Type::INT32: {
        auto int32Statistic = std::static_pointer_cast<parquet::Int32Statistics>(statistic);
        std::cout << "min: " << int32Statistic->min() << " max: " << int32Statistic->max() << std::endl;
        break;
      }
      case parquet::Type::INT64: {
        auto int64Statistic = std::static_pointer_cast<parquet::Int64Statistics>(statistic);
        std::cout << "min: " << int64Statistic->min() << " max: " << int64Statistic->max() << std::endl;
        break;
      }
      case parquet::Type::FLOAT: {
        auto floatStatistic = std::static_pointer_cast<parquet::FloatStatistics>(statistic);
        std::cout << "min: " << floatStatistic->min() << " max: " << floatStatistic->max() << std::endl;
        break;
      }
      default:
        break;
    }
  }
}
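The switch above handles only BOOLEAN, INT32, INT64, and FLOAT; DOUBLE and BYTE_ARRAY columns fall through to the default case and print nothing. A sketch of the DOUBLE case, following the same pattern (parquet::DoubleStatistics is the corresponding typed statistics class in Arrow's parquet API; BYTE_ARRAY would be analogous via parquet::ByteArrayStatistics):

      // Sketch of one omitted case, same shape as the handled ones.
      case parquet::Type::DOUBLE: {
        auto doubleStatistic = std::static_pointer_cast<parquet::DoubleStatistics>(statistic);
        std::cout << "min: " << doubleStatistic->min() << " max: " << doubleStatistic->max() << std::endl;
        break;
      }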

TEST(predicateFilterTest, predicateTest) {
  std::string hdfs_host = "clx06-AEP";
  int hdfs_port = 8020;

  std::string file_name =
      "/user/hive/warehouse/tpcds_hdfs_parquet_10.db/store_sales/ss_sold_date_sk=2450817/"
      "part-00013-0828d1ab-ef1f-4b55-bf94-c071fb76c353.c000.snappy.parquet";

  // The filter below references ss_sold_time_sk and ss_item_sk, so both
  // must be listed as required columns.
  std::string requiredColumnName = R"(
  {
    "name":"my_schema",
    "fields":[
      {
        "name":"ss_sold_time_sk",
        "value":true
      },
      {
        "name":"ss_item_sk",
        "value":true
      }
    ]
  }
  )";

std::string filterJson = R"(
{
"FilterTypeName":"and",
"LeftNode":{
"FilterTypeName":"lt",
"ColumnType":"Integer",
"ColumnName":"ss_sold_time_sk",
"Value":"28855"
},
"RightNode":{
"FilterTypeName":"gt",
"ColumnType":"Integer",
"ColumnName":"ss_item_sk",
"Value":"3"
}
}

)";

int firstRowGroup = 0;
int rowGroupToRead = 1;

ape::Reader test_reader;
test_reader.init(file_name, hdfs_host, hdfs_port,requiredColumnName, firstRowGroup, rowGroupToRead);
test_reader.setFilter(filterJson);

bool res = test_reader.doPredicateFilter(0);
std::cout<<"predicateFiltere: "<<res<<std::endl;

test_reader.close();

}
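As a usage note: with row-group filtering enabled, a predicate whose range cannot overlap any row group's statistics should make doPredicateFilter return false. A hedged variant of the filter above that one would expect to prune every row group (assuming ss_sold_time_sk values in this TPC-DS data never come close to 10,000,000):

  // Hypothetical always-pruning predicate: if every row group's max for
  // ss_sold_time_sk is below the threshold, doPredicateFilter(0) should
  // return false, and checkEndOfRowGroup() would skip the whole split.
  std::string pruneAllJson = R"(
  {
    "FilterTypeName":"gt",
    "ColumnType":"Integer",
    "ColumnName":"ss_sold_time_sk",
    "Value":"10000000"
  }
  )";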