Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Apache Arrow stream writers #147

Merged
merged 30 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f40c692
Add Apache Arrow stream writers
JesseMckinzie Sep 20, 2023
63000b8
Update Apache Arrow writers to use stream writers
JesseMckinzie Sep 20, 2023
142c80d
Add unit tests for get_arrow_table
JesseMckinzie Sep 20, 2023
0bb41eb
Update name of Gabor parameter
JesseMckinzie Sep 20, 2023
a5b3d2b
Update main_nyxus.cpp
JesseMckinzie Sep 20, 2023
3d121a1
Update get_arrow_table methods
JesseMckinzie Sep 20, 2023
1e69ff4
Remove old arrow writers
JesseMckinzie Sep 20, 2023
1f1ff92
Update new_bindings_py.cpp
JesseMckinzie Sep 21, 2023
13a397a
Update main_nyxus.cpp
JesseMckinzie Sep 21, 2023
83efd66
Fix build on windows
JesseMckinzie Sep 21, 2023
64870fc
Link arrow to GTests
JesseMckinzie Sep 21, 2023
b80dc70
Update get_arrow_file methods
JesseMckinzie Sep 21, 2023
8dead40
Update arrow writers
JesseMckinzie Sep 28, 2023
5fcbfad
Update write method for arrow output
JesseMckinzie Sep 28, 2023
9a6bd2e
Add arrow writers to gtests
JesseMckinzie Sep 28, 2023
bbb808a
Update arrow writers
JesseMckinzie Sep 28, 2023
40d972c
Update arrow writers
JesseMckinzie Sep 28, 2023
6c6bd97
Update test_nyxus.py
JesseMckinzie Sep 28, 2023
28e6370
Initial writer to be null
JesseMckinzie Sep 29, 2023
870dbfb
Update output_2_buffer.cpp
JesseMckinzie Sep 29, 2023
355df10
Remove old unneeded headers
JesseMckinzie Sep 29, 2023
f882c60
Update error handling for arrow output
JesseMckinzie Sep 29, 2023
4f3547b
Update main_nyxus.cpp
JesseMckinzie Sep 29, 2023
2d17221
Update output_writers.cpp
JesseMckinzie Sep 29, 2023
012921e
Update output_writers.h
JesseMckinzie Sep 29, 2023
16db4c1
Update arrow error handling for processing montage
JesseMckinzie Sep 29, 2023
6ae187a
Remove previous implementation of Arrow writers
JesseMckinzie Oct 2, 2023
19434dc
Remove old method for alling get_feature_values
JesseMckinzie Oct 2, 2023
4d100df
Update Arrow writer error handling
JesseMckinzie Oct 4, 2023
d67b72c
Add arrow_table field to ArrowOutputStream
JesseMckinzie Oct 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake-modules")

#==== Source files
set(SOURCE
src/nyx/arrow_output_stream.cpp
src/nyx/features/basic_morphology.cpp
src/nyx/features/caliper_feret.cpp
src/nyx/features/caliper_martin.cpp
Expand Down Expand Up @@ -149,6 +150,7 @@ set(SOURCE
src/nyx/image_loader.cpp
src/nyx/output_2_buffer.cpp
src/nyx/output_2_csv.cpp
src/nyx/output_writers.cpp
src/nyx/parallel.cpp
src/nyx/phase1.cpp
src/nyx/phase2.cpp
Expand Down
117 changes: 0 additions & 117 deletions src/nyx/arrow_output.h

This file was deleted.

49 changes: 49 additions & 0 deletions src/nyx/arrow_output_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifdef USE_ARROW

#include "arrow_output_stream.h"

std::shared_ptr<ApacheArrowWriter> ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header) {

std::string arrow_file_type_upper = Nyxus::toupper(arrow_file_type);

if(arrow_file_path != "" && !fs::is_directory(arrow_file_path) && !(Nyxus::ends_with_substr(arrow_file_path, ".arrow") || Nyxus::ends_with_substr(arrow_file_path, ".feather") || Nyxus::ends_with_substr(arrow_file_path, ".parquet"))) {
throw std::invalid_argument("The arrow file path must end in \".arrow\"");
}

if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How a user can specify via arrow_file_type_upper that the desired format is ".feather" ?

throw std::invalid_argument("The valid file types are ARROW, ARROWIPC, or PARQUET");
}

std::string extension = (arrow_file_type_upper == "PARQUET") ? ".parquet" : ".arrow";

if (arrow_file_path == "") {
arrow_file_path_ = "NyxusFeatures" + extension;
} else {
arrow_file_path_ = arrow_file_path;
}

if (fs::is_directory(arrow_file_path)) {
arrow_file_path_ += "/NyxusFeatures" + extension;
}

writer_ = WriterFactory::create_writer(arrow_file_path_, header);

return writer_;
}


std::shared_ptr<arrow::Table> ArrowOutputStream::get_arrow_table(const std::string& file_path, arrow::Status& table_status) {

if (this->arrow_table_ != nullptr) return this->arrow_table_;

this->arrow_table_ = writer_->get_arrow_table(file_path, table_status);

return this->arrow_table_;
}

std::string ArrowOutputStream::get_arrow_path() {
return arrow_file_path_;
}
#endif
45 changes: 45 additions & 0 deletions src/nyx/arrow_output_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#ifdef USE_ARROW

#include <string>
#include <memory>

#include "output_writers.h"
#include "helpers/helpers.h"

#include <arrow/table.h>

#if __has_include(<filesystem>)
#include <filesystem>
namespace fs = std::filesystem;
#elif __has_include(<experimental/filesystem>)
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
#else
error "Missing the <filesystem> header."
#endif

