Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Apache Arrow stream writers #147

Merged
merged 30 commits into from
Oct 10, 2023
Merged

Conversation

JesseMckinzie
Copy link
Member

  • Update Apache Arrow writers for Arrow IPC and Parquet formats to write in streams to make Arrow writing fully scalable
  • Update Python API and CLI to use new writers
  • Add unit tests for updated get_arrow_table method in the Python API

src/nyx/main_nyxus.cpp Outdated Show resolved Hide resolved
@@ -27,6 +27,7 @@

namespace Nyxus
{

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a white-space change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was left behind after removing a function I implemented

std::string arrow_output_type = "";
ArrowOutputStream arrow_stream;
std::shared_ptr<ApacheArrowWriter> arrow_writer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialize to nullptr?

@@ -16,9 +20,20 @@
#include <memory>

#include "helpers/helpers.h"
#include "globals.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear why we need this here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer needed

// Sanity
#ifdef _WIN32
#include<windows.h>
#endif

#include <chrono>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Comment on lines +300 to +304
if (ok == false)
{
std::cout << "save_features_2_csv() returned an error code" << std::endl;
return 2;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This need to be updated to refer to proper function call. May be something like

if (!ok){
  if(save2csv){...}
  else {...}
}

if (!status.ok()) {
// Handle read error
auto err = status.ToString();
throw std::runtime_error("Error writing Arrow file: " + err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw? This can be performance degrading if we are in a situation where we keep throwing. Are we catching this in the caller code? If not, may be just ignore/log and move on?
Since this function already return error codes if anything goes wrong, we should just use the same error reporting mechanism instead of throw.

Comment on lines 78 to 79
use_arrow, // 'true' to save to csv
theEnvironment.useCsv,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the arg comment :-)

@@ -0,0 +1,166 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a required file?

#ifdef USE_ARROW
if (arrow_output) {

auto features = Nyxus::get_feature_values();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is an extra call that we don't need?

throw std::invalid_argument("The arrow file path must end in \".arrow\"");
}

if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How a user can specify via arrow_file_type_upper that the desired format is ".feather" ?

@@ -32,7 +32,7 @@ namespace Nyxus

bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs);
std::string getPureFname(const std::string& fpath);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, const std::string& csvOutputDir);
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, bool arrow_output, const std::string& csvOutputDir);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't having overloaded function processDataset() for the separate cases of CSV and Apache output types be less confusing than having a single overparametered processDataset() ?

#ifdef USE_ARROW
use_arrow = theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "PARQUET";
#endif

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece of logic is OK but is just somewhat not elegant to put such a low-level stuff in main(). Again, if we overload processDataset() as mentioned earlier, we would be able to call it separately with Apache-related and CSV parameters.

return 2;
}
}
#endif
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File scan_fastloader_way.cpp is getting cluttered with #ifdef USE_ARROW code. Which is OK for this PR but I would immediately fix in a subsequent PR to keep the image scan logic free of low-level file format stuff. (Principle of separation of concerns.)

std::shared_ptr<ApacheArrowWriter> create_arrow_file(const std::string& arrow_file_type,
const std::string& arrow_file_path,
const std::vector<std::string>& header);
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path, arrow::Status& table_status);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function call should change in a subsequent PR not to pass a ref to retrieve an error/status code. Instead the caller code will check for nullptr on the return value.

@sameeul sameeul merged commit bfa2543 into PolusAI:main Oct 10, 2023
18 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants