diff --git a/CMakeLists.txt b/CMakeLists.txt index 5ab35ee0..49d2c014 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,6 +94,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake-modules") #==== Source files set(SOURCE + src/nyx/arrow_output_stream.cpp src/nyx/features/basic_morphology.cpp src/nyx/features/caliper_feret.cpp src/nyx/features/caliper_martin.cpp @@ -149,6 +150,7 @@ set(SOURCE src/nyx/image_loader.cpp src/nyx/output_2_buffer.cpp src/nyx/output_2_csv.cpp + src/nyx/output_writers.cpp src/nyx/parallel.cpp src/nyx/phase1.cpp src/nyx/phase2.cpp diff --git a/src/nyx/arrow_output.h b/src/nyx/arrow_output.h deleted file mode 100644 index b234ef89..00000000 --- a/src/nyx/arrow_output.h +++ /dev/null @@ -1,117 +0,0 @@ -#pragma once - -#ifdef USE_ARROW -#include -#include - -#include "output_writers.h" -#include "helpers/helpers.h" - -#include - -#if __has_include() - #include - namespace fs = std::filesystem; -#elif __has_include() - #include - namespace fs = std::experimental::filesystem; -#else - error "Missing the header." -#endif - -/** - * @brief Class to write to Apache Arrow formats - * - * This class provides methods for writing to the Arrow IPC and Parquet formats. - * - */ -class ArrowOutput { - -private: - - std::string arrow_file_path_ = ""; - std::string parquet_file_path_ = ""; - std::shared_ptr writer_ = nullptr; - std::string arrow_output_type_ = ""; - - -public: - void create_arrow_file( const std::vector& header, - const std::vector& string_columns, - const std::vector& results, - size_t num_rows, - const std::string& arrow_file_path="NyxusFeatures.arrow") { - - - if(arrow_file_path != "" && !fs::is_directory(arrow_file_path) && !Nyxus::ends_with_substr(arrow_file_path, ".arrow")) { - throw std::invalid_argument("The arrow file path must end in \".arrow\""); - } - - if (arrow_file_path == "") { - arrow_file_path_="NyxusFeatures.arrow"; - } else { - arrow_file_path_ = arrow_file_path; - } - - if (fs::is_directory(arrow_file_path)) { - arrow_file_path_ += "/NyxusFeatures.arrow"; - } - - writer_ = WriterFactory::create_writer(arrow_file_path_); - - writer_->write(header, string_columns, results, num_rows); - - - - } - - void create_parquet_file(const std::vector& header, - const std::vector& string_columns, - const std::vector& results, - size_t num_rows, - const std::string& parquet_file_path="NyxusFeatures.parquet") { - - if(parquet_file_path != "" && !fs::is_directory(parquet_file_path) && !Nyxus::ends_with_substr(parquet_file_path, ".parquet")) { - throw std::invalid_argument("The parquet file path must end in \".parquet\""); - } - - if (parquet_file_path == "") { - parquet_file_path_="NyxusFeatures.parquet"; - } else { - parquet_file_path_ = parquet_file_path; - } - - if (fs::is_directory(parquet_file_path)) { - parquet_file_path_ += "/NyxusFeatures.parquet"; - } - - writer_ = WriterFactory::create_writer(parquet_file_path_); - - writer_->write(header, string_columns, results, num_rows); - } - - std::string get_arrow_file() {return arrow_file_path_;} - - std::string get_parquet_file() { return parquet_file_path_ ;} - - std::shared_ptr get_arrow_table(const std::vector& header, - const std::vector& string_columns, - const std::vector& results, - size_t num_rows) { - - if (writer_ == nullptr) { - writer_ = WriterFactory::create_writer("out.arrow"); - - writer_->generate_arrow_table(header, string_columns, results, num_rows); - - return writer_->get_arrow_table(); - } - - auto table = writer_->get_arrow_table(); - - return table; - - } - -}; -#endif \ No newline at end of file diff --git a/src/nyx/arrow_output_stream.cpp b/src/nyx/arrow_output_stream.cpp new file mode 100644 index 00000000..9cc522fd --- /dev/null +++ b/src/nyx/arrow_output_stream.cpp @@ -0,0 +1,49 @@ +#ifdef USE_ARROW + +#include "arrow_output_stream.h" + +std::shared_ptr ArrowOutputStream::create_arrow_file(const std::string& arrow_file_type, + const std::string& arrow_file_path, + const std::vector& header) { + + std::string arrow_file_type_upper = Nyxus::toupper(arrow_file_type); + + if(arrow_file_path != "" && !fs::is_directory(arrow_file_path) && !(Nyxus::ends_with_substr(arrow_file_path, ".arrow") || Nyxus::ends_with_substr(arrow_file_path, ".feather") || Nyxus::ends_with_substr(arrow_file_path, ".parquet"))) { + throw std::invalid_argument("The arrow file path must end in \".arrow\""); + } + + if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) { + throw std::invalid_argument("The valid file types are ARROW, ARROWIPC, or PARQUET"); + } + + std::string extension = (arrow_file_type_upper == "PARQUET") ? ".parquet" : ".arrow"; + + if (arrow_file_path == "") { + arrow_file_path_ = "NyxusFeatures" + extension; + } else { + arrow_file_path_ = arrow_file_path; + } + + if (fs::is_directory(arrow_file_path)) { + arrow_file_path_ += "/NyxusFeatures" + extension; + } + + writer_ = WriterFactory::create_writer(arrow_file_path_, header); + + return writer_; +} + + +std::shared_ptr ArrowOutputStream::get_arrow_table(const std::string& file_path, arrow::Status& table_status) { + + if (this->arrow_table_ != nullptr) return this->arrow_table_; + + this->arrow_table_ = writer_->get_arrow_table(file_path, table_status); + + return this->arrow_table_; +} + +std::string ArrowOutputStream::get_arrow_path() { + return arrow_file_path_; +} +#endif \ No newline at end of file diff --git a/src/nyx/arrow_output_stream.h b/src/nyx/arrow_output_stream.h new file mode 100644 index 00000000..f99042a4 --- /dev/null +++ b/src/nyx/arrow_output_stream.h @@ -0,0 +1,45 @@ +#pragma once + +#ifdef USE_ARROW + +#include +#include + +#include "output_writers.h" +#include "helpers/helpers.h" + +#include + +#if __has_include() + #include + namespace fs = std::filesystem; +#elif __has_include() + #include + namespace fs = std::experimental::filesystem; +#else + error "Missing the header." +#endif + +/** + * @brief Class to write to Apache Arrow formats + * + * This class provides methods for writing to the Arrow IPC and Parquet formats. + * + */ +class ArrowOutputStream { + +private: + + std::string arrow_file_path_ = ""; + std::shared_ptr writer_ = nullptr; + std::string arrow_output_type_ = ""; + std::shared_ptr arrow_table_ = nullptr; + +public: + std::shared_ptr create_arrow_file(const std::string& arrow_file_type, + const std::string& arrow_file_path, + const std::vector& header); + std::shared_ptr get_arrow_table(const std::string& file_path, arrow::Status& table_status); + std::string get_arrow_path(); +}; +#endif \ No newline at end of file diff --git a/src/nyx/environment.h b/src/nyx/environment.h index 6e50f01a..d62acc0a 100644 --- a/src/nyx/environment.h +++ b/src/nyx/environment.h @@ -10,7 +10,7 @@ #ifdef USE_ARROW #include "output_writers.h" - #include "arrow_output.h" + #include "arrow_output_stream.h" #endif #ifdef USE_GPU @@ -118,8 +118,9 @@ class Environment: public BasicEnvironment #ifdef USE_ARROW - ArrowOutput arrow_output = ArrowOutput(); std::string arrow_output_type = ""; + ArrowOutputStream arrow_stream; + std::shared_ptr arrow_writer = nullptr; #endif diff --git a/src/nyx/globals.h b/src/nyx/globals.h index e6dea0a3..777c3716 100644 --- a/src/nyx/globals.h +++ b/src/nyx/globals.h @@ -32,7 +32,7 @@ namespace Nyxus bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs); std::string getPureFname(const std::string& fpath); - int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, const std::string& csvOutputDir); + int processDataset(const std::vector& intensFiles, const std::vector& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, bool arrow_output, const std::string& csvOutputDir); bool gatherRoisMetrics(const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads); bool processTrivialRois (const std::vector& trivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads, size_t memory_limit); bool processNontrivialRois (const std::vector& nontrivRoiLabels, const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads); @@ -46,7 +46,7 @@ namespace Nyxus bool gatherRoisMetricsInMemory (const py::array_t& intens_image, const py::array_t& label_image, int start_idx); bool processIntSegImagePairInMemory (const std::string& intens_fpath, const std::string& label_fpath, int filepair_index, const std::string& intens_name, const std::string& seg_name); int processMontage(const py::array_t& intensFiles, const py::array_t& labelFiles, int numReduceThreads, const std::vector& intensity_names, - const std::vector& seg_names, std::string& error_message); + const std::vector& seg_names, std::string& error_message, bool arrow_output=false, const std::string& outputDir=""); bool scanTrivialRois (const std::vector& batch_labels, const py::array_t& intens_images, const py::array_t& label_images, int start_idx); bool processTrivialRoisInMemory (const std::vector& trivRoiLabels, const py::array_t& intens_fpath, const py::array_t& label_fpath, int start_idx, size_t memory_limit); #endif @@ -55,7 +55,12 @@ namespace Nyxus std::string get_feature_output_fname(const std::string& intFpath, const std::string& segFpath); extern const std::vector mandatory_output_columns; bool save_features_2_csv (const std::string & intFpath, const std::string & segFpath, const std::string & outputDir); - bool save_features_2_buffer (ResultsCache& results_cache); + bool save_features_2_buffer (ResultsCache& results_cache); + + std::vector, int, std::vector>> get_feature_values(); + std::vector get_header(const std::vector>& F ); + + void init_feature_buffers(); void clear_feature_buffers(); diff --git a/src/nyx/main_nyxus.cpp b/src/nyx/main_nyxus.cpp index 0a12af7c..ff7be233 100644 --- a/src/nyx/main_nyxus.cpp +++ b/src/nyx/main_nyxus.cpp @@ -3,7 +3,7 @@ #include "dirs_and_files.h" #include "environment.h" #include "globals.h" - +#include "arrow_output_stream.h" #ifdef USE_GPU bool gpu_initialize(int dev_id); #endif @@ -59,6 +59,13 @@ int main (int argc, char** argv) auto startTS = getTimeStr(); VERBOSLVL1(std::cout << "\n>>> STARTING >>> " << startTS << "\n";) + + bool use_arrow = false; + +#ifdef USE_ARROW + use_arrow = theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "PARQUET"; +#endif + // Process the image data int min_online_roi_size = 0; errorCode = processDataset ( @@ -68,7 +75,8 @@ int main (int argc, char** argv) theEnvironment.n_pixel_scan_threads, theEnvironment.n_reduce_threads, min_online_roi_size, - theEnvironment.useCsv, // 'true' to save to csv + use_arrow, + theEnvironment.useCsv, theEnvironment.output_dir); // Report feature extraction error, if any @@ -90,25 +98,6 @@ int main (int argc, char** argv) break; } - // Save features in Apache formats, if enabled - #ifdef USE_ARROW - - if (theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "ARROWIPC") - theEnvironment.arrow_output.create_arrow_file(theResultsCache.get_headerBuf(), - theResultsCache.get_stringColBuf(), - theResultsCache.get_calcResultBuf(), - theResultsCache.get_num_rows(), - theEnvironment.output_dir); - - else - if (theEnvironment.arrow_output_type == "PARQUET") - theEnvironment.arrow_output.create_parquet_file(theResultsCache.get_headerBuf(), - theResultsCache.get_stringColBuf(), - theResultsCache.get_calcResultBuf(), - theResultsCache.get_num_rows(), - theEnvironment.output_dir); - #endif - // Process nested ROIs if (theEnvironment.nestedOptions.defined()) { diff --git a/src/nyx/output_2_csv.cpp b/src/nyx/output_2_csv.cpp index 5c33c7c8..a920dc72 100644 --- a/src/nyx/output_2_csv.cpp +++ b/src/nyx/output_2_csv.cpp @@ -55,6 +55,120 @@ namespace Nyxus return x; } + std::vector get_header(const std::vector>& F ) { + std::stringstream ssHead; + + std::vector head; + + // Mandatory column names + for (const auto& s : mandatory_output_columns) + { + head.emplace_back(s); + } + + // Optional columns + for (auto& enabdF : F) + { + auto fn = std::get<0>(enabdF); // feature name + auto fc = std::get<1>(enabdF); // feature code + + // Handle missing feature name (which is a significant issue!) in order to at least be able to trace back to the feature code + if (fn.empty()) + { + std::stringstream temp; + temp << "feature" << fc; + fn = temp.str(); + } + + // Parameterized feature + // --GLCM family + bool angledGlcmFeature = std::find (GLCMFeature::featureset.begin(), GLCMFeature::featureset.end(), fc) != GLCMFeature::featureset.end(); + if (angledGlcmFeature) + { + // Populate with angles + for (auto ang : theEnvironment.glcmAngles) + { + // CSV separator + //if (ang != theEnvironment.rotAngles[0]) + // ssHead << ","; + head.emplace_back(fn + "_" + std::to_string(ang)); + } + // Proceed with other features + continue; + } + + // --GLRLM family + bool glrlmFeature = std::find (GLRLMFeature::featureset.begin(), GLRLMFeature::featureset.end(), fc) != GLRLMFeature::featureset.end(); + if (glrlmFeature) + { + // Populate with angles + for (auto ang : GLRLMFeature::rotAngles) + { + head.emplace_back(fn + "_" + std::to_string(ang)); + } + // Proceed with other features + continue; + } + + // --Gabor + if (fc == GABOR) + { + // Generate the feature value list + for (auto i = 0; i < GaborFeature::f0_theta_pairs.size(); i++) + head.emplace_back(fn + "_" + std::to_string(i)); + + // Proceed with other features + continue; + } + + if (fc == FRAC_AT_D) + { + // Generate the feature value list + for (auto i = 0; i < RadialDistributionFeature::num_features_FracAtD; i++) + head.emplace_back(fn + "_" + std::to_string(i)); + + // Proceed with other features + continue; + } + + if (fc == MEAN_FRAC) + { + // Generate the feature value list + for (auto i = 0; i < RadialDistributionFeature::num_features_MeanFrac; i++) + head.emplace_back(fn + "_" + std::to_string(i)); + + // Proceed with other features + continue; + } + + if (fc == RADIAL_CV) + { + // Generate the feature value list + for (auto i = 0; i < RadialDistributionFeature::num_features_RadialCV; i++) + head.emplace_back(fn + "_" + std::to_string(i)); + + // Proceed with other features + continue; + } + + // --Zernike features header + if (fc == ZERNIKE2D) + { + // Populate with indices + for (int i = 0; i < ZernikeFeature::num_feature_values_calculated; i++) // i < ZernikeFeature::num_feature_values_calculated + head.emplace_back(fn + "_Z" + std::to_string(i)); + + // Proceed with other features + continue; + } + + // Regular feature + head.emplace_back(fn); + } + + return head; + } + std::string get_feature_output_fname (const std::string& intFpath, const std::string& segFpath) { std::string retval; @@ -118,115 +232,17 @@ namespace Nyxus { std::stringstream ssHead; - // Mandatory column names - for (const auto& s : mandatory_output_columns) - { - ssHead << s; - if (s != mandatory_output_columns.back()) - ssHead << ","; - } - - // Optional columns - for (auto& enabdF : F) - { - auto fn = std::get<0>(enabdF); // feature name - auto fc = std::get<1>(enabdF); // feature code - - // Handle missing feature name (which is a significant issue!) in order to at least be able to trace back to the feature code - if (fn.empty()) - { - std::stringstream temp; - temp << "feature" << fc; - fn = temp.str(); - } - - // Parameterized feature - // --GLCM family - bool angledGlcmFeature = std::find (GLCMFeature::featureset.begin(), GLCMFeature::featureset.end(), fc) != GLCMFeature::featureset.end(); - if (angledGlcmFeature) - { - // Populate with angles - for (auto ang : theEnvironment.glcmAngles) - { - // CSV separator - //if (ang != theEnvironment.rotAngles[0]) - // ssHead << ","; - ssHead << "," << fn << "_" << ang; - } - // Proceed with other features - continue; - } - - // --GLRLM family - bool glrlmFeature = std::find (GLRLMFeature::featureset.begin(), GLRLMFeature::featureset.end(), fc) != GLRLMFeature::featureset.end(); - if (glrlmFeature) - { - // Populate with angles - for (auto ang : GLRLMFeature::rotAngles) - { - ssHead << "," << fn << "_" << ang; - } - // Proceed with other features - continue; - } - - // --Gabor - if (fc == GABOR) - { - // Generate the feature value list - for (auto i = 0; i < GaborFeature::f0_theta_pairs.size(); i++) - ssHead << "," << fn << "_" << i; - - // Proceed with other features - continue; - } - - if (fc == FRAC_AT_D) - { - // Generate the feature value list - for (auto i = 0; i < RadialDistributionFeature::num_features_FracAtD; i++) - ssHead << "," << fn << "_" << i; - - // Proceed with other features - continue; - } - - if (fc == MEAN_FRAC) - { - // Generate the feature value list - for (auto i = 0; i < RadialDistributionFeature::num_features_MeanFrac; i++) - ssHead << "," << fn << "_" << i; - - // Proceed with other features - continue; - } - - if (fc == RADIAL_CV) - { - // Generate the feature value list - for (auto i = 0; i < RadialDistributionFeature::num_features_RadialCV; i++) - ssHead << "," << fn << "_" << i; - - // Proceed with other features - continue; - } + auto head_vector = Nyxus::get_header(F); - // --Zernike features header - if (fc == ZERNIKE2D) - { - // Populate with indices - for (int i = 0; i < ZernikeFeature::num_feature_values_calculated; i++) // i < ZernikeFeature::num_feature_values_calculated - ssHead << "," << fn << "_Z" << i; + for(const auto& column: head_vector){ + ssHead << column << ", "; + } - // Proceed with other features - continue; - } + auto head_string = ssHead.str(); - // Regular feature - ssHead << "," << fn; - } + head_string.pop_back(); // remove trailing comma - fprintf(fp, "%s\n", ssHead.str().c_str()); + fprintf(fp, "%s\n", head_string.c_str()); // Prevent rendering the header again for another image's portion of labels if (theEnvironment.separateCsv == false) @@ -432,6 +448,133 @@ namespace Nyxus return true; } + std::vector, int, std::vector>> get_feature_values() { + + std::vector, int, std::vector>> features; + + // Sort the labels + std::vector L{ uniqueLabels.begin(), uniqueLabels.end() }; + std::sort(L.begin(), L.end()); + + // Learn what features need to be displayed + std::vector> F = theFeatureSet.getEnabledFeatures(); + + // -- Values + for (auto l : L) + { + LR& r = roiData[l]; + + std::vector feature_values; + + // Skip blacklisted ROI + if (r.blacklisted) + continue; + + // Tear off pure file names from segment and intensity file paths + fs::path pseg(r.segFname), pint(r.intFname); + std::vector filenames; + filenames.push_back(pseg.filename().u8string()); + filenames.push_back(pint.filename().u8string()); + + for (auto& enabdF : F) + { + auto fc = std::get<1>(enabdF); + auto vv = r.get_fvals(std::get<1>(enabdF)); + + // Parameterized feature + // --GLCM family + bool angledGlcmFeature = std::find (GLCMFeature::featureset.begin(), GLCMFeature::featureset.end(), fc) != GLCMFeature::featureset.end(); + if (angledGlcmFeature) + { + // Mock angled values if they haven't been calculated for some error reason + if (vv.size() < GLCMFeature::angles.size()) + vv.resize(GLCMFeature::angles.size(), 0.0); + // Output the sub-values + int nAng = GLCMFeature::angles.size(); + for (int i=0; i < nAng; i++) + { + feature_values.push_back(vv[i]); + } + // Proceed with other features + continue; + } + + // --GLRLM family + bool glrlmFeature = std::find (GLRLMFeature::featureset.begin(), GLRLMFeature::featureset.end(), fc) != GLRLMFeature::featureset.end(); + if (glrlmFeature) + { + // Polulate with angles + int nAng = 4; + for (int i=0; i < nAng; i++) + { + feature_values.push_back(vv[i]); + } + // Proceed with other features + continue; + } + + // --Gabor + if (fc == GABOR) + { + for (auto i = 0; i < GaborFeature::f0_theta_pairs.size(); i++) + { + feature_values.push_back(vv[i]); + } + + // Proceed with other features + continue; + } + + // --Zernike feature values + if (fc == ZERNIKE2D) + { + for (int i = 0; i < ZernikeFeature::num_feature_values_calculated; i++) + { + feature_values.push_back(vv[i]); + } + + // Proceed with other features + continue; + } + + // --Radial distribution features + if (fc == FRAC_AT_D) + { + for (auto i = 0; i < RadialDistributionFeature::num_features_FracAtD; i++) + { + feature_values.push_back(vv[i]); + } + // Proceed with other features + continue; + } + if (fc == MEAN_FRAC) + { + for (auto i = 0; i < RadialDistributionFeature::num_features_MeanFrac; i++) + { + feature_values.push_back(vv[i]); + } + // Proceed with other features + continue; + } + if (fc == RADIAL_CV) + { + for (auto i = 0; i < RadialDistributionFeature::num_features_RadialCV; i++) + { + feature_values.push_back(vv[i]); + } + // Proceed with other features + continue; + } + + feature_values.push_back(vv[0]); + } + + features.push_back(std::make_tuple(filenames, l, feature_values)); + } + + return features; + } + // Diagnostic function void print_by_label(const char* featureName, std::unordered_map L, int numColumns) { diff --git a/src/nyx/output_writers.cpp b/src/nyx/output_writers.cpp new file mode 100644 index 00000000..d8b1bb40 --- /dev/null +++ b/src/nyx/output_writers.cpp @@ -0,0 +1,416 @@ +#ifdef USE_ARROW +#include "output_writers.h" + +std::shared_ptr ApacheArrowWriter::get_arrow_table(const std::string& file_path, arrow::Status& table_status) { + + if (table_ != nullptr) return table_; + + auto file_extension = fs::path(file_path).extension().u8string(); + + if (file_extension == ".parquet") { + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + + std::shared_ptr input; + + input = arrow::io::ReadableFile::Open(file_path).ValueOrDie(); + + std::unique_ptr arrow_reader; + + auto status = parquet::arrow::OpenFile(input, pool, &arrow_reader); + + if (!status.ok()) { + // Handle read error + table_status = status; + return nullptr; + } + + // Read entire file as a single Arrow table + std::shared_ptr table; + + status = arrow_reader->ReadTable(&table); + + if (!status.ok()) { + // Handle read error + table_status = status; + return nullptr; + } + + return table; + + } else if (file_extension == ".arrow") { + + // Create a memory-mapped file for reading. + std::shared_ptr input; + input = arrow::io::ReadableFile::Open(file_path).ValueOrDie(); + + // Create an IPC reader. + auto ipc_reader = (arrow::ipc::RecordBatchStreamReader::Open(input.get())).ValueOrDie(); + + this->table_ = ipc_reader->ToTable().ValueOrDie(); + + return table_; + + } else { + throw std::invalid_argument("Error: file must either be an Arrow or Parquet file."); + } + +} + +arrow::Status ParquetWriter::setup(const std::vector &header) { + + + std::vector> fields; + + + fields.push_back(arrow::field(header[0], arrow::utf8())); + fields.push_back(arrow::field(header[1], arrow::utf8())); + fields.push_back(arrow::field(header[2], arrow::int64())); + + for (int i = 3; i < header.size(); ++i) + { + fields.push_back(arrow::field(header[i], arrow::float64())); + } + + schema_ = arrow::schema(fields); + + PARQUET_ASSIGN_OR_THROW( + output_stream_, arrow::io::FileOutputStream::Open(output_file_) + ); + + // Choose compression + std::shared_ptr props = + parquet::WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); + + // Opt to store Arrow schema for easier reads back into Arrow + std::shared_ptr arrow_props = + parquet::ArrowWriterProperties::Builder().store_schema()->build(); + + ARROW_ASSIGN_OR_RAISE( + writer_, parquet::arrow::FileWriter::Open(*schema_, + arrow::default_memory_pool(), output_stream_, + props, arrow_props)); + + return arrow::Status::OK(); +} + + + +ParquetWriter::ParquetWriter(const std::string& output_file, const std::vector& header) : output_file_(output_file) { + + auto status = this->setup(header); + + if (!status.ok()) { + // Handle read error + std::cout << "Error writing setting up Arrow writer: " << status.ToString() << std::endl; + } +} + + +arrow::Status ParquetWriter::write (const std::vector, int, std::vector>>& features) { + + int num_rows = features.size(); + + std::vector> arrays; + + arrow::StringBuilder string_builder; + std::shared_ptr intensity_array; + + + arrow::Status append_status; + // construct intensity column + for (int i = 0; i < num_rows; ++i) { + append_status = string_builder.Append(std::get<0>(features[i])[0]); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = string_builder.Finish(&intensity_array); + if (!append_status.ok()) { + // Handle read error + return append_status; + } + + + arrays.push_back(intensity_array); + string_builder.Reset(); + + std::shared_ptr segmentation_array; + + // construct intensity column + for (int i = 0; i < num_rows; ++i) { + append_status = string_builder.Append(std::get<0>(features[i])[1]); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = string_builder.Finish(&segmentation_array); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + + arrays.push_back(segmentation_array); + + arrow::Int64Builder int_builder; + std::shared_ptr labels_array; + // construct label column + for (int i = 0; i < num_rows; ++i) { + append_status = int_builder.Append(std::get<1>(features[i])); + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = int_builder.Finish(&labels_array); + if (!append_status.ok()) { + // Handle read error + return append_status; + } + arrays.push_back(labels_array); + + // construct columns for each feature + for (int j = 0; j < std::get<2>(features[0]).size(); ++j) { + + arrow::DoubleBuilder builder; + std::shared_ptr double_array; + + for (int i = 0; i < num_rows; ++i) { + append_status = builder.Append(std::get<2>(features[i])[j]); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = builder.Finish(&double_array); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + + arrays.push_back(double_array); + } + + std::shared_ptr batch = arrow::RecordBatch::Make(schema_, num_rows, arrays); + + ARROW_ASSIGN_OR_RAISE(auto table, + arrow::Table::FromRecordBatches(schema_, {batch})); + + + std::cout << table->ToString() << std::endl; + + ARROW_RETURN_NOT_OK(writer_->WriteTable(*table.get(), batch->num_rows())); + + + + return arrow::Status::OK(); +} + +arrow::Status ParquetWriter::close () { + arrow::Status status = writer_->Close(); + + if (!status.ok()) { + // Handle read error + return status; + } + return arrow::Status::OK(); + + + return arrow::Status::OK(); +} + +arrow::Status ArrowIPCWriter::setup(const std::vector &header) { + + std::vector> fields; + + + fields.push_back(arrow::field("intensity_image", arrow::utf8())); + fields.push_back(arrow::field("segmentation_image", arrow::utf8())); + fields.push_back(arrow::field("ROI_label", arrow::int64())); + + for (int i = 3; i < header.size(); ++i) + { + fields.push_back(arrow::field(header[i], arrow::float64())); + } + + schema_ = arrow::schema(fields); + + ARROW_ASSIGN_OR_RAISE( + output_stream_, arrow::io::FileOutputStream::Open(output_file_) + ); + + writer_ = arrow::ipc::MakeFileWriter(output_stream_, schema_); + + return arrow::Status::OK(); +} + + + +ArrowIPCWriter::ArrowIPCWriter(const std::string& output_file, const std::vector &header) : output_file_(output_file) { + + auto status = this->setup(header); + +} + + +arrow::Status ArrowIPCWriter::write (const std::vector, int, std::vector>>& features) { + + + int num_rows = features.size(); + + std::vector> arrays; + + arrow::StringBuilder string_builder; + std::shared_ptr intensity_array; + + + arrow::Status append_status; + // construct intensity column + for (int i = 0; i < num_rows; ++i) { + append_status = string_builder.Append(std::get<0>(features[i])[0]); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = string_builder.Finish(&intensity_array); + if (!append_status.ok()) { + // Handle read error + return append_status; + } + + + arrays.push_back(intensity_array); + string_builder.Reset(); + + std::shared_ptr segmentation_array; + + // construct intensity column + for (int i = 0; i < num_rows; ++i) { + append_status = string_builder.Append(std::get<0>(features[i])[1]); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = string_builder.Finish(&segmentation_array); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + + arrays.push_back(segmentation_array); + + arrow::Int32Builder int_builder; + std::shared_ptr labels_array; + // construct label column + for (int i = 0; i < num_rows; ++i) { + append_status = int_builder.Append(std::get<1>(features[i])); + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = int_builder.Finish(&labels_array); + if (!append_status.ok()) { + // Handle read error + return append_status; + } + arrays.push_back(labels_array); + + // construct columns for each feature + for (int j = 0; j < std::get<2>(features[0]).size(); ++j) { + + arrow::DoubleBuilder builder; + std::shared_ptr double_array; + + for (int i = 0; i < num_rows; ++i) { + append_status = builder.Append(std::get<2>(features[i])[j]); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + } + + append_status = builder.Finish(&double_array); + + if (!append_status.ok()) { + // Handle read error + return append_status; + } + + arrays.push_back(double_array); + } + + std::shared_ptr batch = arrow::RecordBatch::Make(schema_, num_rows, arrays); + + auto status = writer_->get()->WriteRecordBatch(*batch); + + if (!status.ok()) { + // Handle read error + return status; + } + + return arrow::Status::OK(); +} + + +arrow::Status ArrowIPCWriter::close () { + + arrow::Status status = writer_->get()->Close(); + + if (!status.ok()) { + // Handle read error + return status; + } + return arrow::Status::OK(); + +} + + +std::shared_ptr WriterFactory::create_writer(const std::string &output_file, const std::vector &header) { + + if (Nyxus::ends_with_substr(output_file, ".parquet")) { + + return std::make_shared(output_file, header); + + } else if (Nyxus::ends_with_substr(output_file, ".arrow") || Nyxus::ends_with_substr(output_file, ".feather")) { + + return std::make_shared(output_file, header); + + } else { + + std::filesystem::path path(output_file); + + if (path.has_extension()) { + std::string file_extension = path.extension().string(); + + throw std::invalid_argument("No writer option for extension \"" + file_extension + "\". Valid options are \".parquet\" or \".arrow\"."); + + } else { + + throw std::invalid_argument("No extension type was provided in the path. "); + + } + } +} + +#endif \ No newline at end of file diff --git a/src/nyx/output_writers.h b/src/nyx/output_writers.h index 24e4c527..d5d4e619 100644 --- a/src/nyx/output_writers.h +++ b/src/nyx/output_writers.h @@ -1,3 +1,4 @@ + #pragma once #ifdef USE_ARROW @@ -8,16 +9,27 @@ #include #include #include +#include + +#include #include #include #include #include #include - +#include #include "helpers/helpers.h" -#include +#if __has_include() + #include + namespace fs = std::filesystem; +#elif __has_include() + #include + namespace fs = std::experimental::filesystem; +#else + error "Missing the header." +#endif /** * @brief Base class for creating Apache Arrow output writers @@ -28,113 +40,37 @@ */ class ApacheArrowWriter { -protected: - std::shared_ptr table_; -public: - - /** - * @brief Get the arrow table object - * - * @return std::shared_ptr - */ - std::shared_ptr get_arrow_table() {return table_;} - - /** - * @brief Generate an Arrow table from Nyxus output - * - * @param header Header data - * @param string_columns String data - * @param numeric_columns Numeric data - * @param number_of_rows Number of rows - * @return std::shared_ptr - */ - std::shared_ptr generate_arrow_table(const std::vector &header, - const std::vector &string_columns, - const std::vector &numeric_columns, - int number_of_rows) - { - std::vector> fields; - - fields.push_back(arrow::field(header[0], arrow::utf8())); - fields.push_back(arrow::field(header[1], arrow::utf8())); - fields.push_back(arrow::field(header[2], arrow::int32())); - - for (int i = 3; i < header.size(); ++i) - { - fields.push_back(arrow::field(header[i], arrow::float64())); - } - - auto schema = arrow::schema(fields); - arrow::StringBuilder string_builder_0; - - std::vector temp_string_vec1(string_columns.size()/2); - std::vector temp_string_vec2(string_columns.size()/2); - - for (int i = 0; i < string_columns.size(); i+=2) { - temp_string_vec1[i/2] = string_columns[i]; - temp_string_vec2[i/2] = string_columns[i+1]; - } - - PARQUET_THROW_NOT_OK(string_builder_0.AppendValues(temp_string_vec1)); - - arrow::StringBuilder string_builder_1; - - PARQUET_THROW_NOT_OK(string_builder_1.AppendValues(temp_string_vec2)); +private: + std::shared_ptr table_ = nullptr; - std::shared_ptr array_0, array_1; - PARQUET_THROW_NOT_OK(string_builder_0.Finish(&array_0)); - PARQUET_THROW_NOT_OK(string_builder_1.Finish(&array_1)); + arrow::Status open(std::shared_ptr input, const std::string& file_path) { + ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(file_path)); - std::vector> arrays; - - arrays.push_back(array_0); - arrays.push_back(array_1); - - // add labels - arrow::Int32Builder labels_builder; - - std::vector temp_vec; - int num_columns = numeric_columns.size() / number_of_rows; - for (int i = 0; i < numeric_columns.size(); i += num_columns) - { - temp_vec.push_back(numeric_columns[i]); - } - - - PARQUET_THROW_NOT_OK(labels_builder.AppendValues( - temp_vec)); - - std::shared_ptr array_2; - - PARQUET_THROW_NOT_OK(labels_builder.Finish(&array_2)); - arrays.push_back(array_2); - - int idx; - for (int i = 1; i < num_columns; ++i) - { - arrow::DoubleBuilder builder; + return arrow::Status::OK(); + } - std::vector temp; + arrow::Status open_parquet_file(std::shared_ptr input, arrow::MemoryPool* pool, std::unique_ptr arrow_reader) { + ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, pool, &arrow_reader)); - for (int j = 0; j < number_of_rows; ++j) - { - temp.push_back(numeric_columns[i + (j * num_columns)]); - } + return arrow::Status::OK(); + } - PARQUET_THROW_NOT_OK(builder.AppendValues( - temp)); + arrow::Status read_parquet_table(std::unique_ptr arrow_reader, std::shared_ptr table) { + ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table)); - std::shared_ptr temp_array; + return arrow::Status::OK(); + } - PARQUET_THROW_NOT_OK(builder.Finish(&temp_array)); - arrays.push_back(temp_array); - } +public: - //table_ = arrow::Table::Make(schema, arrays); - return arrow::Table::Make(schema, arrays); - } + /** + * @brief Get the arrow table object + * + * @return std::shared_ptr + */ + std::shared_ptr get_arrow_table(const std::string& file_path, arrow::Status& table_status); /** * @brief Write Nyxus data to Arrow file @@ -145,10 +81,9 @@ class ApacheArrowWriter * @param number_of_rows Number of rows * @return arrow::Status */ - virtual arrow::Status write (const std::vector &header, - const std::vector &string_columns, - const std::vector &numeric_columns, - int number_of_rows) = 0; + virtual arrow::Status write (const std::vector, int, std::vector>>& features) = 0; + + virtual arrow::Status close () = 0; }; @@ -162,37 +97,20 @@ class ParquetWriter : public ApacheArrowWriter { private: std::string output_file_; + std::shared_ptr schema_; + std::shared_ptr output_stream_; + std::unique_ptr writer_; - public: - - ParquetWriter(const std::string& output_file) : output_file_(output_file) {} + arrow::Status setup(const std::vector &header); - /** - * @brief Write to Parquet - * - * @param header Header data - * @param string_columns String data (filenames) - * @param numeric_columns Numeric data (feature calculations) - * @param number_of_rows Number of rows - * @return arrow::Status - */ - arrow::Status write (const std::vector &header, - const std::vector &string_columns, - const std::vector &numeric_columns, - int number_of_rows) override { - - table_ = generate_arrow_table(header, string_columns, numeric_columns, number_of_rows); - - std::shared_ptr outfile; + public: - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(output_file_)); + ParquetWriter(const std::string& output_file, const std::vector& header); - PARQUET_THROW_NOT_OK( - parquet::arrow::WriteTable(*table_, arrow::default_memory_pool(), outfile, 3)); + + arrow::Status write (const std::vector, int, std::vector>>& features) override; - return arrow::Status::OK(); - } + arrow::Status close () override; }; /** @@ -205,10 +123,15 @@ class ArrowIPCWriter : public ApacheArrowWriter { private: std::string output_file_; + std::shared_ptr schema_; + std::shared_ptr output_stream_; + arrow::Result> writer_; + + arrow::Status setup(const std::vector &header); public: - ArrowIPCWriter(const std::string& output_file) : output_file_(output_file) {} + ArrowIPCWriter(const std::string& output_file, const std::vector &header); /** * @brief Write to Arrow IPC @@ -219,28 +142,10 @@ class ArrowIPCWriter : public ApacheArrowWriter { * @param number_of_rows Number of rows * @return arrow::Status */ - arrow::Status write (const std::vector &header, - const std::vector &string_columns, - const std::vector &numeric_columns, - int number_of_rows) override { - - table_ = generate_arrow_table(header, string_columns, numeric_columns, number_of_rows); - - // Create the Arrow file writer - std::shared_ptr output_stream; - - ARROW_ASSIGN_OR_RAISE( - output_stream, arrow::io::FileOutputStream::Open(output_file_) - ); + arrow::Status write (const std::vector, int, std::vector>>& features) override; - auto writer = arrow::ipc::MakeFileWriter(output_stream, table_->schema()); - // Write the Arrow table to file - writer->get()->WriteTable(*table_); - writer->get()->Close(); - - return arrow::Status::OK(); - } + arrow::Status close () override; }; /** @@ -257,31 +162,8 @@ class WriterFactory { * @param output_file Path to output file (.arrow or .parquet) * @return std::shared_ptr */ - static std::shared_ptr create_writer(const std::string &output_file) { - - if (Nyxus::ends_with_substr(output_file, ".parquet")) { - - return std::make_shared(output_file); - - } else if (Nyxus::ends_with_substr(output_file, ".arrow") || Nyxus::ends_with_substr(output_file, ".feather")) { - - return std::make_shared(output_file); - - } else { - - std::filesystem::path path(output_file); - - if (path.has_extension()) { - std::string file_extension = path.extension().string(); - - throw std::invalid_argument("No writer option for extension \"" + file_extension + "\". Valid options are \".parquet\" or \".arrow\"."); - - } else { + static std::shared_ptr create_writer(const std::string &output_file, const std::vector &header); +}; - throw std::invalid_argument("No extension type was provided in the path. "); - } - } - } -}; -#endif \ No newline at end of file +#endif diff --git a/src/nyx/python/new_bindings_py.cpp b/src/nyx/python/new_bindings_py.cpp index 14720f5f..3c99add7 100644 --- a/src/nyx/python/new_bindings_py.cpp +++ b/src/nyx/python/new_bindings_py.cpp @@ -16,13 +16,11 @@ #ifdef USE_ARROW #include "../output_writers.h" - #include "../arrow_output.h" + #include "../arrow_output_stream.h" - #include #include #include - #include #include #include @@ -142,7 +140,8 @@ py::tuple featurize_directory_imp ( const std::string &intensity_dir, const std::string &labels_dir, const std::string &file_pattern, - bool pandas_output=true) + bool pandas_output=true, + const std::string &arrow_file_path="") { // Check and cache the file pattern if (! theEnvironment.check_file_pattern(file_pattern)) @@ -174,6 +173,10 @@ py::tuple featurize_directory_imp ( // We're good to extract features. Reset the feature results cache theResultsCache.clear(); + auto arrow_output = !pandas_output; + + theEnvironment.separateCsv = false; + // Process the image sdata int min_online_roi_size = 0; errorCode = processDataset( @@ -183,7 +186,8 @@ py::tuple featurize_directory_imp ( theEnvironment.n_pixel_scan_threads, theEnvironment.n_reduce_threads, min_online_roi_size, - false, // 'true' to save to csv + arrow_output, + false, theEnvironment.output_dir); if (errorCode) @@ -192,16 +196,10 @@ py::tuple featurize_directory_imp ( // Output the result if (pandas_output) { - #ifdef USE_ARROW - // Get by value to preserve buffers for writing to arrow - auto pyHeader = py::array(py::cast(theResultsCache.get_headerBufByVal())); - auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBufByVal())); - auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBufByVal())); - #else // regular dataframe output + auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBuf())); auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBuf())); - #endif // Shape the user-facing dataframe auto nRows = theResultsCache.get_num_rows(); @@ -210,7 +208,6 @@ py::tuple featurize_directory_imp ( return py::make_tuple (pyHeader, pyStrData, pyNumData); } - // To avoid duplication, return a void dataframe on the Python-side when the output is a file in Arrow format return py::make_tuple(); } @@ -219,11 +216,18 @@ py::tuple featurize_montage_imp ( const py::array_t& label_images, const std::vector& intensity_names, const std::vector& label_names, - bool pandas_output=true) + bool pandas_output=true, + const std::string arrow_output_type="", + const std::string output_dir="") { // Set the whole-slide/multi-ROI flag theEnvironment.singleROI = false; +#ifdef USE_ARROW + // Set arrow output type + theEnvironment.arrow_output_type = arrow_output_type; +#endif + auto intens_buffer = intensity_images.request(); auto label_buffer = label_images.request(); @@ -262,33 +266,29 @@ py::tuple featurize_montage_imp ( theEnvironment.n_reduce_threads, intensity_names, label_names, - error_message); + error_message, + !pandas_output, + output_dir); if (errorCode) throw std::runtime_error("Error #" + std::to_string(errorCode) + " " + error_message + " occurred during dataset processing."); if (pandas_output) { - #ifdef USE_ARROW - // Get by value to preserve buffers for writing to arrow - auto pyHeader = py::array(py::cast(theResultsCache.get_headerBufByVal())); - auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBufByVal())); - auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBufByVal())); - #else - auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); - auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBuf())); - auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBuf())); - #endif - auto nRows = theResultsCache.get_num_rows(); - pyStrData = pyStrData.reshape({nRows, pyStrData.size() / nRows}); - pyNumData = pyNumData.reshape({ nRows, pyNumData.size() / nRows }); + auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); + auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBuf())); + auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBuf())); + + auto nRows = theResultsCache.get_num_rows(); + pyStrData = pyStrData.reshape({nRows, pyStrData.size() / nRows}); + pyNumData = pyNumData.reshape({ nRows, pyNumData.size() / nRows }); return py::make_tuple(pyHeader, pyStrData, pyNumData, error_message); } - // Return "nothing" when output will be an Arrow format - return py::make_tuple(error_message); + std::string path = output_dir + "NyxusFeatures."; + return py::make_tuple(error_message, path); } py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list & seg_fnames, bool single_roi, bool pandas_output=true) @@ -343,6 +343,7 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list theEnvironment.n_pixel_scan_threads, theEnvironment.n_reduce_threads, min_online_roi_size, + !pandas_output, false, // 'true' to save to csv theEnvironment.output_dir); if (errorCode) @@ -350,21 +351,16 @@ py::tuple featurize_fname_lists_imp (const py::list& int_fnames, const py::list if (pandas_output) { - #ifdef USE_ARROW - // Get by value to preserve buffers for writing to arrow - auto pyHeader = py::array(py::cast(theResultsCache.get_headerBufByVal())); - auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBufByVal())); - auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBufByVal())); - #else auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBuf())); auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBuf())); - #endif + auto nRows = theResultsCache.get_num_rows(); pyStrData = pyStrData.reshape({nRows, pyStrData.size() / nRows}); pyNumData = pyNumData.reshape({ nRows, pyNumData.size() / nRows }); return py::make_tuple(pyHeader, pyStrData, pyNumData); + } // Return "nothing" when output will be an Arrow format @@ -389,16 +385,9 @@ py::tuple findrelations_imp( if (! mineOK) throw std::runtime_error("Error occurred during dataset processing: mine_segment_relations() returned false"); -#ifdef USE_ARROW - // Get by value to preserve buffers for writing to arrow - auto pyHeader = py::array(py::cast(theResultsCache.get_headerBufByVal())); - auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBufByVal())); - auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBufByVal())); -#else auto pyHeader = py::array(py::cast(theResultsCache.get_headerBuf())); auto pyStrData = py::array(py::cast(theResultsCache.get_stringColBuf())); auto pyNumData = as_pyarray(std::move(theResultsCache.get_calcResultBuf())); -#endif auto nRows = theResultsCache.get_num_rows(); pyStrData = pyStrData.reshape({ nRows, pyStrData.size() / nRows }); pyNumData = pyNumData.reshape({ nRows, pyNumData.size() / nRows }); @@ -529,28 +518,10 @@ std::map get_params_imp(const std::vector get_arrow_table_imp(const std::string& file_path) { - throw std::runtime_error("Arrow functionality is not available. Rebuild Nyxus with Arrow enabled."); - -#endif -} + arrow::Status status; -#ifdef USEARROW + auto table = theEnvironment.arrow_stream.get_arrow_table(file_path, status); -std::shared_ptr get_arrow_table_imp() { + if (!status.ok()) { + throw std::runtime_error("Error creating Arrow table: " + status.ToString()); + } - return theEnvironment.arrow_output.get_arrow_table(theResultsCache.get_headerBuf(), - theResultsCache.get_stringColBuf(), - theResultsCache.get_calcResultBuf(), - theResultsCache.get_num_rows()); + return table; } #else -void get_arrow_table_imp() { +void get_arrow_table_imp(const std::string& file_path) { throw std::runtime_error("Arrow functionality is not available. Rebuild Nyxus with Arrow enabled."); } @@ -641,10 +600,8 @@ PYBIND11_MODULE(backend, m) m.def("set_environment_params_imp", &set_environment_params_imp, "Set the environment variables of Nyxus"); m.def("get_params_imp", &get_params_imp, "Get parameters of Nyxus"); m.def("arrow_is_enabled_imp", &arrow_is_enabled_imp, "Check if arrow is enabled."); - m.def("create_arrow_file_imp", &create_arrow_file_imp, "Creates an arrow file for the feature calculations"); m.def("get_arrow_file_imp", &get_arrow_file_imp, "Get path to arrow file"); m.def("get_parquet_file_imp", &get_parquet_file_imp, "Returns path to parquet file"); - m.def("create_parquet_file_imp", &create_parquet_file_imp, "Create parquet file for the features calculations"); m.def("get_arrow_table_imp", &get_arrow_table_imp, py::call_guard()); } diff --git a/src/nyx/python/nyxus/nyxus.py b/src/nyx/python/nyxus/nyxus.py index 742a5015..824bd652 100644 --- a/src/nyx/python/nyxus/nyxus.py +++ b/src/nyx/python/nyxus/nyxus.py @@ -32,10 +32,8 @@ if (arrow_headers_found() and arrow_is_enabled_imp()): from .backend import ( - create_arrow_file_imp, get_arrow_file_imp, get_parquet_file_imp, - create_parquet_file_imp, get_arrow_table_imp, ) @@ -215,7 +213,7 @@ def featurize_directory( if (output_type == 'pandas'): - header, string_data, numeric_data = featurize_directory_imp (intensity_dir, label_dir, file_pattern, True) + header, string_data, numeric_data = featurize_directory_imp (intensity_dir, label_dir, file_pattern, True, "") df = pd.concat( [ @@ -233,24 +231,10 @@ def featurize_directory( else: - featurize_directory_imp(intensity_dir, label_dir, file_pattern, False) - - output_type = output_type.lower() # ignore case of output type - - if (output_type == 'arrow' or output_type == 'arrowipc'): - - self.create_arrow_file(output_path) - - return self.get_arrow_ipc_file() - - elif (output_type == 'parquet'): - - self.create_parquet_file(output_path) - - return self.get_parquet_file() - + featurize_directory_imp(intensity_dir, label_dir, file_pattern, False, output_path) + + - def featurize( self, intensity_images: np.ndarray, @@ -286,6 +270,10 @@ def featurize( Pandas DataFrame containing the requested features with one row per label per image. """ + valid_output_types = ['arrow', 'parquet', 'pandas'] + if (output_type != "" and output_type not in valid_output_types): + raise ValueError("Invalid output type: " + output_type + ". Valid options are: " + valid_output_types) + # verify argument types if not isinstance(intensity_images, np.ndarray): @@ -338,7 +326,7 @@ def featurize( if (output_type == 'pandas'): - header, string_data, numeric_data, error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, True) + header, string_data, numeric_data, error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, True, "", "") self.error_message = error_message if(error_message != ''): @@ -360,25 +348,21 @@ def featurize( else: - error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, False) + error_message = featurize_montage_imp (intensity_images, label_images, intensity_names, label_names, False, output_type, output_path) self.error_message = error_message - if(error_message != ''): - print(error_message) - output_type = output_type.lower() # ignore case of output type + if(error_message[0] != ''): + raise RuntimeError('Error calculating features: ' + error_message[0]) - if (output_type == 'arrow' or output_type == 'arrowipc'): - - self.create_arrow_file(output_path) - - return self.get_arrow_ipc_file() - - elif (output_type == 'parquet'): - - self.create_parquet_file(output_path) + if (output_path.endswith('.arrow') or output_path.endswith('.parquet')): + return output_path + else: + if (output_path == ""): + return 'NyxusFeatures.' + output_type + else: + return output_path + '/NyxusFeatures.' + output_type - return self.get_parquet_file() def using_gpu(self, gpu_on: bool): use_gpu(gpu_on) @@ -440,19 +424,7 @@ def featurize_files ( featurize_fname_lists_imp (intensity_files, mask_files, single_roi, False) - output_type = output_type.lower() # ignore case of output type - if (output_type == 'arrow' or output_type == 'arrowipc'): - - self.create_arrow_file(output_path) - - return self.get_arrow_ipc_file() - - elif (output_type == 'parquet'): - - self.create_parquet_file(output_path) - - return self.get_parquet_file() def blacklist_roi(self, blacklist:str): @@ -715,26 +687,6 @@ def get_params(self, *args): return get_params_imp(vars) - - def create_arrow_file(self, path: str="NyxusFeatures.arrow"): - """Creates an Arrow IPC file containing the features. - - This method must be called after calling one of the featurize methods. - - Parameters - ---------- - path: Path to write the arrow file to. (Optional, default "NyxusFeatures.arrow") - - Returns - ------- - None - - """ - if self.arrow_is_enabled(): - create_arrow_file_imp(path) - else: - raise RuntimeError("Nyxus was not built with Arrow. To use this functionality, rebuild Nyxus with Arrow support on.") - def get_arrow_ipc_file(self): """Returns the path to the Arrow IPC file. @@ -753,27 +705,7 @@ def get_arrow_ipc_file(self): return get_arrow_file_imp() else: raise RuntimeError("Nyxus was not built with Arrow. To use this functionality, rebuild Nyxus with Arrow support on.") - - def create_parquet_file(self, path: str="NyxusFeatures.parquet"): - """Creates a Parquet file containing the features. - - This method must be called after calling one of the featurize methods. - - Parameters - ---------- - path: Path to write the parquet file to. (Optional, default "NyxusFeatures.parquet") - - Returns - ------- - None - - """ - if self.arrow_is_enabled(): - create_parquet_file_imp(path) - else: - raise RuntimeError("Nyxus was not built with Arrow. To use this functionality, rebuild Nyxus with Arrow support on.") - def get_parquet_file(self): """Returns the path to the Arrow IPC file. @@ -791,7 +723,7 @@ def get_parquet_file(self): else: raise RuntimeError("Nyxus was not built with Arrow. To use this functionality, rebuild Nyxus with Arrow support on.") - def get_arrow_memory_mapping(self): + def get_arrow_memory_mapping(self, arrow_ipc_file_path): """Returns a memory mapping to the Arrow IPC file. This method creates a memory mapping between the Arrow IPC file on disk to allow @@ -808,13 +740,8 @@ def get_arrow_memory_mapping(self): """ if self.arrow_is_enabled(): - arrow_file_path = self.get_arrow_ipc_file() - - if (arrow_file_path == ""): - self.create_arrow_file() - arrow_file_path = self.get_arrow_ipc_file() - with pa.memory_map(arrow_file_path, 'rb') as source: + with pa.memory_map(arrow_ipc_file_path, 'rb') as source: array = pa.ipc.open_file(source).read_all() return array @@ -822,7 +749,7 @@ def get_arrow_memory_mapping(self): raise RuntimeError("Apache arrow is not enabled. Please rebuild Nyxus with Arrow support to enable this functionality.") - def get_arrow_table(self): + def get_arrow_table(self, arrow_file_path: str): """Returns an arrow table containing the feature calculations. Parameters @@ -836,7 +763,7 @@ def get_arrow_table(self): """ if self.arrow_is_enabled(): - return get_arrow_table_imp() + return get_arrow_table_imp(str(arrow_file_path)) else: raise RuntimeError("Nyxus was not built with Arrow. To use this functionality, rebuild Nyxus with Arrow support on.") diff --git a/src/nyx/results_cache.h b/src/nyx/results_cache.h index 08baa3a5..aa7e5e14 100644 --- a/src/nyx/results_cache.h +++ b/src/nyx/results_cache.h @@ -16,11 +16,6 @@ class ResultsCache totalNumLabels_ = 0; } - std::vector get_headerBufByVal() { return headerBuf_; } - std::vector get_stringColBufByVal() { return stringColBuf_; } - std::vector get_calcResultBufByVal() { return calcResultBuf_; } - - std::vector& get_headerBuf() { return headerBuf_; } std::vector& get_stringColBuf() { return stringColBuf_; } std::vector& get_calcResultBuf() { return calcResultBuf_; } diff --git a/src/nyx/scan_fastloader_way.cpp b/src/nyx/scan_fastloader_way.cpp index 298594a7..9d516f53 100644 --- a/src/nyx/scan_fastloader_way.cpp +++ b/src/nyx/scan_fastloader_way.cpp @@ -26,11 +26,17 @@ namespace py = pybind11; #include "globals.h" #include "helpers/timing.h" +#ifdef USE_ARROW +#include "arrow_output_stream.h" +#include "output_writers.h" +#endif + // Sanity #ifdef _WIN32 #include #endif + namespace Nyxus { bool processIntSegImagePair (const std::string& intens_fpath, const std::string& label_fpath, int num_FL_threads, int filepair_index, int tot_num_filepairs) @@ -203,17 +209,35 @@ namespace Nyxus int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, + bool arrow_output, bool save2csv, - const std::string& csvOutputDir) + const std::string& outputDir) { + #ifdef CHECKTIMING if (Stopwatch::inclusive()) Stopwatch::reset(); #endif - + // One-time initialization init_feature_buffers(); + + // initialize arrow writer if needed + #ifdef USE_ARROW + if (arrow_output) { + + theEnvironment.arrow_stream = ArrowOutputStream(); + + try { + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + } catch (const std::exception &err) { + std::cout << "Error creating Arrow file: " << err.what() << std::endl; + return 1; + } + } + #endif + bool ok = true; // Iterate file pattern-filtered images of the dataset @@ -252,15 +276,33 @@ namespace Nyxus return 1; } + #ifdef USE_ARROW + if (arrow_output) { + + auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); + + if (!status.ok()) { + // Handle read error + std::cout << "Error writing Arrow file: " << status.ToString() << std::endl; + return 2; + } + } + #endif + // For the non-Apache output mode, save the result for this intensity-label file pair - if (save2csv) - ok = save_features_2_csv(ifp, lfp, csvOutputDir); - else - ok = save_features_2_buffer(theResultsCache); - if (ok == false) - { - std::cout << "save_features_2_csv() returned an error code" << std::endl; - return 2; + if (!arrow_output) { + + if (save2csv) { + ok = save_features_2_csv(ifp, lfp, outputDir); + } else { + ok = save_features_2_buffer(theResultsCache); + } + + if (ok == false) + { + std::cout << "save_features_2_csv() returned an error code" << std::endl; + return 2; + } } theImLoader.close(); @@ -302,11 +344,24 @@ namespace Nyxus // Details - also to a file VERBOSLVL3( fs::path p(theSegFname); - Stopwatch::save_stats(theEnvironment.output_dir + "/inclusive_nyxustiming.csv"); + Stopwatch::save_stats(theEnvironment.output_dir + "/inclusive_nyxustiming.csv"); ); } #endif + #ifdef USE_ARROW + if (arrow_output) { + // close arrow file after use + auto status = theEnvironment.arrow_writer->close(); + + if (!status.ok()) { + // Handle read error + std::cout << "Error closing Arrow file: " << status.ToString() << std::endl; + return 2; + } + } + #endif + return 0; // success } @@ -318,8 +373,23 @@ namespace Nyxus int numReduceThreads, const std::vector& intensity_names, const std::vector& seg_names, - std::string& error_message) - { + std::string& error_message, + bool arrow_output, + const std::string& outputDir) + { + #ifdef USE_ARROW + if (arrow_output) { + + theEnvironment.arrow_stream = ArrowOutputStream(); + + try { + theEnvironment.arrow_writer = theEnvironment.arrow_stream.create_arrow_file(theEnvironment.arrow_output_type, outputDir, Nyxus::get_header(theFeatureSet.getEnabledFeatures())); + } catch (const std::exception &err) { + error_message = err.what(); + return 1; + } + } + #endif auto intens_buffer = intensity_images.request(); auto label_buffer = label_images.request(); @@ -343,8 +413,23 @@ namespace Nyxus error_message = "processIntSegImagePairInMemory() returned an error code while processing file pair"; return 1; } + + #ifdef USE_ARROW + if (arrow_output) { + + auto status = theEnvironment.arrow_writer->write(Nyxus::get_feature_values()); - ok = save_features_2_buffer(theResultsCache); + if (!status.ok()) { + // Handle read error + error_message = "Error writing Arrow file: " + status.ToString(); + return 2; + } + } + #endif + + if (!arrow_output) + ok = save_features_2_buffer(theResultsCache); + if (ok == false) { error_message = "save_features_2_buffer() failed"; @@ -367,6 +452,19 @@ namespace Nyxus if (PyErr_CheckSignals() != 0) throw pybind11::error_already_set(); } + + #ifdef USE_ARROW + if (arrow_output) { + // close arrow file after use + auto status = theEnvironment.arrow_writer->close(); + + if (!status.ok()) { + // Handle read error + error_message = "Error closing Arrow file: " + status.ToString(); + return 2; + } + } + #endif return 0; // success } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9e2623bf..f73f1d9b 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -45,6 +45,7 @@ set(TEST_SRC test_gabor.cc test_gabor.h test_initialization.h + ../src/nyx/arrow_output_stream.cpp ../src/nyx/features/basic_morphology.cpp ../src/nyx/features/caliper_feret.cpp ../src/nyx/features/caliper_martin.cpp @@ -99,6 +100,7 @@ set(TEST_SRC ../src/nyx/image_loader.cpp ../src/nyx/output_2_buffer.cpp ../src/nyx/output_2_csv.cpp + ../src/nyx/output_writers.cpp ../src/nyx/parallel.cpp ../src/nyx/phase1.cpp ../src/nyx/phase2.cpp @@ -168,6 +170,17 @@ if (ZLIB_FOUND) include_directories (${ZLIB_INCLUDE_DIR}) endif (ZLIB_FOUND) +# Look for installed packages the system +find_package(Arrow REQUIRED) +if (Arrow_FOUND) + list(APPEND runAllTests_LIBRARIES arrow_shared) +endif() + +find_package(Parquet) +if (Parquet_FOUND) + list(APPEND runAllTests_LIBRARIES parquet_shared) +endif() + target_include_directories (runAllTests PUBLIC ${GTEST_INCLUDE_DIRS}) target_link_directories (runAllTests PUBLIC ${GTEST_LIBRARY_PATH}) diff --git a/tests/python/test_nyxus.py b/tests/python/test_nyxus.py index e1ac3eec..3f948cba 100644 --- a/tests/python/test_nyxus.py +++ b/tests/python/test_nyxus.py @@ -89,6 +89,7 @@ def test_gabor_customization (self): def test_get_default_params(self): nyx = nyxus.Nyxus (["*ALL*"]) + assert nyx is not None # actual @@ -311,22 +312,15 @@ def test_make_arrow_ipc(self): features = nyx.featurize(intens, seg) - if (not nyx.arrow_is_enabled()): + arrow_path = nyx.featurize(intens, seg, output_type="arrow") + + arrow_array = nyx.get_arrow_memory_mapping(arrow_path) + + pd_columns = list(features.columns) - with pytest.raises (Exception): - nyx.create_arrow_file() - - with pytest.raises (Exception): - arrow_array = nyx.get_arrow_memory_mapping() - return - - nyx.create_arrow_file() - - arrow_array = nyx.get_arrow_memory_mapping() - - for col in features: - column_list = features[col].tolist() - arrow_list = arrow_array[col] + for i in range(len(features.columns)): + column_list = features[pd_columns[i]].tolist() + arrow_list = arrow_array[i] for i in range(len(column_list)): feature_value = column_list[i] @@ -361,11 +355,13 @@ def test_arrow_ipc(self): features = nyx.featurize(intens, seg) - arrow_array = nyx.get_arrow_memory_mapping() + arrow_array = nyx.get_arrow_memory_mapping(arrow_path) - for col in features: - column_list = features[col].tolist() - arrow_list = arrow_array[col] + pd_columns = list(features.columns) + + for i in range(len(features.columns)): + column_list = features[pd_columns[i]].tolist() + arrow_list = arrow_array[i] for i in range(len(column_list)): feature_value = column_list[i] @@ -400,43 +396,12 @@ def test_arrow_ipc_no_path(self): features = nyx.featurize(intens, seg) - arrow_array = nyx.get_arrow_memory_mapping() - - for col in features: - column_list = features[col].tolist() - arrow_list = arrow_array[col] + arrow_array = nyx.get_arrow_memory_mapping(arrow_path) + pd_columns = list(features.columns) - for i in range(len(column_list)): - feature_value = column_list[i] - arrow_value = arrow_list[i].as_py() - - #skip nan values - if (isinstance(feature_value, (int, float)) and math.isnan(feature_value)): - if (not math.isnan(arrow_value)): - assert False - - continue - assert feature_value == arrow_value - - @pytest.mark.arrow - def test_arrow_ipc_no_create(self): - - nyx = nyxus.Nyxus (["*ALL*"]) - assert nyx is not None - - features = nyx.featurize(intens, seg) - - if (not nyx.arrow_is_enabled()): - with pytest.raises (Exception): - arrow_array = nyx.get_arrow_memory_mapping() - - return - - arrow_array = nyx.get_arrow_memory_mapping() - - for col in features: - column_list = features[col].tolist() - arrow_list = arrow_array[col] + for i in range(len(features.columns)): + column_list = features[pd_columns[i]].tolist() + arrow_list = arrow_array[i] for i in range(len(column_list)): feature_value = column_list[i] @@ -449,8 +414,7 @@ def test_arrow_ipc_no_create(self): continue assert feature_value == arrow_value - - path = nyx.get_arrow_ipc_file() + @pytest.mark.arrow def test_arrow_ipc_path(self): @@ -458,122 +422,19 @@ def test_arrow_ipc_path(self): nyx = nyxus.Nyxus (["*ALL*"]) assert nyx is not None - features = nyx.featurize(intens, seg) - - if (not nyx.arrow_is_enabled()): - with pytest.raises (Exception): - nyx.create_arrow_file() - return - - nyx.create_arrow_file() - - path = nyx.get_arrow_ipc_file() - - assert path == 'NyxusFeatures.arrow' - - @pytest.mark.arrow - def test_arrow_ipc_path_no_create(self): - - nyx = nyxus.Nyxus (["*ALL*"]) - assert nyx is not None - - if (not nyx.arrow_is_enabled()): - assert True - return - - features = nyx.featurize(intens, seg) - - if (not nyx.arrow_is_enabled()): - with pytest.raises (Exception): - path = nyx.get_arrow_ipc_file() - return - - path = nyx.get_arrow_ipc_file() - - assert path == 'NyxusFeatures.arrow' - - #os.remove(path) - - @pytest.mark.arrow - def test_custom_arrow_ipc_path(self): - - nyx = nyxus.Nyxus (["*ALL*"]) - assert nyx is not None - - if (not nyx.arrow_is_enabled()): - assert True - return - - features = nyx.featurize(intens, seg) - - nyx.create_arrow_file('out/out.arrow') - - if (not nyx.arrow_is_enabled()): - with pytest.raises (Exception): - path = nyx.get_arrow_ipc_file() - - return + arrow_path = nyx.featurize(intens, seg, output_type="arrow") path = nyx.get_arrow_ipc_file() - - assert path == 'out/out.arrow' - - @pytest.mark.arrow - def test_make_parquet_file(self): - - nyx = nyxus.Nyxus (["*ALL*"]) - assert nyx is not None - - features = nyx.featurize(intens, seg) - - if (not nyx.arrow_is_enabled()): - with pytest.raises (Exception): - nyx.create_parquet_file() - - with pytest.raises (Exception): - parquet_file = nyx.get_parquet_file() - - return - - nyx.create_parquet_file() - - parquet_file = nyx.get_parquet_file() - # Read the Parquet file into a Pandas DataFrame - parquet_df = pq.read_table(parquet_file).to_pandas() - - - for col in features: - column_list = features[col].tolist() - arrow_list = parquet_df[col].tolist() - - for i in range(len(column_list)): - feature_value = column_list[i] - arrow_value = arrow_list[i] - - #skip nan values - if (isinstance(feature_value, (int, float)) and math.isnan(feature_value)): - if (not math.isnan(arrow_value)): - assert False + assert arrow_path == 'NyxusFeatures.arrow' - continue - assert feature_value == arrow_value @pytest.mark.arrow def test_parquet_writer(self): nyx = nyxus.Nyxus (["*ALL*"]) assert nyx is not None - - if (not nyx.arrow_is_enabled()): - with pytest.raises (Exception): - nyx.create_parquet_file() - - with pytest.raises (Exception): - parquet_file = nyx.get_parquet_file() - - return features = nyx.featurize(intens, seg) @@ -581,11 +442,13 @@ def test_parquet_writer(self): # Read the Parquet file into a Pandas DataFrame parquet_df = pq.read_table(parquet_file).to_pandas() + pd_columns = list(features.columns) + + arrow_columns = list(parquet_df.columns) - - for col in features: - column_list = features[col].tolist() - arrow_list = parquet_df[col].tolist() + for i in range(len(features.columns)): + column_list = features[pd_columns[i]].tolist() + arrow_list = parquet_df[arrow_columns[i]].tolist() for i in range(len(column_list)): feature_value = column_list[i] @@ -606,15 +469,6 @@ def test_parquet_writer(self): assert nyx is not None - if (not nyx.arrow_is_enabled()): - with pytest.raises (Exception): - nyx.create_parquet_file() - - with pytest.raises (Exception): - parquet_file = nyx.get_parquet_file() - - return - features = nyx.featurize(intens, seg) parquet_file = nyx.featurize(intens, seg, output_type="parquet") @@ -623,11 +477,14 @@ def test_parquet_writer(self): # Read the Parquet file into a Pandas DataFrame parquet_df = pq.read_table(parquet_file).to_pandas() - - for col in features: - column_list = features[col].tolist() - arrow_list = parquet_df[col].tolist() + pd_columns = list(features.columns) + + arrow_columns = list(parquet_df.columns) + + for i in range(len(features.columns)): + column_list = features[pd_columns[i]].tolist() + arrow_list = parquet_df[arrow_columns[i]].tolist() for i in range(len(column_list)): feature_value = column_list[i] @@ -640,4 +497,68 @@ def test_parquet_writer(self): continue assert feature_value == arrow_value - \ No newline at end of file + ''' + @pytest.mark.arrow + def test_arrow_ipc_get_table(self): + + nyx = nyxus.Nyxus (["*ALL*"]) + assert nyx is not None + + arrow_path = nyx.featurize(intens, seg, output_type="arrow") + + assert arrow_path == 'NyxusFeatures.arrow' + + features = nyx.featurize(intens, seg) + + arrow_table = nyx.get_arrow_table(arrow_path) + + arrow_df = arrow_table.to_pandas() + + for col in features: + column_list = features[col].tolist() + arrow_list = arrow_df[col].tolist() + + for i in range(len(column_list)): + feature_value = column_list[i] + arrow_value = arrow_list[i] + + #skip nan values + if (isinstance(feature_value, (int, float)) and math.isnan(feature_value)): + if (not math.isnan(arrow_value)): + assert False + + continue + assert feature_value == arrow_value + + @pytest.mark.arrow + def test_parquet_get_table(self): + + nyx = nyxus.Nyxus (["*ALL*"]) + assert nyx is not None + + arrow_path = nyx.featurize(intens, seg, output_type="parquet") + + assert arrow_path == 'NyxusFeatures.parquet' + + features = nyx.featurize(intens, seg) + + arrow_table = nyx.get_arrow_table(arrow_path) + + arrow_df = arrow_table.to_pandas() + + for col in features: + column_list = features[col].tolist() + arrow_list = arrow_df[col].tolist() + + for i in range(len(column_list)): + feature_value = column_list[i] + arrow_value = arrow_list[i] + + #skip nan values + if (isinstance(feature_value, (int, float)) and math.isnan(feature_value)): + if (not math.isnan(arrow_value)): + assert False + + continue + assert feature_value == arrow_value + ''' \ No newline at end of file