/**
* @brief Class to write to Apache Arrow formats
*
* This class provides methods for writing to the Arrow IPC and Parquet formats.
*
*/
class ArrowOutputStream {

private:

std::string arrow_file_path_ = "";
std::shared_ptr<ApacheArrowWriter> writer_ = nullptr;
std::string arrow_output_type_ = "";
std::shared_ptr<arrow::Table> arrow_table_ = nullptr;

public:
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const std::string& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path, arrow::Status& table_status);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function call should change in a subsequent PR not to pass a ref to retrieve an error/status code. Instead the caller code will check for nullptr on the return value.

std::string get_arrow_path();
};
#endif
5 changes: 3 additions & 2 deletions src/nyx/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#ifdef USE_ARROW
#include "output_writers.h"
#include "arrow_output.h"
#include "arrow_output_stream.h"
#endif

#ifdef USE_GPU
Expand Down Expand Up @@ -118,8 +118,9 @@ class Environment: public BasicEnvironment

#ifdef USE_ARROW

ArrowOutput arrow_output = ArrowOutput();
std::string arrow_output_type = "";
ArrowOutputStream arrow_stream;
std::shared_ptr<ApacheArrowWriter> arrow_writer = nullptr;

#endif

Expand Down
11 changes: 8 additions & 3 deletions src/nyx/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace Nyxus

bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs);
std::string getPureFname(const std::string& fpath);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, const std::string& csvOutputDir);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, bool arrow_output, const std::string& csvOutputDir);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't having overloaded function processDataset() for the separate cases of CSV and Apache output types be less confusing than having a single overparametered processDataset() ?

bool gatherRoisMetrics(const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads);
bool processTrivialRois (const std::vector<int>& trivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads, size_t memory_limit);
bool processNontrivialRois (const std::vector<int>& nontrivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads);
Expand All @@ -46,7 +46,7 @@ namespace Nyxus
bool gatherRoisMetricsInMemory (const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_image, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_image, int start_idx);
bool processIntSegImagePairInMemory (const std::string& intens_fpath, const std::string& label_fpath, int filepair_index, const std::string& intens_name, const std::string& seg_name);
int processMontage(const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intensFiles, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& labelFiles, int numReduceThreads, const std::vector<std::string>& intensity_names,
const std::vector<std::string>& seg_names, std::string& error_message);
const std::vector<std::string>& seg_names, std::string& error_message, bool arrow_output=false, const std::string& outputDir="");
bool scanTrivialRois (const std::vector<int>& batch_labels, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_images, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_images, int start_idx);
bool processTrivialRoisInMemory (const std::vector<int>& trivRoiLabels, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& intens_fpath, const py::array_t<unsigned int, py::array::c_style | py::array::forcecast>& label_fpath, int start_idx, size_t memory_limit);
#endif
Expand All @@ -55,7 +55,12 @@ namespace Nyxus
std::string get_feature_output_fname(const std::string& intFpath, const std::string& segFpath);
extern const std::vector<std::string> mandatory_output_columns;
bool save_features_2_csv (const std::string & intFpath, const std::string & segFpath, const std::string & outputDir);
bool save_features_2_buffer (ResultsCache& results_cache);
bool save_features_2_buffer (ResultsCache& results_cache);

std::vector<std::tuple<std::vector<std::string>, int, std::vector<double>>> get_feature_values();
std::vector<std::string> get_header(const std::vector<std::tuple<std::string, AvailableFeatures>>& F );



void init_feature_buffers();
void clear_feature_buffers();
Expand Down
31 changes: 10 additions & 21 deletions src/nyx/main_nyxus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "dirs_and_files.h"
#include "environment.h"
#include "globals.h"

#include "arrow_output_stream.h"
#ifdef USE_GPU
bool gpu_initialize(int dev_id);
#endif
Expand Down Expand Up @@ -59,6 +59,13 @@ int main (int argc, char** argv)
auto startTS = getTimeStr();
VERBOSLVL1(std::cout << "\n>>> STARTING >>> " << startTS << "\n";)


bool use_arrow = false;

#ifdef USE_ARROW
use_arrow = theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "PARQUET";
#endif

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece of logic is OK but is just somewhat not elegant to put such a low-level stuff in main(). Again, if we overload processDataset() as mentioned earlier, we would be able to call it separately with Apache-related and CSV parameters.

// Process the image data
int min_online_roi_size = 0;
errorCode = processDataset (
Expand All @@ -68,7 +75,8 @@ int main (int argc, char** argv)
theEnvironment.n_pixel_scan_threads,
theEnvironment.n_reduce_threads,
min_online_roi_size,
theEnvironment.useCsv, // 'true' to save to csv
use_arrow,
theEnvironment.useCsv,
theEnvironment.output_dir);

// Report feature extraction error, if any
Expand All @@ -90,25 +98,6 @@ int main (int argc, char** argv)
break;
}

// Save features in Apache formats, if enabled
#ifdef USE_ARROW

if (theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "ARROWIPC")
theEnvironment.arrow_output.create_arrow_file(theResultsCache.get_headerBuf(),
theResultsCache.get_stringColBuf(),
theResultsCache.get_calcResultBuf(),
theResultsCache.get_num_rows(),
theEnvironment.output_dir);

else
if (theEnvironment.arrow_output_type == "PARQUET")
theEnvironment.arrow_output.create_parquet_file(theResultsCache.get_headerBuf(),
theResultsCache.get_stringColBuf(),
theResultsCache.get_calcResultBuf(),
theResultsCache.get_num_rows(),
theEnvironment.output_dir);
#endif

// Process nested ROIs
if (theEnvironment.nestedOptions.defined())
{
Expand Down
Loading