
Commit

Merge branch 'branch-25.02' into host_udf_reduction
ttnghia committed Dec 20, 2024
2 parents 2da9137 + fb62d0e commit fca442a
Showing 34 changed files with 243 additions and 216 deletions.
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/nvcomp_adapter.hpp
@@ -22,7 +22,7 @@
#include <string>

namespace CUDF_EXPORT cudf {
-namespace io::nvcomp {
+namespace io::detail::nvcomp {

enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4, GZIP };

@@ -88,5 +88,5 @@ inline bool operator==(feature_status_parameters const& lhs, feature_status_para
[[nodiscard]] std::optional<std::string> is_decompression_disabled(
compression_type compression, feature_status_parameters params = feature_status_parameters());

-} // namespace io::nvcomp
+} // namespace io::detail::nvcomp
} // namespace CUDF_EXPORT cudf
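The hunks above only rename the namespace. As orientation, here is a hedged sketch (not part of the commit) of how an internal caller might query the adapter after the move to `io::detail::nvcomp`, based solely on the `is_decompression_disabled` declaration shown above:

```cpp
#include <cudf/io/nvcomp_adapter.hpp>

#include <iostream>

// Sketch only: reports whether nvCOMP ZSTD decompression can be used.
// An engaged optional carries the reason the feature is disabled.
void report_zstd_decompression_support()
{
  namespace nvcomp = cudf::io::detail::nvcomp;  // namespace introduced by this commit
  auto const reason = nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD);
  if (reason) {
    std::cout << "nvCOMP ZSTD decompression disabled: " << *reason << "\n";
  } else {
    std::cout << "nvCOMP ZSTD decompression available\n";
  }
}
```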
10 changes: 2 additions & 8 deletions cpp/src/groupby/sort/aggregate.cpp
@@ -209,10 +209,7 @@ void aggregate_result_functor::operator()<aggregation::MIN>(aggregation const& a
operator()<aggregation::ARGMIN>(*argmin_agg);
column_view const argmin_result = cache.get_result(values, *argmin_agg);

-// We make a view of ARGMIN result without a null mask and gather using
-// this mask. The values in data buffer of ARGMIN result corresponding
-// to null values was initialized to ARGMIN_SENTINEL which is an out of
-// bounds index value and causes the gathered value to be null.
+// Compute the ARGMIN result without the null mask in the gather map.
column_view const null_removed_map(
data_type(type_to_id<size_type>()),
argmin_result.size(),
@@ -251,10 +248,7 @@ void aggregate_result_functor::operator()<aggregation::MAX>(aggregation const& a
operator()<aggregation::ARGMAX>(*argmax_agg);
column_view const argmax_result = cache.get_result(values, *argmax_agg);

-// We make a view of ARGMAX result without a null mask and gather using
-// this mask. The values in data buffer of ARGMAX result corresponding
-// to null values was initialized to ARGMAX_SENTINEL which is an out of
-// bounds index value and causes the gathered value to be null.
+// Compute the ARGMAX result without the null mask in the gather map.
column_view const null_removed_map(
data_type(type_to_id<size_type>()),
argmax_result.size(),
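For context on the comment being shortened here: the ARGMIN/ARGMAX gather map stores an out-of-bounds sentinel for groups whose values are all null, so a bounds-checked gather nullifies exactly those rows. A minimal host-side illustration of that idea (plain C++, not cudf code; the sentinel value of -1 is assumed here for illustration):

```cpp
#include <cstdio>
#include <vector>

// Sentinel assumed to be -1 purely for illustration.
constexpr int ARGMIN_SENTINEL = -1;

int main()
{
  std::vector<double> values{3.5, 1.25, 7.0, 2.0};
  std::vector<int> argmin_map{1, ARGMIN_SENTINEL, 3};  // middle group had only null values

  for (std::size_t g = 0; g < argmin_map.size(); ++g) {
    int const idx        = argmin_map[g];
    bool const in_bounds = idx >= 0 && idx < static_cast<int>(values.size());
    if (in_bounds) {
      std::printf("group %zu min: %g\n", g, values[idx]);  // valid row
    } else {
      std::printf("group %zu min: null\n", g);  // sentinel index gathers to a null row
    }
  }
  return 0;
}
```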
31 changes: 15 additions & 16 deletions cpp/src/groupby/sort/group_argmax.cu
@@ -42,22 +42,21 @@ std::unique_ptr<column> group_argmax(column_view const& values,
stream,
mr);

-// The functor returns the index of maximum in the sorted values.
-// We need the index of maximum in the original unsorted values.
-// So use indices to gather the sort order used to sort `values`.
-// Gather map cannot be null so we make a view with the mask removed.
-// The values in data buffer of indices corresponding to null values was
-// initialized to ARGMAX_SENTINEL. Using gather_if.
-// This can't use gather because nulls in gathered column will not store ARGMAX_SENTINEL.
-auto indices_view = indices->mutable_view();
-thrust::gather_if(rmm::exec_policy(stream),
-indices_view.begin<size_type>(), // map first
-indices_view.end<size_type>(), // map last
-indices_view.begin<size_type>(), // stencil
-key_sort_order.begin<size_type>(), // input
-indices_view.begin<size_type>(), // result
-[] __device__(auto i) { return (i != cudf::detail::ARGMAX_SENTINEL); });
-return indices;
+// The functor returns the indices of maximums based on the sorted keys.
+// We need the indices of maximums from the original unsorted keys
+// so we use these indices and the key_sort_order to map to the correct indices.
+// We do not use cudf::gather since we can move the null-mask separately.
+auto indices_view = indices->view();
+auto output = rmm::device_uvector<size_type>(indices_view.size(), stream, mr);
+thrust::gather(rmm::exec_policy_nosync(stream),
+indices_view.begin<size_type>(), // map first
+indices_view.end<size_type>(), // map last
+key_sort_order.begin<size_type>(), // input
+output.data() // result (must not overlap map)
+);
+auto null_count = indices_view.null_count();
+auto null_mask = indices->release().null_mask.release();
+return std::make_unique<column>(std::move(output), std::move(*null_mask), null_count);
}

} // namespace detail
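The replacement code maps per-group argmax positions from the sorted row space back to original row indices with a plain `thrust::gather` into a separate buffer. A standalone sketch of that remapping with illustrative data (not the cudf code itself):

```cpp
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/gather.h>
#include <thrust/host_vector.h>

#include <cstdio>
#include <vector>

int main()
{
  // key_sort_order[i] = original row index sitting at position i after sorting by key.
  std::vector<int> h_order{4, 1, 0, 5, 2, 3};
  thrust::device_vector<int> key_sort_order(h_order.begin(), h_order.end());

  // Per-group argmax positions expressed in the *sorted* row space.
  std::vector<int> h_sorted_argmax{1, 4};
  thrust::device_vector<int> argmax_sorted(h_sorted_argmax.begin(), h_sorted_argmax.end());

  // Separate output buffer: thrust::gather requires the result range not to alias the map.
  thrust::device_vector<int> argmax_original(argmax_sorted.size());
  thrust::gather(thrust::device,
                 argmax_sorted.begin(), argmax_sorted.end(),  // map
                 key_sort_order.begin(),                      // input
                 argmax_original.begin());                    // result

  thrust::host_vector<int> result = argmax_original;
  for (int idx : result) { std::printf("%d\n", idx); }  // prints 1 then 2
  return 0;
}
```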
32 changes: 16 additions & 16 deletions cpp/src/groupby/sort/group_argmin.cu
@@ -21,6 +21,7 @@
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
+#include <rmm/device_uvector.hpp>

#include <thrust/gather.h>

@@ -42,22 +43,21 @@ std::unique_ptr<column> group_argmin(column_view const& values,
stream,
mr);

-// The functor returns the index of minimum in the sorted values.
-// We need the index of minimum in the original unsorted values.
-// So use indices to gather the sort order used to sort `values`.
-// The values in data buffer of indices corresponding to null values was
-// initialized to ARGMIN_SENTINEL. Using gather_if.
-// This can't use gather because nulls in gathered column will not store ARGMIN_SENTINEL.
-auto indices_view = indices->mutable_view();
-thrust::gather_if(rmm::exec_policy(stream),
-indices_view.begin<size_type>(), // map first
-indices_view.end<size_type>(), // map last
-indices_view.begin<size_type>(), // stencil
-key_sort_order.begin<size_type>(), // input
-indices_view.begin<size_type>(), // result
-[] __device__(auto i) { return (i != cudf::detail::ARGMIN_SENTINEL); });
-
-return indices;
+// The functor returns the indices of minimums based on the sorted keys.
+// We need the indices of minimums from the original unsorted keys
+// so we use these and the key_sort_order to map to the correct indices.
+// We do not use cudf::gather since we can move the null-mask separately.
+auto indices_view = indices->view();
+auto output = rmm::device_uvector<size_type>(indices_view.size(), stream, mr);
+thrust::gather(rmm::exec_policy_nosync(stream),
+indices_view.begin<size_type>(), // map first
+indices_view.end<size_type>(), // map last
+key_sort_order.begin<size_type>(), // input
+output.data() // result (must not overlap map)
+);
+auto null_count = indices_view.null_count();
+auto null_mask = indices->release().null_mask.release();
+return std::make_unique<column>(std::move(output), std::move(*null_mask), null_count);
}

} // namespace detail
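The last added lines rebuild the result column from the gathered indices plus the original null mask instead of gathering in place, so validity is never recomputed. A hedged sketch of that rebuild step, assuming the usual cudf/rmm signatures for `column::release()` and the `device_uvector` column constructor:

```cpp
#include <cudf/column/column.hpp>
#include <cudf/types.hpp>

#include <rmm/device_uvector.hpp>

#include <memory>
#include <utility>

// Sketch only: take the gathered indices (a bare device_uvector) and the ARGMIN/ARGMAX
// result column whose null mask we want to keep, and build the final column.
std::unique_ptr<cudf::column> rebuild_with_original_mask(
  rmm::device_uvector<cudf::size_type>&& gathered, std::unique_ptr<cudf::column>&& indices)
{
  auto const null_count = indices->null_count();
  // release() hands back ownership of the data buffer, the null-mask buffer, and any children.
  auto contents = indices->release();
  return std::make_unique<cudf::column>(
    std::move(gathered), std::move(*contents.null_mask), null_count);
}
```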
37 changes: 37 additions & 0 deletions cpp/src/io/comp/common.hpp
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstddef>

namespace cudf::io::detail {

/**
* @brief The size used for padding a data buffer's size to a multiple of the padding.
*
* Padding is necessary for input/output buffers of several compression/decompression kernels
* (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require
* padding to the buffers so that the pointers can shift along the address space to satisfy their
* alignment requirement.
*
* In the meantime, it is not entirely clear why such padding is needed. We need to further
* investigate and implement a better fix rather than just padding the buffer.
* See https://github.com/rapidsai/cudf/issues/13605.
*/
constexpr std::size_t BUFFER_PADDING_MULTIPLE{8};

} // namespace cudf::io::detail
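A minimal sketch of the padding arithmetic implied by this constant (the helper name and usage are illustrative, not cudf API; only `BUFFER_PADDING_MULTIPLE` comes from the file above, restated here so the snippet stands alone):

```cpp
#include <cstddef>

// Round a buffer size up to the next multiple of BUFFER_PADDING_MULTIPLE so compression
// kernels can shift pointers within the buffer to meet their alignment requirements.
constexpr std::size_t BUFFER_PADDING_MULTIPLE{8};

constexpr std::size_t padded_size(std::size_t size)
{
  return ((size + BUFFER_PADDING_MULTIPLE - 1) / BUFFER_PADDING_MULTIPLE) * BUFFER_PADDING_MULTIPLE;
}

static_assert(padded_size(0) == 0);
static_assert(padded_size(13) == 16);
static_assert(padded_size(16) == 16);
```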
5 changes: 2 additions & 3 deletions cpp/src/io/comp/comp.cpp
@@ -87,15 +87,14 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
outputs[0] = d_dst;
outputs.host_to_device_async(stream);

-cudf::detail::hostdevice_vector<cudf::io::compression_result> hd_status(1, stream);
+cudf::detail::hostdevice_vector<compression_result> hd_status(1, stream);
hd_status[0] = {};
hd_status.host_to_device_async(stream);

nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

hd_status.device_to_host_sync(stream);
-CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
-"snappy compression failed");
+CUDF_EXPECTS(hd_status[0].status == compression_status::SUCCESS, "snappy compression failed");
return cudf::detail::make_std_vector_sync<uint8_t>(d_dst, stream);
}

22 changes: 20 additions & 2 deletions cpp/src/io/comp/comp.hpp
@@ -16,16 +16,34 @@

#pragma once

#include "common.hpp"

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <string>
#include <vector>

namespace CUDF_EXPORT cudf {
namespace io::detail {

+/**
+ * @brief Status of a compression/decompression operation.
+ */
+enum class compression_status : uint8_t {
+SUCCESS, ///< Successful, output is valid
+FAILURE, ///< Failed, output is invalid (e.g. input is unsupported in some way)
+SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used)
+OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output
+};
+
+/**
+ * @brief Descriptor of compression/decompression result.
+ */
+struct compression_result {
+uint64_t bytes_written;
+compression_status status;
+};

/**
* @brief Compresses a system memory buffer.
*
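As an illustration of how a caller interprets these descriptors (a sketch, not cudf API; the enum and struct are restated locally from the hunk above so the snippet compiles standalone):

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

enum class compression_status : uint8_t { SUCCESS, FAILURE, SKIPPED, OUTPUT_OVERFLOW };

struct compression_result {
  uint64_t bytes_written;
  compression_status status;
};

// Treat SKIPPED as acceptable (the uncompressed data can be used) and report the first
// hard failure; on SUCCESS, bytes_written tells the caller how much output is valid.
void check_results(std::vector<compression_result> const& results)
{
  for (std::size_t i = 0; i < results.size(); ++i) {
    auto const s = results[i].status;
    if (s == compression_status::FAILURE) {
      throw std::runtime_error("compression failed for chunk " + std::to_string(i));
    }
    if (s == compression_status::OUTPUT_OVERFLOW) {
      throw std::runtime_error("output buffer too small for chunk " + std::to_string(i));
    }
  }
}
```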
8 changes: 3 additions & 5 deletions cpp/src/io/comp/debrotli.cu
@@ -63,8 +63,8 @@ THE SOFTWARE.

#include <rmm/cuda_stream_view.hpp>

-namespace cudf {
-namespace io {
+namespace cudf::io::detail {

constexpr uint32_t huffman_lookup_table_width = 8;
constexpr int8_t brotli_code_length_codes = 18;
constexpr uint32_t brotli_num_distance_short_codes = 16;
@@ -2020,7 +2020,6 @@
results[block_id].status =
(s->error == 0) ? compression_status::SUCCESS : compression_status::FAILURE;
// Return ext heap used by last block (statistics)
-results[block_id].reserved = s->fb_size;
}
}

@@ -2115,5 +2114,4 @@ void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
#endif
}

-} // namespace io
-} // namespace cudf
+} // namespace cudf::io::detail
7 changes: 2 additions & 5 deletions cpp/src/io/comp/gpuinflate.cu
@@ -49,8 +49,7 @@ Mark Adler [email protected]

#include <rmm/cuda_stream_view.hpp>

-namespace cudf {
-namespace io {
+namespace cudf::io::detail {

constexpr int max_bits = 15; // maximum bits in a code
constexpr int max_l_codes = 286; // maximum number of literal/length codes
@@ -1139,7 +1138,6 @@ CUDF_KERNEL void __launch_bounds__(block_size)
default: return compression_status::FAILURE;
}
}();
-results[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes
}
}

@@ -1224,5 +1222,4 @@ void gpu_copy_uncompressed_blocks(device_span<device_span<uint8_t const> const>
}
}

-} // namespace io
-} // namespace cudf
+} // namespace cudf::io::detail
41 changes: 4 additions & 37 deletions cpp/src/io/comp/gpuinflate.hpp
@@ -16,6 +16,8 @@

#pragma once

#include "io/comp/comp.hpp"

#include <cudf/io/types.hpp>
#include <cudf/utilities/export.hpp>
#include <cudf/utilities/span.hpp>
@@ -24,44 +26,10 @@

#include <cstdint>

-namespace cudf {
-namespace io {
-
-/**
- * @brief Status of a compression/decompression operation.
- */
-enum class compression_status : uint8_t {
-SUCCESS, ///< Successful, output is valid
-FAILURE, ///< Failed, output is invalid (e.g. input is unsupported in some way)
-SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used)
-OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output
-};
-
-/**
- * @brief Descriptor of compression/decompression result.
- */
-struct compression_result {
-uint64_t bytes_written;
-compression_status status;
-uint32_t reserved;
-};
+namespace cudf::io::detail {

enum class gzip_header_included { NO, YES };

-/**
- * @brief The value used for padding a data buffer such that its size will be multiple of it.
- *
- * Padding is necessary for input/output buffers of several compression/decompression kernels
- * (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require
- * padding to the buffers so that the pointers can shift along the address space to satisfy their
- * alignment requirement.
- *
- * In the meantime, it is not entirely clear why such padding is needed. We need to further
- * investigate and implement a better fix rather than just padding the buffer.
- * See https://github.com/rapidsai/cudf/issues/13605.
- */
-constexpr std::size_t BUFFER_PADDING_MULTIPLE{8};

/**
* @brief Interface for decompressing GZIP-compressed data
*
@@ -169,5 +137,4 @@ void gpu_snap(device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);

-} // namespace io
-} // namespace cudf
+} // namespace cudf::io::detail
6 changes: 2 additions & 4 deletions cpp/src/io/comp/io_uncomp.hpp
@@ -16,15 +16,13 @@

#pragma once

#include "common.hpp"

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <string>
#include <vector>

-using cudf::host_span;

namespace CUDF_EXPORT cudf {
namespace io::detail {

4 changes: 2 additions & 2 deletions cpp/src/io/comp/nvcomp_adapter.cpp
@@ -30,7 +30,7 @@

#include <mutex>

-namespace cudf::io::nvcomp {
+namespace cudf::io::detail::nvcomp {
namespace {

// Dispatcher for nvcompBatched<format>DecompressGetTempSizeEx
Expand Down Expand Up @@ -478,4 +478,4 @@ std::optional<size_t> compress_max_allowed_chunk_size(compression_type compressi
}
}

-} // namespace cudf::io::nvcomp
+} // namespace cudf::io::detail::nvcomp
6 changes: 3 additions & 3 deletions cpp/src/io/comp/nvcomp_adapter.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@
#include <thrust/transform.h>
#include <thrust/tuple.h>

-namespace cudf::io::nvcomp {
+namespace cudf::io::detail::nvcomp {

batched_args create_batched_nvcomp_args(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
@@ -127,4 +127,4 @@ std::pair<size_t, size_t> max_chunk_and_total_input_size(device_span<size_t cons
return {max, sum};
}

-} // namespace cudf::io::nvcomp
+} // namespace cudf::io::detail::nvcomp