Skip to content

Commit

Permalink
Update Arrow writers to use SaveOption
Browse files Browse the repository at this point in the history
  • Loading branch information
JesseMckinzie committed Oct 23, 2023
1 parent 5526415 commit 04b6952
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 25 deletions.
12 changes: 5 additions & 7 deletions src/nyx/arrow_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@

#ifdef USE_ARROW

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type,
std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header) {

std::string arrow_file_type_upper = Nyxus::toupper(arrow_file_type);

if(arrow_file_path != "" && !fs::is_directory(arrow_file_path) && !(Nyxus::ends_with_substr(arrow_file_path, ".arrow") || Nyxus::ends_with_substr(arrow_file_path, ".feather") || Nyxus::ends_with_substr(arrow_file_path, ".parquet"))) {
throw std::invalid_argument("The arrow file path must end in \".arrow\"");
}

if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) {
throw std::invalid_argument("The valid file types are ARROW, ARROWIPC, or PARQUET");
if (arrow_file_type != SaveOption::saveArrowIPC && arrow_file_type != SaveOption::saveParquet) {
throw std::invalid_argument("The valid save options are SaveOption::saveArrowIPC or SaveOption::saveParquet.");
}

std::string extension = (arrow_file_type_upper == "PARQUET") ? ".parquet" : ".arrow";
std::string extension = (arrow_file_type == SaveOption::saveParquet) ? ".parquet" : ".arrow";

if (arrow_file_path == "") {
arrow_file_path_ = "NyxusFeatures" + extension;
Expand Down Expand Up @@ -49,7 +47,7 @@ std::string ArrowOutputStream::get_arrow_path() {

#else

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type,
std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header) {

Expand Down
7 changes: 3 additions & 4 deletions src/nyx/arrow_output_stream.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#pragma once



#include <string>
#include <memory>

#include "output_writers.h"
#include "helpers/helpers.h"
#include "globals.h"

#ifdef USE_ARROW
#include <arrow/table.h>
Expand Down Expand Up @@ -37,7 +36,7 @@ class ArrowOutputStream {
std::shared_ptr<arrow::Table> arrow_table_ = nullptr;

public:
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const std::string& arrow_file_type,
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path);
Expand Down Expand Up @@ -67,7 +66,7 @@ class ArrowOutputStream {
std::shared_ptr<arrow::Table> arrow_table_ = nullptr;

public:
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const std::string& arrow_file_type,
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const SaveOption& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path);
Expand Down
2 changes: 1 addition & 1 deletion src/nyx/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace Nyxus
extern FeatureManager theFeatureMgr;
extern ImageLoader theImLoader;

enum class SaveOption {saveCSV, saveArrow, saveBuffer};
enum class SaveOption {saveCSV, saveBuffer, saveArrowIPC, saveParquet};

bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs);
std::string getPureFname(const std::string& fpath);
Expand Down
8 changes: 7 additions & 1 deletion src/nyx/main_nyxus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ int main (int argc, char** argv)
int min_online_roi_size = 0;

SaveOption saveOption = [](){
if (theEnvironment.use_apache_writers) return SaveOption::saveArrow;
if (theEnvironment.use_apache_writers) {
if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") {
return SaveOption::saveArrowIPC;
} else {
return SaveOption::saveParquet;
}
}
else if (theEnvironment.useCsv) {return SaveOption::saveCSV;}
else {return SaveOption::saveBuffer;}
}();
Expand Down
26 changes: 23 additions & 3 deletions src/nyx/python/new_bindings_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,13 @@ py::tuple featurize_directory_imp (
int min_online_roi_size = 0;

SaveOption saveOption = [](){
if (theEnvironment.use_apache_writers) return SaveOption::saveArrow;
if (theEnvironment.use_apache_writers) {
if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") {
return SaveOption::saveArrowIPC;
} else {
return SaveOption::saveParquet;
}
}
else {return SaveOption::saveBuffer;}
}();

Expand Down Expand Up @@ -269,10 +275,18 @@ py::tuple featurize_montage_imp (
std::string error_message = "";

SaveOption saveOption = [](){
if (theEnvironment.use_apache_writers) return SaveOption::saveArrow;
if (theEnvironment.use_apache_writers) {
if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") {
return SaveOption::saveArrowIPC;
} else {
return SaveOption::saveParquet;
}
}
else {return SaveOption::saveBuffer;}
}();



int errorCode = processMontage(
intensity_images,
label_images,
Expand Down Expand Up @@ -354,7 +368,13 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list
int errorCode;

SaveOption saveOption = [](){
if (theEnvironment.use_apache_writers) return SaveOption::saveArrow;
if (theEnvironment.use_apache_writers) {
if (Nyxus::toupper(theEnvironment.arrow_output_type) == "ARROW") {
return SaveOption::saveArrowIPC;
} else {
return SaveOption::saveParquet;
}
}
else {return SaveOption::saveBuffer;}
}();

Expand Down
20 changes: 11 additions & 9 deletions src/nyx/scan_fastloader_way.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,15 @@ namespace Nyxus
// One-time initialization
init_feature_buffers();

bool write_apache = (saveOption == SaveOption::saveArrowIPC || saveOption == SaveOption::saveParquet);

// initialize arrow writer if needed
if (saveOption == SaveOption::saveArrow) {
if (write_apache) {

theEnvironment.arrow_stream = ArrowOutputStream();

try {
theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures()));
theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(saveOption, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures()));
} catch (const std::exception &err) {
std::cout << "Error creating Arrow file: " << err.what() << std::endl;
return 1;
Expand Down Expand Up @@ -275,7 +276,7 @@ namespace Nyxus
}


if (saveOption == SaveOption::saveArrow) {
if (write_apache) {

auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values());

Expand Down Expand Up @@ -346,7 +347,7 @@ namespace Nyxus
}
#endif

if (saveOption == SaveOption::saveArrow) {
if (write_apache) {
// close arrow file after use
auto status = theEnvironment.arrow_writer->close();

Expand All @@ -372,13 +373,14 @@ namespace Nyxus
SaveOption saveOption,
const std::string& outputDir)
{
bool write_apache = (saveOption == SaveOption::saveArrowIPC || saveOption == SaveOption::saveParquet);

if (saveOption == SaveOption::saveArrow) {
if (write_apache) {

theEnvironment.arrow_stream = ArrowOutputStream();

try {
theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures()));
theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(saveOption, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures()));
} catch (const std::exception &err) {
error_message = err.what();
return 1;
Expand Down Expand Up @@ -409,8 +411,8 @@ namespace Nyxus
}


if (saveOption == SaveOption::saveArrow) {

if (write_apache) {
auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values());

if (!status.ok()) {
Expand Down Expand Up @@ -449,7 +451,7 @@ namespace Nyxus
}


if (saveOption == SaveOption::saveArrow) {
if (write_apache) {
// close arrow file after use
auto status = theEnvironment.arrow_writer->close();

Expand Down

0 comments on commit 04b6952

Please sign in to comment.