Skip to content

Commit

Permalink
Add option to Parquet writer to skip compressing individual columns (#…
Browse files Browse the repository at this point in the history
…15411)

#15081 added the ability to select per-column encodings in the Parquet writer. Some Parquet encodings (e.g `DELTA_BINARY_PACKED`) do not mix well with compression (see [PARQUET-2414](https://issues.apache.org/jira/browse/PARQUET-2414) for example). This PR adds the ability to turn off compression for select columns. This uses the same mechanism as encoding selection, so an example use would be:
```c++
  cudf::io::table_input_metadata table_metadata(table);
  table_metadata.column_metadata[0]
    .set_name("int_delta_binary")
    .set_encoding(cudf::io::column_encoding::DELTA_BINARY_PACKED)
    .set_skip_compression(true);
```

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Bradley Dice (https://github.com/bdice)

URL: #15411
  • Loading branch information
etseidl authored Apr 18, 2024
1 parent d1b92e2 commit e0c4280
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 1 deletion.
21 changes: 21 additions & 0 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ class column_in_metadata {
bool _list_column_is_map = false;
bool _use_int96_timestamp = false;
bool _output_as_binary = false;
bool _skip_compression = false;
std::optional<uint8_t> _decimal_precision;
std::optional<int32_t> _parquet_field_id;
std::vector<column_in_metadata> children;
Expand Down Expand Up @@ -722,6 +723,19 @@ class column_in_metadata {
return *this;
}

/**
* @brief Specifies whether this column should not be compressed regardless of the compression
* codec specified for the file.
*
* @param skip If `true` do not compress this column
* @return this for chaining
*/
column_in_metadata& set_skip_compression(bool skip) noexcept
{
_skip_compression = skip;
return *this;
}

/**
* @brief Sets the encoding to use for this column.
*
Expand Down Expand Up @@ -844,6 +858,13 @@ class column_in_metadata {
*/
[[nodiscard]] bool is_enabled_output_as_binary() const noexcept { return _output_as_binary; }

/**
* @brief Get whether to skip compressing this column
*
* @return Boolean indicating whether to skip compression of this column
*/
[[nodiscard]] bool is_enabled_skip_compression() const noexcept { return _skip_compression; }

/**
* @brief Get the encoding that was set for this column.
*
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,9 @@ __device__ void finish_page_encode(state_buf* s,
}
pages[blockIdx.x] = s->page;
if (not comp_results.empty()) {
comp_results[blockIdx.x] = {0, compression_status::FAILURE};
auto const status =
s->col.skip_compression ? compression_status::SKIPPED : compression_status::FAILURE;
comp_results[blockIdx.x] = {0, status};
pages[blockIdx.x].comp_res = &comp_results[blockIdx.x];
}
}
Expand Down Expand Up @@ -2495,6 +2497,7 @@ CUDF_KERNEL void __launch_bounds__(decide_compression_block_size)
if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
auto const lvl_bytes = curr_page.is_v2() ? curr_page.level_bytes() : 0;
compressed_data_size += comp_res->bytes_written + lvl_bytes;
// TODO: would this be better as a ballot?
if (comp_res->status != compression_status::SUCCESS) {
atomicOr(&compression_error[warp_id], 1);
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ struct parquet_column_device_view : stats_column_desc {
//!< nullability of parent_column. May be different from
//!< col.nullable() in case of chunked writing.
bool output_as_byte_array; //!< Indicates this list column is being written as a byte array
bool skip_compression; //!< Skip compression for this column
column_encoding requested_encoding; //!< User specified encoding for this column.
};

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ struct schema_tree_node : public SchemaElement {
statistics_dtype stats_dtype;
int32_t ts_scale;
column_encoding requested_encoding;
bool skip_compression;

// TODO(fut): Think about making schema a class that holds a vector of schema_tree_nodes. The
// function construct_schema_tree could be its constructor. It can have method to get the per
Expand Down Expand Up @@ -698,6 +699,7 @@ std::vector<schema_tree_node> construct_schema_tree(
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary();
col_schema.skip_compression = col_meta.is_enabled_skip_compression();
schema.push_back(col_schema);
} else if (col->type().id() == type_id::STRUCT) {
// if struct, add current and recursively call for all children
Expand Down Expand Up @@ -833,6 +835,7 @@ std::vector<schema_tree_node> construct_schema_tree(
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
col_schema.skip_compression = col_meta.is_enabled_skip_compression();
schema.push_back(col_schema);
}
};
Expand Down Expand Up @@ -1023,6 +1026,7 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream
desc.max_def_level = _max_def_level;
desc.max_rep_level = _max_rep_level;
desc.requested_encoding = schema_node.requested_encoding;
desc.skip_compression = schema_node.skip_compression;
return desc;
}

Expand Down
42 changes: 42 additions & 0 deletions cpp/tests/io/parquet_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@

#include <cudf/io/data_sink.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/unary.hpp>

#include <src/io/parquet/parquet_common.hpp>

#include <fstream>

using cudf::test::iterators::no_nulls;
Expand Down Expand Up @@ -1321,6 +1324,45 @@ TEST_F(ParquetWriterTest, CompStatsEmptyTable)
expect_compression_stats_empty(stats);
}

TEST_F(ParquetWriterTest, SkipCompression)
{
constexpr auto page_rows = 1000;
constexpr auto row_group_rows = 2 * page_rows;
constexpr auto num_rows = 2 * row_group_rows;

auto sequence = thrust::make_counting_iterator(0);
column_wrapper<int> col(sequence, sequence + num_rows, no_nulls());

auto expected = table_view{{col, col}};
auto expected_metadata = cudf::io::table_input_metadata{expected};
expected_metadata.column_metadata[0].set_skip_compression(true);

auto const filepath = temp_env->get_temp_filepath("SkipCompression.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.compression(cudf::io::compression_type::ZSTD)
.max_page_size_rows(page_rows)
.row_group_size_rows(row_group_rows)
.max_page_fragment_size(page_rows)
.metadata(std::move(expected_metadata));

cudf::io::write_parquet(out_opts);

cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto result = cudf::io::read_parquet(read_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, expected);

// check metadata to make sure column 0 is not compressed and column 1 is
auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::detail::FileMetaData fmd;
read_footer(source, &fmd);

EXPECT_EQ(fmd.row_groups[0].columns[0].meta_data.codec, cudf::io::parquet::detail::UNCOMPRESSED);
EXPECT_EQ(fmd.row_groups[0].columns[1].meta_data.codec, cudf::io::parquet::detail::ZSTD);
}

TEST_F(ParquetWriterTest, NoNullsAsNonNullable)
{
column_wrapper<int32_t> col{{1, 2, 3}, no_nulls()};
Expand Down

0 comments on commit e0c4280

Please sign in to comment.