diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index abe2fc8ed8b..9d79733703c 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -13,6 +13,7 @@ jobs: # Please keep pr-builder as the top job here pr-builder: needs: + - check-nightly-ci - changed-files - checks - conda-cpp-build @@ -54,6 +55,18 @@ jobs: - name: Telemetry setup if: ${{ vars.TELEMETRY_ENABLED == 'true' }} uses: rapidsai/shared-actions/telemetry-dispatch-stash-base-env-vars@main + check-nightly-ci: + # Switch to ubuntu-latest once it defaults to a version of Ubuntu that + # provides at least Python 3.11 (see + # https://docs.python.org/3/library/datetime.html#datetime.date.fromisoformat) + runs-on: ubuntu-24.04 + env: + RAPIDS_GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + steps: + - name: Check if nightly CI is passing + uses: rapidsai/shared-actions/check_nightly_success/dispatch@main + with: + repo: cudf changed-files: secrets: inherit needs: telemetry-setup diff --git a/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh b/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh index e0c7ce840d7..69edf38e359 100644 --- a/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh +++ b/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh @@ -57,62 +57,71 @@ struct MurmurHash3_x86_32 { }; template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(bool const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + bool const& key) const { return this->compute(static_cast(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(float const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + float const& key) const { return this->compute(normalize_nans_and_zeros(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(double const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + double const& key) const { return this->compute(normalize_nans_and_zeros(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::string_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::string_view const& key) const { return this->compute_bytes(reinterpret_cast(key.data()), key.size_bytes()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal32 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal32 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal64 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal64 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal128 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal128 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::list_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::list_view const& key) const { CUDF_UNREACHABLE("List column hashing is not 
supported"); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::struct_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::struct_view const& key) const { CUDF_UNREACHABLE("Direct hashing of struct_view is not supported"); } diff --git a/cpp/src/groupby/hash/compute_groupby.cu b/cpp/src/groupby/hash/compute_groupby.cu index e1dbf2a3d9e..9648d942513 100644 --- a/cpp/src/groupby/hash/compute_groupby.cu +++ b/cpp/src/groupby/hash/compute_groupby.cu @@ -61,7 +61,7 @@ std::unique_ptr compute_groupby(table_view const& keys, d_row_equal, probing_scheme_t{d_row_hash}, cuco::thread_scope_device, - cuco::storage{}, + cuco::storage{}, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}, stream.value()}; diff --git a/cpp/src/groupby/hash/compute_mapping_indices.cuh b/cpp/src/groupby/hash/compute_mapping_indices.cuh index d353830780f..f86a93109be 100644 --- a/cpp/src/groupby/hash/compute_mapping_indices.cuh +++ b/cpp/src/groupby/hash/compute_mapping_indices.cuh @@ -106,15 +106,15 @@ CUDF_KERNEL void mapping_indices_kernel(cudf::size_type num_input_rows, __shared__ cudf::size_type shared_set_indices[GROUPBY_SHM_MAX_ELEMENTS]; // Shared set initialization - __shared__ cuco::window windows[window_extent.value()]; + __shared__ cuco::bucket buckets[bucket_extent.value()]; auto raw_set = cuco::static_set_ref{ cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, global_set.key_eq(), probing_scheme_t{global_set.hash_function()}, cuco::thread_scope_block, - cuco::aow_storage_ref{ - window_extent, windows}}; + cuco::bucket_storage_ref{ + bucket_extent, buckets}}; auto shared_set = raw_set.rebind_operators(cuco::insert_and_find); auto const block = cooperative_groups::this_thread_block(); diff --git a/cpp/src/groupby/hash/helpers.cuh b/cpp/src/groupby/hash/helpers.cuh index f950e03e0fb..92925e11bac 100644 --- a/cpp/src/groupby/hash/helpers.cuh +++ b/cpp/src/groupby/hash/helpers.cuh @@ -27,7 +27,7 @@ namespace cudf::groupby::detail::hash { CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1; /// Number of slots per thread -CUDF_HOST_DEVICE auto constexpr GROUPBY_WINDOW_SIZE = 1; +CUDF_HOST_DEVICE auto constexpr GROUPBY_BUCKET_SIZE = 1; /// Thread block size CUDF_HOST_DEVICE auto constexpr GROUPBY_BLOCK_SIZE = 128; @@ -48,9 +48,9 @@ using shmem_extent_t = cuco::extent(static_cast(GROUPBY_SHM_MAX_ELEMENTS) * 1.43)>; -/// Number of windows needed by each shared memory hash set -CUDF_HOST_DEVICE auto constexpr window_extent = - cuco::make_window_extent(shmem_extent_t{}); +/// Number of buckets needed by each shared memory hash set +CUDF_HOST_DEVICE auto constexpr bucket_extent = + cuco::make_bucket_extent(shmem_extent_t{}); using row_hash_t = cudf::experimental::row::hash::device_row_hasher, - cuco::storage>; + cuco::storage>; using nullable_global_set_t = cuco::static_set, @@ -83,7 +83,7 @@ using nullable_global_set_t = cuco::static_set, - cuco::storage>; + cuco::storage>; template using hash_set_ref_t = cuco::static_set_ref< @@ -91,7 +91,7 @@ using hash_set_ref_t = cuco::static_set_ref< cuda::thread_scope_device, row_comparator_t, probing_scheme_t, - cuco::aow_storage_ref>, + cuco::bucket_storage_ref>, Op>; template @@ -100,6 +100,6 @@ using nullable_hash_set_ref_t = cuco::static_set_ref< cuda::thread_scope_device, nullable_row_comparator_t, probing_scheme_t, - cuco::aow_storage_ref>, + cuco::bucket_storage_ref>, Op>; } // namespace cudf::groupby::detail::hash diff --git 
a/cpp/src/io/comp/common.hpp b/cpp/src/io/comp/common.hpp index 72bb63e817e..a81ac60e03a 100644 --- a/cpp/src/io/comp/common.hpp +++ b/cpp/src/io/comp/common.hpp @@ -21,7 +21,7 @@ namespace cudf::io::detail { /** - * @brief The value used for padding a data buffer such that its size will be multiple of it. + * @brief The size used for padding a data buffer's size to a multiple of the padding. * * Padding is necessary for input/output buffers of several compression/decompression kernels * (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require diff --git a/cpp/src/io/fst/logical_stack.cuh b/cpp/src/io/fst/logical_stack.cuh index 0f1fc7d572b..98641f2c893 100644 --- a/cpp/src/io/fst/logical_stack.cuh +++ b/cpp/src/io/fst/logical_stack.cuh @@ -513,6 +513,12 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, stream)); } + // Check if the last element of d_kv_operations is 0. If not, then we have a problem. + if (num_symbols_in && !supports_reset_op) { + StackOpT last_symbol = d_kv_ops_current.element(num_symbols_in - 1, stream); + CUDF_EXPECTS(last_symbol.stack_level == 0, "The logical stack is not empty!"); + } + // Stable radix sort, sorting by stack level of the operations d_kv_operations_unsigned = cub::DoubleBuffer{ reinterpret_cast(d_kv_operations.Current()), diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index f1c2826c62a..30a28a1cf98 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -1473,10 +1473,11 @@ void get_stack_context(device_span json_in, to_stack_op::start_state, stream); - auto stack_ops_bufsize = d_num_stack_ops.value(stream); + // Copy back to actual number of stack operations + auto num_stack_ops = d_num_stack_ops.value(stream); // Sequence of stack symbols and their position in the original input (sparse representation) - rmm::device_uvector stack_ops{stack_ops_bufsize, stream}; - rmm::device_uvector stack_op_indices{stack_ops_bufsize, stream}; + rmm::device_uvector stack_ops{num_stack_ops, stream}; + rmm::device_uvector stack_op_indices{num_stack_ops, stream}; // Run bracket-brace FST to retrieve starting positions of structs and lists json_to_stack_ops_fst.Transduce(json_in.begin(), @@ -1487,9 +1488,6 @@ void get_stack_context(device_span json_in, to_stack_op::start_state, stream); - // Copy back to actual number of stack operations - auto const num_stack_ops = d_num_stack_ops.value(stream); - // Stack operations with indices are converted to top of the stack for each character in the input if (stack_behavior == stack_behavior_t::ResetOnDelimiter) { fst::sparse_stack_op_to_top_of_stack( diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 0cb5c382631..7facc6497ed 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -180,9 +180,9 @@ CUDF_KERNEL void __launch_bounds__(block_size) for (size_type i = 0; i < dict.map_slots.size(); i += block_size) { if (t + i < dict.map_slots.size()) { - auto window = dict.map_slots.begin() + t + i; - // Collect all slots from each window. - for (auto& slot : *window) { + auto bucket = dict.map_slots.begin() + t + i; + // Collect all slots from each bucket. 
+ for (auto& slot : *bucket) { auto const key = slot.first; if (key != KEY_SENTINEL) { auto loc = counter.fetch_add(1, memory_order_relaxed); diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index daff429c087..f4e75f78dec 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -47,16 +47,16 @@ using slot_type = cuco::pair; auto constexpr map_cg_size = 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. -auto constexpr window_size = +auto constexpr bucket_size = 1; ///< Number of concurrent slots (set for best performance) handled by each thread. auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. -using storage_type = cuco::aow_storage, - cudf::detail::cuco_allocator>; +using storage_type = cuco::bucket_storage, + cudf::detail::cuco_allocator>; using storage_ref_type = typename storage_type::ref_type; -using window_type = typename storage_type::window_type; +using bucket_type = typename storage_type::bucket_type; using slot_type = cuco::pair; auto constexpr KEY_SENTINEL = size_type{-1}; @@ -193,7 +193,7 @@ struct StripeStream { */ struct stripe_dictionary { // input - device_span map_slots; // hash map (windows) storage + device_span map_slots; // hash map (buckets) storage uint32_t column_idx = 0; // column index size_type start_row = 0; // first row in the stripe size_type start_rowgroup = 0; // first rowgroup in the stripe diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index b85ebf2fa1a..b5f9b894c46 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -210,7 +210,7 @@ struct map_find_fn { template CUDF_KERNEL void __launch_bounds__(block_size) - populate_chunk_hash_maps_kernel(device_span const map_storage, + populate_chunk_hash_maps_kernel(device_span const map_storage, cudf::detail::device_2dspan frags) { auto const col_idx = blockIdx.y; @@ -239,7 +239,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) template CUDF_KERNEL void __launch_bounds__(block_size) - collect_map_entries_kernel(device_span const map_storage, + collect_map_entries_kernel(device_span const map_storage, device_span chunks) { auto& chunk = chunks[blockIdx.x]; @@ -251,11 +251,11 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); - // Iterate over all windows in the map. + // Iterate over all buckets in the map. for (; t < chunk.dict_map_size; t += block_size) { - auto window = map_storage.data() + chunk.dict_map_offset + t; - // Collect all slots from each window. - for (auto& slot : *window) { + auto bucket = map_storage.data() + chunk.dict_map_offset + t; + // Collect all slots from each bucket. 
+ for (auto& slot : *bucket) { auto const key = slot.first; if (key != KEY_SENTINEL) { auto const loc = counter.fetch_add(1, memory_order_relaxed); @@ -272,7 +272,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) template CUDF_KERNEL void __launch_bounds__(block_size) - get_dictionary_indices_kernel(device_span const map_storage, + get_dictionary_indices_kernel(device_span const map_storage, cudf::detail::device_2dspan frags) { auto const col_idx = blockIdx.y; @@ -302,7 +302,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) s_ck_start_val_idx); } -void populate_chunk_hash_maps(device_span const map_storage, +void populate_chunk_hash_maps(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { @@ -311,7 +311,7 @@ void populate_chunk_hash_maps(device_span const map_storage, <<>>(map_storage, frags); } -void collect_map_entries(device_span const map_storage, +void collect_map_entries(device_span const map_storage, device_span chunks, rmm::cuda_stream_view stream) { @@ -320,7 +320,7 @@ void collect_map_entries(device_span const map_storage, <<>>(map_storage, chunks); } -void get_dictionary_indices(device_span const map_storage, +void get_dictionary_indices(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { diff --git a/cpp/src/io/parquet/parquet_gpu.cuh b/cpp/src/io/parquet/parquet_gpu.cuh index 7c09764da2d..800875f7448 100644 --- a/cpp/src/io/parquet/parquet_gpu.cuh +++ b/cpp/src/io/parquet/parquet_gpu.cuh @@ -34,7 +34,7 @@ using slot_type = cuco::pair; auto constexpr map_cg_size = 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. -auto constexpr window_size = +auto constexpr bucket_size = 1; ///< Number of concurrent slots (set for best performance) handled by each thread. auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. 
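The 1.43 factor in the comment above is just the reciprocal of the target load factor: sizing for 70% occupancy means allocating about num_keys / 0.7 ≈ num_keys * 1.43 slots, which cuco::make_bucket_extent then rounds up to a valid extent. A minimal sketch of that arithmetic in plain Python (illustrative only, not the cuco API):

import math

def required_slots(num_keys: int, target_load_factor: float = 0.7) -> int:
    # ceil(num_keys / 0.7) == ceil(num_keys * ~1.43); cuco additionally
    # rounds this count up to a valid bucket extent.
    return math.ceil(num_keys / target_load_factor)

assert required_slots(1000) == 1429  # roughly 1.43x the number of keys
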
@@ -43,12 +43,12 @@ auto constexpr KEY_SENTINEL = key_type{-1}; auto constexpr VALUE_SENTINEL = mapped_type{-1}; auto constexpr SCOPE = cuda::thread_scope_block; -using storage_type = cuco::aow_storage, - cudf::detail::cuco_allocator>; +using storage_type = cuco::bucket_storage, + cudf::detail::cuco_allocator>; using storage_ref_type = typename storage_type::ref_type; -using window_type = typename storage_type::window_type; +using bucket_type = typename storage_type::bucket_type; /** * @brief Return the byte length of parquet dtypes that are physically represented by INT32 @@ -100,7 +100,7 @@ inline size_type __device__ row_to_value_idx(size_type idx, * @param frags Column fragments * @param stream CUDA stream to use */ -void populate_chunk_hash_maps(device_span const map_storage, +void populate_chunk_hash_maps(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); @@ -111,7 +111,7 @@ void populate_chunk_hash_maps(device_span const map_storage, * @param chunks Flat span of chunks to compact hash maps for * @param stream CUDA stream to use */ -void collect_map_entries(device_span const map_storage, +void collect_map_entries(device_span const map_storage, device_span chunks, rmm::cuda_stream_view stream); @@ -128,7 +128,7 @@ void collect_map_entries(device_span const map_storage, * @param frags Column fragments * @param stream CUDA stream to use */ -void get_dictionary_indices(device_span const map_storage, +void get_dictionary_indices(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 012af90afb5..326232ced60 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -552,7 +552,7 @@ void decode_page_headers(pass_intermediate_data& pass, { CUDF_FUNC_RANGE(); - auto iter = thrust::make_counting_iterator(0); + auto iter = thrust::counting_iterator(0); rmm::device_uvector chunk_page_counts(pass.chunks.size() + 1, stream); thrust::transform_exclusive_scan( rmm::exec_policy_nosync(stream), @@ -564,7 +564,7 @@ void decode_page_headers(pass_intermediate_data& pass, return static_cast( i >= num_chunks ? 0 : chunks[i].num_data_pages + chunks[i].num_dict_pages); }), - 0, + size_t{0}, thrust::plus{}); rmm::device_uvector d_chunk_page_info(pass.chunks.size(), stream); thrust::for_each(rmm::exec_policy_nosync(stream), diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 2eb9c49fd88..6b1a20701f9 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1303,7 +1303,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, } else { chunk.use_dictionary = true; chunk.dict_map_size = - static_cast(cuco::make_window_extent( + static_cast(cuco::make_bucket_extent( static_cast(occupancy_factor * chunk.num_values))); chunk.dict_map_offset = total_map_storage_size; total_map_storage_size += chunk.dict_map_size; @@ -1318,7 +1318,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, total_map_storage_size, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}}; // Create a span of non-const map_storage as map_storage_ref takes in a non-const pointer. 
- device_span const map_storage_data{map_storage.data(), total_map_storage_size}; + device_span const map_storage_data{map_storage.data(), total_map_storage_size}; // Synchronize chunks.host_to_device_async(stream); diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 37a750330fa..23ca5734ded 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -3450,4 +3450,15 @@ TEST_P(JsonCompressedIOTest, BasicJsonLines) CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2, 3.3}}); } +TEST_F(JsonReaderTest, MismatchedBeginEndTokens) +{ + std::string data = R"({"not_valid": "json)"; + auto opts = + cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) + .lines(true) + .recovery_mode(cudf::io::json_recovery_mode_t::FAIL) + .build(); + EXPECT_THROW(cudf::io::read_json(opts), cudf::logic_error); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 410fd57691e..da4faabf189 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources column.pyx groupby.pyx scalar.pyx strings_udf.pyx types.pyx utils.pyx) +set(cython_sources column.pyx scalar.pyx strings_udf.pyx types.pyx utils.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 6b5a7814e48..10f9d813ccc 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -1,10 +1,7 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. import numpy as np -from . import ( - groupby, - strings_udf, -) +from . import strings_udf MAX_COLUMN_SIZE = np.iinfo(np.int32).max MAX_COLUMN_SIZE_STR = "INT32_MAX" diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx deleted file mode 100644 index 80a77ef2267..00000000000 --- a/python/cudf/cudf/_lib/groupby.pyx +++ /dev/null @@ -1,281 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. -from functools import singledispatch - -from pandas.errors import DataError - -from cudf.api.types import _is_categorical_dtype, is_string_dtype -from cudf.core.buffer import acquire_spill_lock -from cudf.core.dtypes import ( - CategoricalDtype, - DecimalDtype, - IntervalDtype, - ListDtype, - StructDtype, -) - -from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.utils cimport columns_from_pylibcudf_table - -from cudf._lib.scalar import as_device_scalar - -import pylibcudf - -from cudf.core._internals.aggregation import make_aggregation - -# The sets below define the possible aggregations that can be performed on -# different dtypes. These strings must be elements of the AggregationKind enum. -# The libcudf infrastructure exists for "COLLECT" support on -# categoricals, but the dtype support in python does not. 
-_CATEGORICAL_AGGS = {"COUNT", "NUNIQUE", "SIZE", "UNIQUE"} -_STRING_AGGS = { - "COLLECT", - "COUNT", - "MAX", - "MIN", - "NTH", - "NUNIQUE", - "SIZE", - "UNIQUE", -} -_LIST_AGGS = {"COLLECT"} -_STRUCT_AGGS = {"COLLECT", "CORRELATION", "COVARIANCE"} -_INTERVAL_AGGS = {"COLLECT"} -_DECIMAL_AGGS = { - "ARGMIN", - "ARGMAX", - "COLLECT", - "COUNT", - "MAX", - "MIN", - "NTH", - "NUNIQUE", - "SUM", -} - - -@singledispatch -def get_valid_aggregation(dtype): - if is_string_dtype(dtype): - return _STRING_AGGS - return "ALL" - - -@get_valid_aggregation.register -def _(dtype: ListDtype): - return _LIST_AGGS - - -@get_valid_aggregation.register -def _(dtype: CategoricalDtype): - return _CATEGORICAL_AGGS - - -@get_valid_aggregation.register -def _(dtype: ListDtype): - return _LIST_AGGS - - -@get_valid_aggregation.register -def _(dtype: StructDtype): - return _STRUCT_AGGS - - -@get_valid_aggregation.register -def _(dtype: IntervalDtype): - return _INTERVAL_AGGS - - -@get_valid_aggregation.register -def _(dtype: DecimalDtype): - return _DECIMAL_AGGS - - -cdef class GroupBy: - cdef dict __dict__ - - def __init__(self, keys, dropna=True): - with acquire_spill_lock() as spill_lock: - self._groupby = pylibcudf.groupby.GroupBy( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in keys]), - pylibcudf.types.NullPolicy.EXCLUDE if dropna - else pylibcudf.types.NullPolicy.INCLUDE - ) - - # We spill lock the columns while this GroupBy instance is alive. - self._spill_lock = spill_lock - - def groups(self, list values): - """ - Perform a sort groupby, using the keys used to construct the Groupby as the key - columns and ``values`` as the value columns. - - Parameters - ---------- - values: list of Columns - The value columns - - Returns - ------- - offsets: list of integers - Integer offsets such that offsets[i+1] - offsets[i] - represents the size of group `i`. 
- grouped_keys: list of Columns - The grouped key columns - grouped_values: list of Columns - The grouped value columns - """ - offsets, grouped_keys, grouped_values = self._groupby.get_groups( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]) - if values else None - ) - - return ( - offsets, - columns_from_pylibcudf_table(grouped_keys), - ( - columns_from_pylibcudf_table(grouped_values) - if grouped_values is not None else [] - ), - ) - - def aggregate(self, values, aggregations): - """ - Parameters - ---------- - values : Frame - aggregations - A dict mapping column names in `Frame` to a list of aggregations - to perform on that column - - Each aggregation may be specified as: - - a string (e.g., "max") - - a lambda/function - - Returns - ------- - Frame of aggregated values - """ - included_aggregations = [] - column_included = [] - requests = [] - for i, (col, aggs) in enumerate(zip(values, aggregations)): - valid_aggregations = get_valid_aggregation(col.dtype) - included_aggregations_i = [] - col_aggregations = [] - for agg in aggs: - str_agg = str(agg) - if ( - is_string_dtype(col) - and agg not in _STRING_AGGS - and - ( - str_agg in {"cumsum", "cummin", "cummax"} - or not ( - any(a in str_agg for a in { - "count", - "max", - "min", - "first", - "last", - "nunique", - "unique", - "nth" - }) - or (agg is list) - ) - ) - ): - raise TypeError( - f"function is not supported for this dtype: {agg}" - ) - elif ( - _is_categorical_dtype(col) - and agg not in _CATEGORICAL_AGGS - and ( - str_agg in {"cumsum", "cummin", "cummax"} - or - not ( - any(a in str_agg for a in {"count", "max", "min", "unique"}) - ) - ) - ): - raise TypeError( - f"{col.dtype} type does not support {agg} operations" - ) - - agg_obj = make_aggregation(agg) - if valid_aggregations == "ALL" or agg_obj.kind in valid_aggregations: - included_aggregations_i.append((agg, agg_obj.kind)) - col_aggregations.append(agg_obj.c_obj) - included_aggregations.append(included_aggregations_i) - if col_aggregations: - requests.append(pylibcudf.groupby.GroupByRequest( - col.to_pylibcudf(mode="read"), col_aggregations - )) - column_included.append(i) - - if not requests and any(len(v) > 0 for v in aggregations): - raise DataError("All requested aggregations are unsupported.") - - keys, results = self._groupby.scan(requests) if \ - _is_all_scan_aggregate(aggregations) else self._groupby.aggregate(requests) - - result_columns = [[] for _ in range(len(values))] - for i, result in zip(column_included, results): - result_columns[i] = columns_from_pylibcudf_table(result) - - return result_columns, columns_from_pylibcudf_table(keys), included_aggregations - - def shift(self, list values, int periods, list fill_values): - keys, shifts = self._groupby.shift( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), - [periods] * len(values), - [ - ( as_device_scalar(val, dtype=col.dtype)).c_value - for val, col in zip(fill_values, values) - ], - ) - - return columns_from_pylibcudf_table(shifts), columns_from_pylibcudf_table(keys) - - def replace_nulls(self, list values, object method): - _, replaced = self._groupby.replace_nulls( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), - [ - pylibcudf.replace.ReplacePolicy.PRECEDING - if method == 'ffill' else pylibcudf.replace.ReplacePolicy.FOLLOWING - ] * len(values), - ) - - return columns_from_pylibcudf_table(replaced) - - -_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "cumprod", "rank"} - - -def _is_all_scan_aggregate(all_aggs): - 
""" - Returns true if all are scan aggregations. - Raises - ------ - NotImplementedError - If both reduction aggregations and scan aggregations are present. - """ - - def get_name(agg): - return agg.__name__ if callable(agg) else agg - - all_scan = all( - get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs - for agg_name in aggs - ) - any_scan = any( - get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs - for agg_name in aggs - ) - - if not all_scan and any_scan: - raise NotImplementedError( - "Cannot perform both aggregation and scan in one operation" - ) - return all_scan and any_scan diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index f4543bc6156..c2f3c782d10 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -1447,7 +1447,7 @@ def _union(self, other, sort=None): other_df["order"] = other_df.index res = self_df.merge(other_df, on=[0], how="outer") res = res.sort_values( - by=res._data.to_pandas_index()[1:], ignore_index=True + by=res._data.to_pandas_index[1:], ignore_index=True ) union_result = cudf.core.index._index_from_data({0: res._data[0]}) diff --git a/python/cudf/cudf/core/_internals/aggregation.py b/python/cudf/cudf/core/_internals/aggregation.py index fe8ea5a947a..1d21d34b1bf 100644 --- a/python/cudf/cudf/core/_internals/aggregation.py +++ b/python/cudf/cudf/core/_internals/aggregation.py @@ -29,11 +29,11 @@ class Aggregation: def __init__(self, agg: plc.aggregation.Aggregation) -> None: - self.c_obj = agg + self.plc_obj = agg @property def kind(self) -> str: - name = self.c_obj.kind().name + name = self.plc_obj.kind().name return _agg_name_map.get(name, name) @classmethod diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index cccafaeba88..75b9070b53f 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -1605,7 +1605,7 @@ def scan(self, scan_op: str, inclusive: bool, **kwargs) -> Self: return type(self).from_pylibcudf( # type: ignore[return-value] plc.reduce.scan( self.to_pylibcudf(mode="read"), - aggregation.make_aggregation(scan_op, kwargs).c_obj, + aggregation.make_aggregation(scan_op, kwargs).plc_obj, plc.reduce.ScanType.INCLUSIVE if inclusive else plc.reduce.ScanType.EXCLUSIVE, @@ -1637,7 +1637,7 @@ def reduce(self, reduction_op: str, dtype=None, **kwargs) -> ScalarLike: with acquire_spill_lock(): plc_scalar = plc.reduce.reduce( self.to_pylibcudf(mode="read"), - aggregation.make_aggregation(reduction_op, kwargs).c_obj, + aggregation.make_aggregation(reduction_op, kwargs).plc_obj, dtype_to_pylibcudf_type(col_dtype), ) result_col = type(self).from_pylibcudf( diff --git a/python/cudf/cudf/core/column_accessor.py b/python/cudf/cudf/core/column_accessor.py index e4fd82e819b..aaf7d071dff 100644 --- a/python/cudf/cudf/core/column_accessor.py +++ b/python/cudf/cudf/core/column_accessor.py @@ -207,11 +207,16 @@ def _from_columns_like_self( @property def level_names(self) -> tuple[abc.Hashable, ...]: + if self.is_cached("to_pandas_index"): + return self.to_pandas_index.names if self._level_names is None or len(self._level_names) == 0: return tuple((None,) * max(1, self.nlevels)) else: return self._level_names + def is_cached(self, attr_name: str) -> bool: + return attr_name in self.__dict__ + @property def nlevels(self) -> int: if len(self) == 0: @@ -262,7 +267,12 @@ def _clear_cache(self, old_ncols: int, new_ncols: int) -> None: new_ncols: int len(self) after self._data was modified """ - 
cached_properties = ("columns", "names", "_grouped_data") + cached_properties = ( + "columns", + "names", + "_grouped_data", + "to_pandas_index", + ) for attr in cached_properties: try: self.__delattr__(attr) @@ -276,6 +286,7 @@ def _clear_cache(self, old_ncols: int, new_ncols: int) -> None: except AttributeError: pass + @cached_property def to_pandas_index(self) -> pd.Index: """Convert the keys of the ColumnAccessor to a Pandas Index object.""" if self.multiindex and len(self.level_names) > 0: @@ -726,10 +737,10 @@ def droplevel(self, level: int) -> None: } new_ncols = len(self) self._level_names = ( - self._level_names[:level] + self._level_names[level + 1 :] + self.level_names[:level] + self.level_names[level + 1 :] ) - if len(self._level_names) == 1: + if len(self.level_names) == 1: # can't use nlevels, as it depends on multiindex self.multiindex = False self._clear_cache(old_ncols, new_ncols) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index e66e4f41642..3334b57ce1b 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -961,7 +961,7 @@ def _init_from_series_list(self, data, columns, index): warnings.simplefilter("ignore", FutureWarning) concat_df = cudf.concat(data, axis=1) - cols = concat_df._data.to_pandas_index() + cols = concat_df._data.to_pandas_index if cols.dtype == "object": concat_df.columns = cols.astype("str") @@ -2092,7 +2092,7 @@ def _make_operands_and_index_for_binop( equal_columns = True elif isinstance(other, Series): if ( - not (self_pd_columns := self._data.to_pandas_index()).equals( + not (self_pd_columns := self._data.to_pandas_index).equals( other_pd_index := other.index.to_pandas() ) and not can_reindex @@ -2117,8 +2117,8 @@ def _make_operands_and_index_for_binop( and fn in cudf.utils.utils._EQUALITY_OPS and ( not self.index.equals(other.index) - or not self._data.to_pandas_index().equals( - other._data.to_pandas_index() + or not self._data.to_pandas_index.equals( + other._data.to_pandas_index ) ) ): @@ -2162,11 +2162,11 @@ def _make_operands_and_index_for_binop( if not equal_columns: if isinstance(other, DataFrame): - column_names_list = self._data.to_pandas_index().join( - other._data.to_pandas_index(), how="outer" + column_names_list = self._data.to_pandas_index.join( + other._data.to_pandas_index, how="outer" ) elif isinstance(other, Series): - column_names_list = self._data.to_pandas_index().join( + column_names_list = self._data.to_pandas_index.join( other.index.to_pandas(), how="outer" ) else: @@ -2626,8 +2626,8 @@ def update( if not isinstance(other, DataFrame): other = DataFrame(other) - self_cols = self._data.to_pandas_index() - if not self_cols.equals(other._data.to_pandas_index()): + self_cols = self._data.to_pandas_index + if not self_cols.equals(other._data.to_pandas_index): other = other.reindex(self_cols, axis=1) if not self.index.equals(other.index): other = other.reindex(self.index, axis=0) @@ -2663,7 +2663,7 @@ def __iter__(self): def __contains__(self, item): # This must check against containment in the pandas Index and not # self._column_names to handle NA, None, nan, etc. correctly. - return item in self._data.to_pandas_index() + return item in self._data.to_pandas_index @_performance_tracking def items(self): @@ -2700,14 +2700,14 @@ def at(self): @property # type: ignore @_external_only_api( - "Use _column_names instead, or _data.to_pandas_index() if a pandas " + "Use _column_names instead, or _data.to_pandas_index if a pandas " "index is absolutely necessary. 
For checking if the columns are a " "MultiIndex, use _data.multiindex." ) @_performance_tracking def columns(self): """Returns a tuple of columns""" - return self._data.to_pandas_index() + return self._data.to_pandas_index @columns.setter # type: ignore @_performance_tracking @@ -2916,7 +2916,7 @@ def reindex( df = self else: columns = cudf.Index(columns) - intersection = self._data.to_pandas_index().intersection( + intersection = self._data.to_pandas_index.intersection( columns.to_pandas() ) df = self.loc[:, intersection] @@ -3430,7 +3430,7 @@ def axes(self): Index(['key', 'k2', 'val', 'temp'], dtype='object')] """ - return [self.index, self._data.to_pandas_index()] + return [self.index, self._data.to_pandas_index] def diff(self, periods=1, axis=0): """ @@ -4129,7 +4129,7 @@ def transpose(self): Not supporting *copy* because default and only behavior is copy=True """ - index = self._data.to_pandas_index() + index = self._data.to_pandas_index columns = self.index.copy(deep=False) if self._num_columns == 0 or self._num_rows == 0: return DataFrame(index=index, columns=columns) @@ -5535,7 +5535,7 @@ def to_pandas( } out_df = pd.DataFrame(out_data, index=out_index) - out_df.columns = self._data.to_pandas_index() + out_df.columns = self._data.to_pandas_index return out_df @@ -6487,7 +6487,7 @@ def _reduce( source = self._get_columns_by_label(numeric_cols) if source.empty: return Series( - index=self._data.to_pandas_index()[:0] + index=self._data.to_pandas_index[:0] if axis == 0 else source.index, dtype="float64", @@ -6540,7 +6540,7 @@ def _reduce( "Columns must all have the same dtype to " f"perform {op=} with {axis=}" ) - pd_index = source._data.to_pandas_index() + pd_index = source._data.to_pandas_index if source._data.multiindex: idx = MultiIndex.from_pandas(pd_index) else: @@ -7242,7 +7242,7 @@ def stack( ] has_unnamed_levels = len(unnamed_levels_indices) > 0 - column_name_idx = self._data.to_pandas_index() + column_name_idx = self._data.to_pandas_index # Construct new index from the levels specified by `level` named_levels = pd.MultiIndex.from_arrays( [column_name_idx.get_level_values(lv) for lv in level_indices] @@ -7432,7 +7432,7 @@ def cov(self, min_periods=None, ddof: int = 1, numeric_only: bool = False): ) cov = cupy.cov(self.values, ddof=ddof, rowvar=False) - cols = self._data.to_pandas_index() + cols = self._data.to_pandas_index df = DataFrame(cupy.asfortranarray(cov), index=cols) df._set_columns_like(self._data) return df @@ -7475,7 +7475,7 @@ def corr( ) corr = cupy.corrcoef(values, rowvar=False) - cols = self._data.to_pandas_index() + cols = self._data.to_pandas_index df = DataFrame(cupy.asfortranarray(corr), index=cols) df._set_columns_like(self._data) return df @@ -7544,7 +7544,7 @@ def keys(self): >>> df.keys() Index([0, 1, 2, 3], dtype='int64') """ - return self._data.to_pandas_index() + return self._data.to_pandas_index def itertuples(self, index=True, name="Pandas"): """ @@ -7778,7 +7778,7 @@ def nunique(self, axis=0, dropna: bool = True) -> Series: raise NotImplementedError("axis parameter is not supported yet.") counts = [col.distinct_count(dropna=dropna) for col in self._columns] return self._constructor_sliced( - counts, index=self._data.to_pandas_index() + counts, index=self._data.to_pandas_index ) def _sample_axis_1( diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 6cd8e11695f..be3cc410174 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -4,9 +4,10 @@ import 
copy import itertools import textwrap +import types import warnings from collections import abc -from functools import cached_property +from functools import cached_property, singledispatch from typing import TYPE_CHECKING, Any, Literal import cupy as cp @@ -18,17 +19,27 @@ import cudf import cudf.core._internals from cudf import _lib as libcudf -from cudf._lib import groupby as libgroupby from cudf._lib.types import size_type_dtype from cudf.api.extensions import no_default -from cudf.api.types import is_list_like, is_numeric_dtype +from cudf.api.types import ( + is_list_like, + is_numeric_dtype, + is_string_dtype, +) from cudf.core._compat import PANDAS_LT_300 -from cudf.core._internals import sorting +from cudf.core._internals import aggregation, sorting from cudf.core.abc import Serializable from cudf.core.buffer import acquire_spill_lock -from cudf.core.column.column import ColumnBase, StructDtype, as_column +from cudf.core.column.column import ColumnBase, as_column from cudf.core.column_accessor import ColumnAccessor from cudf.core.copy_types import GatherMap +from cudf.core.dtypes import ( + CategoricalDtype, + DecimalDtype, + IntervalDtype, + ListDtype, + StructDtype, +) from cudf.core.join._join_helpers import _match_join_keys from cudf.core.mixins import Reducible, Scannable from cudf.core.multiindex import MultiIndex @@ -37,7 +48,7 @@ from cudf.utils.utils import GetAttrGetItemMixin if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import Generator, Iterable from cudf._typing import ( AggType, @@ -46,6 +57,152 @@ ScalarLike, ) +# The sets below define the possible aggregations that can be performed on +# different dtypes. These strings must be elements of the AggregationKind enum. +# The libcudf infrastructure exists for "COLLECT" support on +# categoricals, but the dtype support in python does not. 
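The dtype-to-aggregation tables that follow are consumed by a functools.singledispatch helper, which picks the implementation from the class annotated on the first parameter. A self-contained sketch of that dispatch pattern (hypothetical dtype class, not cudf's):

from functools import singledispatch

class FakeListDtype:  # stand-in for a real dtype such as ListDtype
    pass

@singledispatch
def valid_aggs(dtype):
    return "ALL"  # fallback: no restriction for this dtype

@valid_aggs.register
def _(dtype: FakeListDtype):
    # registered via the parameter annotation, as in the code below
    return {"COLLECT"}

assert valid_aggs(FakeListDtype()) == {"COLLECT"}
assert valid_aggs(1.0) == "ALL"
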
+_CATEGORICAL_AGGS = {"COUNT", "NUNIQUE", "SIZE", "UNIQUE"} +_STRING_AGGS = { + "COLLECT", + "COUNT", + "MAX", + "MIN", + "NTH", + "NUNIQUE", + "SIZE", + "UNIQUE", +} +_LIST_AGGS = {"COLLECT"} +_STRUCT_AGGS = {"COLLECT", "CORRELATION", "COVARIANCE"} +_INTERVAL_AGGS = {"COLLECT"} +_DECIMAL_AGGS = { + "ARGMIN", + "ARGMAX", + "COLLECT", + "COUNT", + "MAX", + "MIN", + "NTH", + "NUNIQUE", + "SUM", +} + + +@singledispatch +def get_valid_aggregation(dtype): + if is_string_dtype(dtype): + return _STRING_AGGS + return "ALL" + + +@get_valid_aggregation.register +def _(dtype: ListDtype): + return _LIST_AGGS + + +@get_valid_aggregation.register +def _(dtype: CategoricalDtype): + return _CATEGORICAL_AGGS + + +@get_valid_aggregation.register +def _(dtype: ListDtype): + return _LIST_AGGS + + +@get_valid_aggregation.register +def _(dtype: StructDtype): + return _STRUCT_AGGS + + +@get_valid_aggregation.register +def _(dtype: IntervalDtype): + return _INTERVAL_AGGS + + +@get_valid_aggregation.register +def _(dtype: DecimalDtype): + return _DECIMAL_AGGS + + +@singledispatch +def _is_unsupported_agg_for_type(dtype, str_agg: str) -> bool: + return False + + +@_is_unsupported_agg_for_type.register +def _(dtype: np.dtype, str_agg: str) -> bool: + # string specifically + cumulative_agg = str_agg in {"cumsum", "cummin", "cummax"} + basic_agg = any( + a in str_agg + for a in ( + "count", + "max", + "min", + "first", + "last", + "nunique", + "unique", + "nth", + ) + ) + return ( + dtype.kind == "O" + and str_agg not in _STRING_AGGS + and (cumulative_agg or not (basic_agg or str_agg == "")) + ) + + +@_is_unsupported_agg_for_type.register +def _(dtype: CategoricalDtype, str_agg: str) -> bool: + cumulative_agg = str_agg in {"cumsum", "cummin", "cummax"} + not_basic_agg = not any( + a in str_agg for a in ("count", "max", "min", "unique") + ) + return str_agg not in _CATEGORICAL_AGGS and ( + cumulative_agg or not_basic_agg + ) + + +def _is_all_scan_aggregate(all_aggs: list[list[str]]) -> bool: + """ + Returns True if all are scan aggregations. + + Raises + ------ + NotImplementedError + If both reduction aggregations and scan aggregations are present. 
+ """ + groupby_scans = { + "cumcount", + "cumsum", + "cummin", + "cummax", + "cumprod", + "rank", + } + + def get_name(agg): + return agg.__name__ if callable(agg) else agg + + all_scan = all( + get_name(agg_name) in groupby_scans + for aggs in all_aggs + for agg_name in aggs + ) + any_scan = any( + get_name(agg_name) in groupby_scans + for aggs in all_aggs + for agg_name in aggs + ) + + if not all_scan and any_scan: + raise NotImplementedError( + "Cannot perform both aggregation and scan in one operation" + ) + return all_scan and any_scan + def _deprecate_collect(): warnings.warn( @@ -423,7 +580,7 @@ def indices(self) -> dict[ScalarLike, cp.ndarray]: >>> df.groupby(by=["a"]).indices {10: array([0, 1]), 40: array([2])} """ - offsets, group_keys, (indices,) = self._groupby.groups( + offsets, group_keys, (indices,) = self._groups( [ cudf.core.column.as_column( range(len(self.obj)), dtype=size_type_dtype @@ -582,11 +739,137 @@ def rank(x): return result @cached_property - def _groupby(self): - return libgroupby.GroupBy( - [*self.grouping.keys._columns], dropna=self._dropna + def _groupby(self) -> types.SimpleNamespace: + with acquire_spill_lock() as spill_lock: + plc_groupby = plc.groupby.GroupBy( + plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in self.grouping.keys._columns + ] + ), + plc.types.NullPolicy.EXCLUDE + if self._dropna + else plc.types.NullPolicy.INCLUDE, + ) + # Do we need this because we just check _spill_locks in test_spillable_df_groupby? + return types.SimpleNamespace( + plc_groupby=plc_groupby, _spill_locks=spill_lock + ) + + def _groups( + self, values: Iterable[ColumnBase] + ) -> tuple[list[int], list[ColumnBase], list[ColumnBase]]: + plc_columns = [col.to_pylibcudf(mode="read") for col in values] + if not plc_columns: + plc_table = None + else: + plc_table = plc.Table(plc_columns) + offsets, grouped_keys, grouped_values = ( + self._groupby.plc_groupby.get_groups(plc_table) + ) + + return ( + offsets, + [ColumnBase.from_pylibcudf(col) for col in grouped_keys.columns()], + ( + [ + ColumnBase.from_pylibcudf(col) + for col in grouped_values.columns() + ] + if grouped_values is not None + else [] + ), + ) + + def _aggregate( + self, values: tuple[ColumnBase, ...], aggregations + ) -> tuple[ + list[list[ColumnBase]], + list[ColumnBase], + list[list[tuple[str, str]]], + ]: + included_aggregations = [] + column_included = [] + requests = [] + result_columns: list[list[ColumnBase]] = [] + for i, (col, aggs) in enumerate(zip(values, aggregations)): + valid_aggregations = get_valid_aggregation(col.dtype) + included_aggregations_i = [] + col_aggregations = [] + for agg in aggs: + str_agg = str(agg) + if _is_unsupported_agg_for_type(col.dtype, str_agg): + raise TypeError( + f"{col.dtype} type does not support {agg} operations" + ) + agg_obj = aggregation.make_aggregation(agg) + if ( + valid_aggregations == "ALL" + or agg_obj.kind in valid_aggregations + ): + included_aggregations_i.append((agg, agg_obj.kind)) + col_aggregations.append(agg_obj.plc_obj) + included_aggregations.append(included_aggregations_i) + result_columns.append([]) + if col_aggregations: + requests.append( + plc.groupby.GroupByRequest( + col.to_pylibcudf(mode="read"), col_aggregations + ) + ) + column_included.append(i) + + if not requests and any(len(v) > 0 for v in aggregations): + raise pd.errors.DataError( + "All requested aggregations are unsupported." 
+ ) + + keys, results = ( + self._groupby.plc_groupby.scan(requests) + if _is_all_scan_aggregate(aggregations) + else self._groupby.plc_groupby.aggregate(requests) ) + for i, result in zip(column_included, results): + result_columns[i] = [ + ColumnBase.from_pylibcudf(col) for col in result.columns() + ] + + return ( + result_columns, + [ColumnBase.from_pylibcudf(key) for key in keys.columns()], + included_aggregations, + ) + + def _shift( + self, values: tuple[ColumnBase, ...], periods: int, fill_values: list + ) -> Generator[ColumnBase]: + _, shifts = self._groupby.plc_groupby.shift( + plc.table.Table([col.to_pylibcudf(mode="read") for col in values]), + [periods] * len(values), + [ + cudf.Scalar(val, dtype=col.dtype).device_value.c_value + for val, col in zip(fill_values, values) + ], + ) + return (ColumnBase.from_pylibcudf(col) for col in shifts.columns()) + + def _replace_nulls( + self, values: tuple[ColumnBase, ...], method: str + ) -> Generator[ColumnBase]: + _, replaced = self._groupby.plc_groupby.replace_nulls( + plc.Table([col.to_pylibcudf(mode="read") for col in values]), + [ + plc.replace.ReplacePolicy.PRECEDING + if method == "ffill" + else plc.replace.ReplacePolicy.FOLLOWING + ] + * len(values), + ) + + return (ColumnBase.from_pylibcudf(col) for col in replaced.columns()) + @_performance_tracking def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): """ @@ -702,7 +985,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): result_columns, grouped_key_cols, included_aggregations, - ) = self._groupby.aggregate(columns, normalized_aggs) + ) = self._aggregate(columns, normalized_aggs) result_index = self.grouping.keys._from_columns_like_self( grouped_key_cols, @@ -761,7 +1044,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): else: if cudf.get_option( "mode.pandas_compatible" - ) and not libgroupby._is_all_scan_aggregate(normalized_aggs): + ) and not _is_all_scan_aggregate(normalized_aggs): # Even with `sort=False`, pandas guarantees that # groupby preserves the order of rows within each group. left_cols = list(self.grouping.keys.drop_duplicates()._columns) @@ -810,7 +1093,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): if not self._as_index: result = result.reset_index() - if libgroupby._is_all_scan_aggregate(normalized_aggs): + if _is_all_scan_aggregate(normalized_aggs): # Scan aggregations return rows in original index order return self._mimic_pandas_order(result) @@ -920,7 +1203,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool): # Can't use _mimic_pandas_order because we need to # subsample the gather map from the full input ordering, # rather than permuting the gather map of the output. - _, _, (ordering,) = self._groupby.groups( + _, _, (ordering,) = self._groups( [as_column(range(0, len(self.obj)))] ) # Invert permutation from original order to groups on the @@ -1312,8 +1595,8 @@ def deserialize(cls, header, frames): return cls(obj, grouping, **kwargs) def _grouped(self, *, include_groups: bool = True): - offsets, grouped_key_cols, grouped_value_cols = self._groupby.groups( - [*self.obj.index._columns, *self.obj._columns] + offsets, grouped_key_cols, grouped_value_cols = self._groups( + itertools.chain(self.obj.index._columns, self.obj._columns) ) grouped_keys = cudf.core.index._index_from_data( dict(enumerate(grouped_key_cols)) @@ -1945,7 +2228,7 @@ def transform( "Currently, `transform()` supports only aggregations." 
) from e # If the aggregation is a scan, don't broadcast - if libgroupby._is_all_scan_aggregate([[func]]): + if _is_all_scan_aggregate([[func]]): if len(result) != len(self.obj): raise AssertionError( "Unexpected result length for scan transform" @@ -2409,7 +2692,7 @@ def _scan_fill(self, method: str, limit: int) -> DataFrameOrSeries: dict( zip( values._column_names, - self._groupby.replace_nulls([*values._columns], method), + self._replace_nulls(values._columns, method), ) ) ) @@ -2513,7 +2796,7 @@ def fillna( @_performance_tracking def shift( self, - periods=1, + periods: int = 1, freq=None, axis=0, fill_value=None, @@ -2560,7 +2843,7 @@ def shift( if freq is not None: raise NotImplementedError("Parameter freq is unsupported.") - if not axis == 0: + if axis != 0: raise NotImplementedError("Only axis=0 is supported.") if suffix is not None: @@ -2568,20 +2851,18 @@ def shift( values = self.grouping.values if is_list_like(fill_value): - if len(fill_value) != len(values._data): + if len(fill_value) != values._num_columns: raise ValueError( "Mismatched number of columns and values to fill." ) else: - fill_value = [fill_value] * len(values._data) + fill_value = [fill_value] * values._num_columns result = self.obj.__class__._from_data( dict( zip( values._column_names, - self._groupby.shift( - [*values._columns], periods, fill_value - )[0], + self._shift(values._columns, periods, fill_value), ) ) ) @@ -2680,9 +2961,7 @@ def _mimic_pandas_order( # result coming back from libcudf has null_count few rows than # the input, so we must produce an ordering from the full # input range. - _, _, (ordering,) = self._groupby.groups( - [as_column(range(0, len(self.obj)))] - ) + _, _, (ordering,) = self._groups([as_column(range(0, len(self.obj)))]) if self._dropna and any( c.has_nulls(include_nan=True) > 0 for c in self.grouping._key_columns @@ -3087,7 +3366,7 @@ def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs): # drop the first level if we have a multiindex if result._data.nlevels > 1: - result.columns = result._data.to_pandas_index().droplevel(0) + result.columns = result._data.to_pandas_index.droplevel(0) return result diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 72bb85821fa..6854cb02aa5 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -1106,13 +1106,11 @@ def dot(self, other, reflect=False): lhs = self.reindex(index=common, copy=False).values rhs = other.reindex(index=common, copy=False).values if isinstance(other, cudf.DataFrame): - result_index = other._data.to_pandas_index() + result_index = other._data.to_pandas_index elif isinstance(self, cudf.DataFrame) and isinstance( other, (cudf.Series, cudf.DataFrame) ): - common = self._data.to_pandas_index().union( - other.index.to_pandas() - ) + common = self._data.to_pandas_index.union(other.index.to_pandas()) if len(common) > self._num_columns or len(common) > len( other.index ): @@ -1124,7 +1122,7 @@ def dot(self, other, reflect=False): rhs = other.reindex(index=common, copy=False).values lhs = lhs.values if isinstance(other, cudf.DataFrame): - result_cols = other._data.to_pandas_index() + result_cols = other._data.to_pandas_index elif isinstance( other, (cp.ndarray, np.ndarray) @@ -2244,7 +2242,7 @@ def truncate(self, before=None, after=None, axis=0, copy=True): if not copy: raise ValueError("Truncating with copy=False is not supported.") axis = self._get_axis_from_axis_arg(axis) - ax = self.index if axis == 0 else 
self._data.to_pandas_index() + ax = self.index if axis == 0 else self._data.to_pandas_index if not ax.is_monotonic_increasing and not ax.is_monotonic_decreasing: raise ValueError("truncate requires a sorted index") @@ -6770,7 +6768,7 @@ def _drop_rows_by_labels( return obj.__class__._from_data( join_res.iloc[:, idx_nlv:]._data, index=midx, - columns=obj._data.to_pandas_index(), + columns=obj._data.to_pandas_index, ) else: diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index d2afe643dc4..1e613e49ffc 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -1123,7 +1123,7 @@ def _concat(cls, objs) -> Self: # TODO: Verify if this is really necessary or if we can rely on # DataFrame._concat. if len(source_data) > 1: - colnames = source_data[0]._data.to_pandas_index() + colnames = source_data[0]._data.to_pandas_index for obj in source_data[1:]: obj.columns = colnames @@ -2068,7 +2068,7 @@ def _union(self, other, sort=None) -> Self: result_df = self_df.merge(other_df, on=col_names, how="outer") result_df = result_df.sort_values( - by=result_df._data.to_pandas_index()[self.nlevels :], + by=result_df._data.to_pandas_index[self.nlevels :], ignore_index=True, ) diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index 3ab6ed306b6..0abd42d4d4e 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -431,8 +431,9 @@ def concat( result_columns = ( objs[0] - ._data.to_pandas_index() - .append([obj._data.to_pandas_index() for obj in objs[1:]]) + ._data.to_pandas_index.append( + [obj._data.to_pandas_index for obj in objs[1:]] + ) .unique() ) @@ -689,7 +690,7 @@ def _tile(A, reps): if not value_vars: # TODO: Use frame._data.label_dtype when it's more consistently set var_data = cudf.Series( - value_vars, dtype=frame._data.to_pandas_index().dtype + value_vars, dtype=frame._data.to_pandas_index.dtype ) else: var_data = ( @@ -1273,7 +1274,7 @@ def unstack(df, level, fill_value=None, sort: bool = True): res = df.T.stack(future_stack=False) # Result's index is a multiindex res.index.names = ( - tuple(df._data.to_pandas_index().names) + df.index.names + tuple(df._data.to_pandas_index.names) + df.index.names ) return res else: diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index a580c35ccbf..2f8a6d9e5e7 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -315,7 +315,7 @@ def _apply_agg_column(self, source_column, agg_name): {"dtype": source_column.dtype} if callable(agg_name) else self.agg_params, - ).c_obj, + ).plc_obj, ) ) diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index e0c9e535e6f..4be556e1d67 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -54,6 +54,22 @@ def _get_cudf_schema_element_from_dtype( return lib_type, child_types +def _to_plc_compression( + compression: Literal["infer", "gzip", "bz2", "zip", "xz", None], +) -> plc.io.types.CompressionType: + if compression is not None: + if compression == "gzip": + return plc.io.types.CompressionType.GZIP + elif compression == "bz2": + return plc.io.types.CompressionType.BZIP2 + elif compression == "zip": + return plc.io.types.CompressionType.ZIP + else: + return plc.io.types.CompressionType.AUTO + else: + return plc.io.types.CompressionType.NONE + + @ioutils.doc_read_json() def read_json( path_or_buf, @@ -115,17 +131,7 @@ def read_json( if isinstance(source, str) and not 
os.path.isfile(source): filepaths_or_buffers[idx] = source.encode() - if compression is not None: - if compression == "gzip": - c_compression = plc.io.types.CompressionType.GZIP - elif compression == "bz2": - c_compression = plc.io.types.CompressionType.BZIP2 - elif compression == "zip": - c_compression = plc.io.types.CompressionType.ZIP - else: - c_compression = plc.io.types.CompressionType.AUTO - else: - c_compression = plc.io.types.CompressionType.NONE + c_compression = _to_plc_compression(compression) if on_bad_lines.lower() == "error": c_on_bad_lines = plc.io.types.JSONRecoveryMode.FAIL @@ -161,13 +167,15 @@ def read_json( if cudf.get_option("io.json.low_memory") and lines: res_cols, res_col_names, res_child_names = ( plc.io.json.chunked_read_json( - plc.io.SourceInfo(filepaths_or_buffers), - processed_dtypes, - c_compression, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=c_on_bad_lines, + plc.io.json._setup_json_reader_options( + plc.io.SourceInfo(filepaths_or_buffers), + processed_dtypes, + c_compression, + keep_quotes=keep_quotes, + mixed_types_as_string=mixed_types_as_string, + prune_columns=prune_columns, + recovery_mode=c_on_bad_lines, + ) ) ) df = cudf.DataFrame._from_data( @@ -181,19 +189,23 @@ def read_json( return df else: table_w_meta = plc.io.json.read_json( - plc.io.SourceInfo(filepaths_or_buffers), - processed_dtypes, - c_compression, - lines, - byte_range_offset=byte_range[0] - if byte_range is not None - else 0, - byte_range_size=byte_range[1] if byte_range is not None else 0, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=c_on_bad_lines, - extra_parameters=kwargs, + plc.io.json._setup_json_reader_options( + plc.io.SourceInfo(filepaths_or_buffers), + processed_dtypes, + c_compression, + lines, + byte_range_offset=byte_range[0] + if byte_range is not None + else 0, + byte_range_size=byte_range[1] + if byte_range is not None + else 0, + keep_quotes=keep_quotes, + mixed_types_as_string=mixed_types_as_string, + prune_columns=prune_columns, + recovery_mode=c_on_bad_lines, + extra_parameters=kwargs, + ) ) df = cudf.DataFrame._from_data( @@ -285,6 +297,7 @@ def _plc_write_json( include_nulls: bool = True, lines: bool = False, rows_per_chunk: int = 1024 * 64, # 64K rows + compression: Literal["infer", "gzip", "bz2", "zip", "xz", None] = None, ) -> None: try: tbl_w_meta = plc.io.TableWithMetadata( @@ -301,6 +314,7 @@ def _plc_write_json( .na_rep(na_rep) .include_nulls(include_nulls) .lines(lines) + .compression(_to_plc_compression(compression)) .build() ) if rows_per_chunk != np.iinfo(np.int32).max: diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index 5616413b7e4..5103137bc77 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -240,15 +240,27 @@ def read_orc( elif not isinstance(num_rows, int) or num_rows < -1: raise TypeError("num_rows must be an int >= -1") - tbl_w_meta = plc.io.orc.read_orc( - plc.io.SourceInfo(filepaths_or_buffers), - columns, - stripes, - skiprows, - num_rows, - use_index, - dtype_to_pylibcudf_type(cudf.dtype(timestamp_type)), + options = ( + plc.io.orc.OrcReaderOptions.builder( + plc.io.types.SourceInfo(filepaths_or_buffers) + ) + .use_index(use_index) + .build() ) + if num_rows >= 0: + options.set_num_rows(num_rows) + if skiprows >= 0: + options.set_skip_rows(skiprows) + if stripes is not None and len(stripes) > 0: + options.set_stripes(stripes) + if 
timestamp_type is not None: + options.set_timestamp_type( + dtype_to_pylibcudf_type(cudf.dtype(timestamp_type)) + ) + if columns is not None and len(columns) > 0: + options.set_columns(columns) + + tbl_w_meta = plc.io.orc.read_orc(options) if isinstance(columns, list) and len(columns) == 0: # When `columns=[]`, index needs to be diff --git a/python/cudf/cudf/testing/testing.py b/python/cudf/cudf/testing/testing.py index 0b09cf7dc34..a1df2c7d857 100644 --- a/python/cudf/cudf/testing/testing.py +++ b/python/cudf/cudf/testing/testing.py @@ -692,8 +692,8 @@ def assert_frame_equal( ) pd.testing.assert_index_equal( - left._data.to_pandas_index(), - right._data.to_pandas_index(), + left._data.to_pandas_index, + right._data.to_pandas_index, exact=check_column_type, check_names=check_names, check_exact=check_exact, diff --git a/python/cudf/cudf/tests/test_column_accessor.py b/python/cudf/cudf/tests/test_column_accessor.py index 5cef077c18d..27ec4fcd1f3 100644 --- a/python/cudf/cudf/tests/test_column_accessor.py +++ b/python/cudf/cudf/tests/test_column_accessor.py @@ -64,7 +64,7 @@ def test_to_pandas_simple(simple_data): # Index([], dtype='object'), and `integer` for RangeIndex() # to ignore this `inferred_type` comparison, we pass exact=False. assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.DataFrame( {key: value.values_host for key, value in simple_data.items()} ).columns, @@ -75,7 +75,7 @@ def test_to_pandas_simple(simple_data): def test_to_pandas_multiindex(mi_data): ca = ColumnAccessor(mi_data, multiindex=True) assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.DataFrame( {key: value.values_host for key, value in mi_data.items()} ).columns, @@ -89,7 +89,7 @@ def test_to_pandas_multiindex_names(): level_names=("foo", "bar"), ) assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.MultiIndex.from_tuples( (("a", "b"), ("c", "d")), names=("foo", "bar") ), diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index d04fd97dcbd..11a9b398b50 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -11193,3 +11193,32 @@ def test_dataframe_init_column(): expect = cudf.DataFrame({"a": s}) actual = cudf.DataFrame._from_arrays(s._column, columns=["a"]) assert_eq(expect, actual) + + +@pytest.mark.parametrize("name", [None, "foo", 1, 1.0]) +def test_dataframe_column_name(name): + df = cudf.DataFrame({"a": [1, 2, 3]}) + pdf = df.to_pandas() + + df.columns.name = name + pdf.columns.name = name + + assert_eq(df, pdf) + assert_eq(df.columns.name, pdf.columns.name) + + +@pytest.mark.parametrize("names", [["abc", "def"], [1, 2], ["abc", 10]]) +def test_dataframe_multiindex_column_names(names): + arrays = [["A", "A", "B", "B"], ["one", "two", "one", "two"]] + tuples = list(zip(*arrays)) + index = pd.MultiIndex.from_tuples(tuples, names=["first", "second"]) + + pdf = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=index) + df = cudf.from_pandas(pdf) + + assert_eq(df, pdf) + assert_eq(df.columns.names, pdf.columns.names) + pdf.columns.names = names + df.columns.names = names + assert_eq(df, pdf) + assert_eq(df.columns.names, pdf.columns.names) diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index d8a2528230e..db4f3cd3c9f 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -3960,8 +3960,8 @@ def test_group_by_value_counts_with_count_column(): def test_groupby_internal_groups_empty(gdf): # test that we don't 
segfault when calling the internal # .groups() method with an empty list: - gb = gdf.groupby("y")._groupby - _, _, grouped_vals = gb.groups([]) + gb = gdf.groupby("y") + _, _, grouped_vals = gb._groups([]) assert grouped_vals == [] diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py index aaa8d7d07ee..db34329261f 100644 --- a/python/cudf/cudf/tests/test_json.py +++ b/python/cudf/cudf/tests/test_json.py @@ -1453,3 +1453,12 @@ def test_chunked_json_reader(): with cudf.option_context("io.json.low_memory", True): gdf = cudf.read_json(buf, lines=True) assert_eq(df, gdf) + + +@pytest.mark.parametrize("compression", ["gzip", None]) +def test_roundtrip_compression(compression, tmp_path): + expected = cudf.DataFrame({"a": 1, "b": "2"}) + fle = BytesIO() + expected.to_json(fle, engine="cudf", compression=compression) + result = cudf.read_json(fle, engine="cudf", compression=compression) + assert_eq(result, expected) diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 29d3dc4ae79..074096446fd 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -231,7 +231,8 @@ def validate_config_options(config: dict) -> None: executor = config.get("executor", "pylibcudf") if executor == "dask-experimental": unsupported = config.get("executor_options", {}).keys() - { - "max_rows_per_partition" + "max_rows_per_partition", + "parquet_blocksize", } else: unsupported = config.get("executor_options", {}).keys() diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py b/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py index 624a9bd87ea..2ba483c7b2d 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py @@ -40,6 +40,7 @@ def __init__( self.dtype = dtype self.name = name self.options = options + self.is_pointwise = False self.children = children if name not in Agg._SUPPORTED: raise NotImplementedError( diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/base.py b/python/cudf_polars/cudf_polars/dsl/expressions/base.py index 4c7ae007070..8ba3f9f407c 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/base.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/base.py @@ -36,9 +36,11 @@ class ExecutionContext(IntEnum): class Expr(Node["Expr"]): """An abstract expression object.""" - __slots__ = ("dtype",) + __slots__ = ("dtype", "is_pointwise") dtype: plc.DataType """Data type of the expression.""" + is_pointwise: bool + """Whether this expression acts pointwise on its inputs.""" # This annotation is needed because of https://github.com/python/mypy/issues/17981 _non_child: ClassVar[tuple[str, ...]] = ("dtype",) """Names of non-child data (not Exprs) for reconstruction.""" @@ -164,6 +166,7 @@ def __init__(self, dtype: plc.DataType, error: str) -> None: self.dtype = dtype self.error = error self.children = () + self.is_pointwise = True class NamedExpr: @@ -243,6 +246,7 @@ class Col(Expr): def __init__(self, dtype: plc.DataType, name: str) -> None: self.dtype = dtype self.name = name + self.is_pointwise = True self.children = () def do_evaluate( @@ -280,6 +284,7 @@ def __init__( self.dtype = dtype self.index = index self.table_ref = table_ref + self.is_pointwise = True self.children = (column,) def do_evaluate( diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py b/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py index 
245bdbefe88..556847b4738 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py @@ -42,6 +42,7 @@ def __init__( op = BinOp._BOOL_KLEENE_MAPPING.get(op, op) self.op = op self.children = (left, right) + self.is_pointwise = True if not plc.binaryop.is_supported_operation( self.dtype, left.dtype, right.dtype, op ): diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py b/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py index 5aa35ead127..d5ca22dd8d5 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py @@ -81,6 +81,14 @@ def __init__( self.options = options self.name = name self.children = children + self.is_pointwise = self.name not in ( + BooleanFunction.Name.All, + BooleanFunction.Name.Any, + BooleanFunction.Name.IsDuplicated, + BooleanFunction.Name.IsFirstDistinct, + BooleanFunction.Name.IsLastDistinct, + BooleanFunction.Name.IsUnique, + ) if self.name is BooleanFunction.Name.IsIn and not all( c.dtype == self.children[0].dtype for c in self.children ): diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py b/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py index c2dddfd9940..0c3159c73d6 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py @@ -114,6 +114,7 @@ def __init__( self.options = options self.name = name self.children = children + self.is_pointwise = True if self.name not in self._COMPONENT_MAP: raise NotImplementedError(f"Temporal function {self.name}") diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/literal.py b/python/cudf_polars/cudf_polars/dsl/expressions/literal.py index 7eba0c110ab..8528e66c69c 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/literal.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/literal.py @@ -38,6 +38,7 @@ def __init__(self, dtype: plc.DataType, value: pa.Scalar[Any]) -> None: assert value.type == plc.interop.to_arrow(dtype) self.value = value self.children = () + self.is_pointwise = True def do_evaluate( self, @@ -65,6 +66,7 @@ def __init__(self, dtype: plc.DataType, value: pl.Series) -> None: data = value.to_arrow() self.value = data.cast(dtypes.downcast_arrow_lists(data.type)) self.children = () + self.is_pointwise = True def get_hashable(self) -> Hashable: """Compute a hash of the column.""" diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py b/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py index 48c37d101f4..d4616d5d00a 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py @@ -24,6 +24,7 @@ def __init__(self, dtype: plc.DataType, options: Any, agg: Expr) -> None: self.dtype = dtype self.options = options self.children = (agg,) + self.is_pointwise = False raise NotImplementedError("Rolling window not implemented") @@ -35,4 +36,5 @@ def __init__(self, dtype: plc.DataType, options: Any, agg: Expr, *by: Expr) -> N self.dtype = dtype self.options = options self.children = (agg, *by) + self.is_pointwise = False raise NotImplementedError("Grouped rolling window not implemented") diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/selection.py b/python/cudf_polars/cudf_polars/dsl/expressions/selection.py index 12326740f74..93ecd026eaf 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/selection.py +++ 
b/python/cudf_polars/cudf_polars/dsl/expressions/selection.py @@ -30,6 +30,7 @@ class Gather(Expr): def __init__(self, dtype: plc.DataType, values: Expr, indices: Expr) -> None: self.dtype = dtype self.children = (values, indices) + self.is_pointwise = False def do_evaluate( self, @@ -71,6 +72,7 @@ class Filter(Expr): def __init__(self, dtype: plc.DataType, values: Expr, indices: Expr): self.dtype = dtype self.children = (values, indices) + self.is_pointwise = True def do_evaluate( self, diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/sorting.py b/python/cudf_polars/cudf_polars/dsl/expressions/sorting.py index 99512e2ef52..189f109e1a2 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/sorting.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/sorting.py @@ -32,6 +32,7 @@ def __init__( self.dtype = dtype self.options = options self.children = (column,) + self.is_pointwise = False def do_evaluate( self, @@ -71,6 +72,7 @@ def __init__( self.dtype = dtype self.options = options self.children = (column, *by) + self.is_pointwise = False def do_evaluate( self, diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/string.py b/python/cudf_polars/cudf_polars/dsl/expressions/string.py index 124a6e8d71c..256840c1f3d 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/string.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/string.py @@ -106,6 +106,7 @@ def __init__( self.options = options self.name = name self.children = children + self.is_pointwise = True self._validate_input() def _validate_input(self): diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py b/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py index d2b5d6bae29..120ca8edce0 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py @@ -34,6 +34,7 @@ def __init__( ) -> None: self.dtype = dtype self.children = (when, then, otherwise) + self.is_pointwise = True def do_evaluate( self, diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/unary.py b/python/cudf_polars/cudf_polars/dsl/expressions/unary.py index 10caaff6811..3336c901e7f 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/unary.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/unary.py @@ -33,6 +33,7 @@ class Cast(Expr): def __init__(self, dtype: plc.DataType, value: Expr) -> None: self.dtype = dtype self.children = (value,) + self.is_pointwise = True if not dtypes.can_cast(value.dtype, self.dtype): raise NotImplementedError( f"Can't cast {value.dtype.id().name} to {self.dtype.id().name}" @@ -63,6 +64,7 @@ class Len(Expr): def __init__(self, dtype: plc.DataType) -> None: self.dtype = dtype self.children = () + self.is_pointwise = False def do_evaluate( self, @@ -147,6 +149,14 @@ def __init__( self.name = name self.options = options self.children = children + self.is_pointwise = self.name not in ( + "cum_min", + "cum_max", + "cum_prod", + "cum_sum", + "drop_nulls", + "unique", + ) if self.name not in UnaryFunction._supported_fns: raise NotImplementedError(f"Unary function {name=}") diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index b5af3bb80bf..1c1d4860eec 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -604,10 +604,12 @@ def slice_skip(tbl: plc.Table): (name, typ, []) for name, typ in schema.items() ] plc_tbl_w_meta = plc.io.json.read_json( - plc.io.SourceInfo(paths), - lines=True, - dtypes=json_schema, - 
prune_columns=True, + plc.io.json._setup_json_reader_options( + plc.io.SourceInfo(paths), + lines=True, + dtypes=json_schema, + prune_columns=True, + ) ) # TODO: I don't think cudf-polars supports nested types in general right now # (but when it does, we should pass child column names from nested columns in) diff --git a/python/cudf_polars/cudf_polars/dsl/traversal.py b/python/cudf_polars/cudf_polars/dsl/traversal.py index b3248dae93c..9c45a68812a 100644 --- a/python/cudf_polars/cudf_polars/dsl/traversal.py +++ b/python/cudf_polars/cudf_polars/dsl/traversal.py @@ -10,7 +10,7 @@ from cudf_polars.typing import U_contra, V_co if TYPE_CHECKING: - from collections.abc import Callable, Generator, Mapping, MutableMapping + from collections.abc import Callable, Generator, Mapping, MutableMapping, Sequence from cudf_polars.typing import GenericTransformer, NodeT @@ -23,22 +23,22 @@ ] -def traversal(node: NodeT) -> Generator[NodeT, None, None]: +def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]: """ Pre-order traversal of nodes in an expression. Parameters ---------- - node - Root of expression to traverse. + nodes + Roots of expressions to traverse. Yields ------ - Unique nodes in the expression, parent before child, children + Unique nodes in the expressions, parent before child, children in-order from left to right. """ - seen = {node} - lifo = [node] + seen = set(nodes) + lifo = list(nodes) while lifo: node = lifo.pop() diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py index 3a1fec36079..2a5b400af4c 100644 --- a/python/cudf_polars/cudf_polars/experimental/io.py +++ b/python/cudf_polars/cudf_polars/experimental/io.py @@ -4,18 +4,24 @@ from __future__ import annotations +import enum import math -from typing import TYPE_CHECKING +import random +from enum import IntEnum +from typing import TYPE_CHECKING, Any -from cudf_polars.dsl.ir import DataFrameScan, Union +import pylibcudf as plc + +from cudf_polars.dsl.ir import IR, DataFrameScan, Scan, Union from cudf_polars.experimental.base import PartitionInfo from cudf_polars.experimental.dispatch import lower_ir_node if TYPE_CHECKING: from collections.abc import MutableMapping - from cudf_polars.dsl.ir import IR + from cudf_polars.dsl.expr import NamedExpr from cudf_polars.experimental.dispatch import LowerIRTransformer + from cudf_polars.typing import Schema @lower_ir_node.register(DataFrameScan) @@ -47,3 +53,274 @@ def _( } return ir, {ir: PartitionInfo(count=1)} + + +class ScanPartitionFlavor(IntEnum): + """Flavor of Scan partitioning.""" + + SINGLE_FILE = enum.auto() # 1:1 mapping between files and partitions + SPLIT_FILES = enum.auto() # Split each file into >1 partition + FUSED_FILES = enum.auto() # Fuse multiple files into each partition + + +class ScanPartitionPlan: + """ + Scan partitioning plan. + + Notes + ----- + The meaning of `factor` depends on the value of `flavor`: + - SINGLE_FILE: `factor` must be `1`. + - SPLIT_FILES: `factor` is the number of partitions per file. + - FUSED_FILES: `factor` is the number of files per partition. 
+ """ + + __slots__ = ("factor", "flavor") + factor: int + flavor: ScanPartitionFlavor + + def __init__(self, factor: int, flavor: ScanPartitionFlavor) -> None: + if ( + flavor == ScanPartitionFlavor.SINGLE_FILE and factor != 1 + ): # pragma: no cover + raise ValueError(f"Expected factor == 1 for {flavor}, got: {factor}") + self.factor = factor + self.flavor = flavor + + @staticmethod + def from_scan(ir: Scan) -> ScanPartitionPlan: + """Extract the partitioning plan of a Scan operation.""" + if ir.typ == "parquet": + # TODO: Use system info to set default blocksize + parallel_options = ir.config_options.get("executor_options", {}) + blocksize: int = parallel_options.get("parquet_blocksize", 1024**3) + stats = _sample_pq_statistics(ir) + file_size = sum(float(stats[column]) for column in ir.schema) + if file_size > 0: + if file_size > blocksize: + # Split large files + return ScanPartitionPlan( + math.ceil(file_size / blocksize), + ScanPartitionFlavor.SPLIT_FILES, + ) + else: + # Fuse small files + return ScanPartitionPlan( + max(blocksize // int(file_size), 1), + ScanPartitionFlavor.FUSED_FILES, + ) + + # TODO: Use file sizes for csv and json + return ScanPartitionPlan(1, ScanPartitionFlavor.SINGLE_FILE) + + +class SplitScan(IR): + """ + Input from a split file. + + This class wraps a single-file `Scan` object. At + IO/evaluation time, this class will only perform + a partial read of the underlying file. The range + (skip_rows and n_rows) is calculated at IO time. + """ + + __slots__ = ( + "base_scan", + "schema", + "split_index", + "total_splits", + ) + _non_child = ( + "schema", + "base_scan", + "split_index", + "total_splits", + ) + base_scan: Scan + """Scan operation this node is based on.""" + split_index: int + """Index of the current split.""" + total_splits: int + """Total number of splits.""" + + def __init__( + self, schema: Schema, base_scan: Scan, split_index: int, total_splits: int + ): + self.schema = schema + self.base_scan = base_scan + self.split_index = split_index + self.total_splits = total_splits + self._non_child_args = ( + split_index, + total_splits, + *base_scan._non_child_args, + ) + self.children = () + if base_scan.typ not in ("parquet",): # pragma: no cover + raise NotImplementedError( + f"Unhandled Scan type for file splitting: {base_scan.typ}" + ) + + @classmethod + def do_evaluate( + cls, + split_index: int, + total_splits: int, + schema: Schema, + typ: str, + reader_options: dict[str, Any], + config_options: dict[str, Any], + paths: list[str], + with_columns: list[str] | None, + skip_rows: int, + n_rows: int, + row_index: tuple[str, int] | None, + predicate: NamedExpr | None, + ): + """Evaluate and return a dataframe.""" + if typ not in ("parquet",): # pragma: no cover + raise NotImplementedError(f"Unhandled Scan type for file splitting: {typ}") + + if len(paths) > 1: # pragma: no cover + raise ValueError(f"Expected a single path, got: {paths}") + + # Parquet logic: + # - We are one of "total_splits" SplitScan nodes + # assigned to the same file. + # - We know our index within this file ("split_index") + # - We can also use parquet metadata to query the + # total number of rows in each row-group of the file. + # - We can use all this information to calculate the + # "skip_rows" and "n_rows" options to use locally. 
+ + rowgroup_metadata = plc.io.parquet_metadata.read_parquet_metadata( + plc.io.SourceInfo(paths) + ).rowgroup_metadata() + total_row_groups = len(rowgroup_metadata) + if total_splits <= total_row_groups: + # We have enough row-groups in the file to align + # all "total_splits" of our reads with row-group + # boundaries. Calculate which row-groups to include + # in the current read, and use metadata to translate + # the row-group indices to "skip_rows" and "n_rows". + rg_stride = total_row_groups // total_splits + skip_rgs = rg_stride * split_index + skip_rows = sum(rg["num_rows"] for rg in rowgroup_metadata[:skip_rgs]) + n_rows = sum( + rg["num_rows"] + for rg in rowgroup_metadata[skip_rgs : skip_rgs + rg_stride] + ) + else: + # There are not enough row-groups to align + # all "total_splits" of our reads with row-group + # boundaries. Use metadata to directly calculate + # "skip_rows" and "n_rows" for the current read. + total_rows = sum(rg["num_rows"] for rg in rowgroup_metadata) + n_rows = total_rows // total_splits + skip_rows = n_rows * split_index + + # Last split should always read to end of file + if split_index == (total_splits - 1): + n_rows = -1 + + # Perform the partial read + return Scan.do_evaluate( + schema, + typ, + reader_options, + config_options, + paths, + with_columns, + skip_rows, + n_rows, + row_index, + predicate, + ) + + +def _sample_pq_statistics(ir: Scan) -> dict[str, float]: + import numpy as np + import pyarrow.dataset as pa_ds + + # Use average total_uncompressed_size of three files + # TODO: Use plc.io.parquet_metadata.read_parquet_metadata + n_sample = 3 + column_sizes = {} + ds = pa_ds.dataset(random.sample(ir.paths, n_sample), format="parquet") + for i, frag in enumerate(ds.get_fragments()): + md = frag.metadata + for rg in range(md.num_row_groups): + row_group = md.row_group(rg) + for col in range(row_group.num_columns): + column = row_group.column(col) + name = column.path_in_schema + if name not in column_sizes: + column_sizes[name] = np.zeros(n_sample, dtype="int64") + column_sizes[name][i] += column.total_uncompressed_size + + return {name: np.mean(sizes) for name, sizes in column_sizes.items()} + + +@lower_ir_node.register(Scan) +def _( + ir: Scan, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + partition_info: MutableMapping[IR, PartitionInfo] + if ir.typ in ("csv", "parquet", "ndjson") and ir.n_rows == -1 and ir.skip_rows == 0: + plan = ScanPartitionPlan.from_scan(ir) + paths = list(ir.paths) + if plan.flavor == ScanPartitionFlavor.SPLIT_FILES: + # Disable chunked reader when splitting files + config_options = ir.config_options.copy() + config_options["parquet_options"] = config_options.get( + "parquet_options", {} + ).copy() + config_options["parquet_options"]["chunked"] = False + + slices: list[SplitScan] = [] + for path in paths: + base_scan = Scan( + ir.schema, + ir.typ, + ir.reader_options, + ir.cloud_options, + config_options, + [path], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + slices.extend( + SplitScan(ir.schema, base_scan, sindex, plan.factor) + for sindex in range(plan.factor) + ) + new_node = Union(ir.schema, None, *slices) + partition_info = {slice: PartitionInfo(count=1) for slice in slices} | { + new_node: PartitionInfo(count=len(slices)) + } + else: + groups: list[Scan] = [ + Scan( + ir.schema, + ir.typ, + ir.reader_options, + ir.cloud_options, + ir.config_options, + paths[i : i + plan.factor], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + 
ir.predicate, + ) + for i in range(0, len(paths), plan.factor) + ] + new_node = Union(ir.schema, None, *groups) + partition_info = {group: PartitionInfo(count=1) for group in groups} | { + new_node: PartitionInfo(count=len(groups)) + } + return new_node, partition_info + + return ir, {ir: PartitionInfo(count=1)} # pragma: no cover diff --git a/python/cudf_polars/cudf_polars/experimental/parallel.py b/python/cudf_polars/cudf_polars/experimental/parallel.py index e5884f1c574..6843ed9ee2e 100644 --- a/python/cudf_polars/cudf_polars/experimental/parallel.py +++ b/python/cudf_polars/cudf_polars/experimental/parallel.py @@ -9,8 +9,9 @@ from functools import reduce from typing import TYPE_CHECKING, Any -import cudf_polars.experimental.io # noqa: F401 -from cudf_polars.dsl.ir import IR, Cache, Projection, Union +import cudf_polars.experimental.io +import cudf_polars.experimental.select # noqa: F401 +from cudf_polars.dsl.ir import IR, Cache, Filter, HStack, Projection, Select, Union from cudf_polars.dsl.traversal import CachingVisitor, traversal from cudf_polars.experimental.base import PartitionInfo, _concat, get_key_name from cudf_polars.experimental.dispatch import ( @@ -112,7 +113,7 @@ def task_graph( """ graph = reduce( operator.or_, - (generate_ir_tasks(node, partition_info) for node in traversal(ir)), + (generate_ir_tasks(node, partition_info) for node in traversal([ir])), ) key_name = get_key_name(ir) @@ -226,6 +227,8 @@ def _lower_ir_pwise( lower_ir_node.register(Projection, _lower_ir_pwise) lower_ir_node.register(Cache, _lower_ir_pwise) +lower_ir_node.register(Filter, _lower_ir_pwise) +lower_ir_node.register(HStack, _lower_ir_pwise) def _generate_ir_tasks_pwise( @@ -245,3 +248,6 @@ def _generate_ir_tasks_pwise( generate_ir_tasks.register(Projection, _generate_ir_tasks_pwise) generate_ir_tasks.register(Cache, _generate_ir_tasks_pwise) +generate_ir_tasks.register(Filter, _generate_ir_tasks_pwise) +generate_ir_tasks.register(HStack, _generate_ir_tasks_pwise) +generate_ir_tasks.register(Select, _generate_ir_tasks_pwise) diff --git a/python/cudf_polars/cudf_polars/experimental/select.py b/python/cudf_polars/cudf_polars/experimental/select.py new file mode 100644 index 00000000000..5f79384b569 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/select.py @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +"""Parallel Select Logic.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from cudf_polars.dsl.ir import Select +from cudf_polars.dsl.traversal import traversal +from cudf_polars.experimental.dispatch import lower_ir_node + +if TYPE_CHECKING: + from collections.abc import MutableMapping + + from cudf_polars.dsl.ir import IR + from cudf_polars.experimental.base import PartitionInfo + from cudf_polars.experimental.parallel import LowerIRTransformer + + +@lower_ir_node.register(Select) +def _( + ir: Select, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + child, partition_info = rec(ir.children[0]) + pi = partition_info[child] + if pi.count > 1 and not all( + expr.is_pointwise for expr in traversal([e.value for e in ir.exprs]) + ): + # TODO: Handle non-pointwise expressions. + raise NotImplementedError( + f"Selection {ir} does not support multiple partitions." 
+ ) + new_node = ir.reconstruct([child]) + partition_info[new_node] = pi + return new_node, partition_info diff --git a/python/cudf_polars/tests/dsl/test_traversal.py b/python/cudf_polars/tests/dsl/test_traversal.py index 9755994c419..9fcca2e290e 100644 --- a/python/cudf_polars/tests/dsl/test_traversal.py +++ b/python/cudf_polars/tests/dsl/test_traversal.py @@ -32,21 +32,21 @@ def test_traversal_unique(): dt = plc.DataType(plc.TypeId.INT8) e1 = make_expr(dt, "a", "a") - unique_exprs = list(traversal(e1)) + unique_exprs = list(traversal([e1])) assert len(unique_exprs) == 2 assert set(unique_exprs) == {expr.Col(dt, "a"), e1} assert unique_exprs == [e1, expr.Col(dt, "a")] e2 = make_expr(dt, "a", "b") - unique_exprs = list(traversal(e2)) + unique_exprs = list(traversal([e2])) assert len(unique_exprs) == 3 assert set(unique_exprs) == {expr.Col(dt, "a"), expr.Col(dt, "b"), e2} assert unique_exprs == [e2, expr.Col(dt, "a"), expr.Col(dt, "b")] e3 = make_expr(dt, "b", "a") - unique_exprs = list(traversal(e3)) + unique_exprs = list(traversal([e3])) assert len(unique_exprs) == 3 assert set(unique_exprs) == {expr.Col(dt, "a"), expr.Col(dt, "b"), e3} diff --git a/python/cudf_polars/tests/experimental/test_scan.py b/python/cudf_polars/tests/experimental/test_scan.py new file mode 100644 index 00000000000..a26d751dc86 --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_scan.py @@ -0,0 +1,80 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars import Translator +from cudf_polars.experimental.parallel import lower_ir_graph +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.fixture(scope="module") +def df(): + return pl.DataFrame( + { + "x": range(3_000), + "y": ["cat", "dog", "fish"] * 1_000, + "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 600, + } + ) + + +def make_source(df, path, fmt, n_files=3): + n_rows = len(df) + stride = int(n_rows / n_files) + for i in range(n_files): + offset = stride * i + part = df.slice(offset, stride) + if fmt == "csv": + part.write_csv(path / f"part.{i}.csv") + elif fmt == "ndjson": + part.write_ndjson(path / f"part.{i}.ndjson") + else: + part.write_parquet( + path / f"part.{i}.parquet", + row_group_size=int(stride / 2), + ) + + +@pytest.mark.parametrize( + "fmt, scan_fn", + [ + ("csv", pl.scan_csv), + ("ndjson", pl.scan_ndjson), + ("parquet", pl.scan_parquet), + ], +) +def test_parallel_scan(tmp_path, df, fmt, scan_fn): + make_source(df, tmp_path, fmt) + q = scan_fn(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + ) + assert_gpu_result_equal(q, engine=engine) + + +@pytest.mark.parametrize("blocksize", [1_000, 10_000, 1_000_000]) +def test_parquet_blocksize(tmp_path, df, blocksize): + n_files = 3 + make_source(df, tmp_path, "parquet", n_files) + q = pl.scan_parquet(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + executor_options={"parquet_blocksize": blocksize}, + ) + assert_gpu_result_equal(q, engine=engine) + + # Check partitioning + qir = Translator(q._ldf.visit(), engine).translate_ir() + ir, info = lower_ir_graph(qir) + count = info[ir].count + if blocksize <= 12_000: + assert count > n_files + else: + assert count < n_files diff --git a/python/cudf_polars/tests/experimental/test_select.py b/python/cudf_polars/tests/experimental/test_select.py new file mode 100644 index 00000000000..7dfe6ead148 --- 
/dev/null +++ b/python/cudf_polars/tests/experimental/test_select.py @@ -0,0 +1,54 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.fixture(scope="module") +def engine(): + return pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + executor_options={"max_rows_per_partition": 3}, + ) + + +@pytest.fixture(scope="module") +def df(): + return pl.LazyFrame( + { + "a": [1, 2, 3, 4, 5, 6, 7], + "b": [1, 1, 1, 1, 1, 1, 1], + } + ) + + +def test_select(df, engine): + query = df.select( + pl.col("a") + pl.col("b"), (pl.col("a") * 2 + pl.col("b")).alias("d") + ) + assert_gpu_result_equal(query, engine=engine) + + +def test_select_reduce_raises(df, engine): + query = df.select( + (pl.col("a") + pl.col("b")).max(), + (pl.col("a") * 2 + pl.col("b")).alias("d").mean(), + ) + with pytest.raises( + pl.exceptions.ComputeError, + match="NotImplementedError", + ): + assert_gpu_result_equal(query, engine=engine) + + +def test_select_with_cse_no_agg(df, engine): + expr = pl.col("a") + pl.col("a") + query = df.select(expr, (expr * 2).alias("b"), ((expr * 2) + 10).alias("c")) + assert_gpu_result_equal(query, engine=engine) diff --git a/python/pylibcudf/pylibcudf/interop.pyx b/python/pylibcudf/pylibcudf/interop.pyx index bd5397ac328..7a102cf0c88 100644 --- a/python/pylibcudf/pylibcudf/interop.pyx +++ b/python/pylibcudf/pylibcudf/interop.pyx @@ -273,10 +273,19 @@ cdef void _release_array(object array_capsule) noexcept: free(array) +def _maybe_create_nested_column_metadata(Column col): + return ColumnMetadata( + children_meta=[ + _maybe_create_nested_column_metadata(child) for child in col.children() + ] + ) + + def _table_to_schema(Table tbl, metadata): if metadata is None: - metadata = [ColumnMetadata() for _ in range(len(tbl.columns()))] - metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata] + metadata = [_maybe_create_nested_column_metadata(col) for col in tbl.columns()] + else: + metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata] cdef vector[column_metadata] c_metadata c_metadata.reserve(len(metadata)) diff --git a/python/pylibcudf/pylibcudf/io/json.pxd b/python/pylibcudf/pylibcudf/io/json.pxd index 4894ca3bd6e..7ce3cb859a5 100644 --- a/python/pylibcudf/pylibcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/io/json.pxd @@ -8,6 +8,8 @@ from pylibcudf.io.types cimport ( ) from pylibcudf.libcudf.io.json cimport ( json_recovery_mode_t, + json_reader_options, + json_reader_options_builder, json_writer_options, json_writer_options_builder, ) @@ -15,19 +17,43 @@ from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table -cpdef TableWithMetadata read_json( - SourceInfo source_info, - list dtypes = *, - compression_type compression = *, - bool lines = *, - size_t byte_range_offset = *, - size_t byte_range_size = *, - bool keep_quotes = *, - bool mixed_types_as_string = *, - bool prune_columns = *, - json_recovery_mode_t recovery_mode = *, - dict extra_parameters = *, -) +cdef class JsonReaderOptions: + cdef json_reader_options c_obj + cdef SourceInfo source + cpdef void set_dtypes(self, list types) + cpdef void enable_keep_quotes(self, bool keep_quotes) + cpdef void enable_mixed_types_as_string(self, bool mixed_types_as_string) + cpdef void enable_prune_columns(self, bool prune_columns) + cpdef void 
set_byte_range_offset(self, size_t offset) + cpdef void set_byte_range_size(self, size_t size) + cpdef void enable_lines(self, bool val) + # These hidden options are subjected to change without deprecation cycle. + # These are used to test libcudf JSON reader features, not used in cuDF. + cpdef void set_delimiter(self, str val) + cpdef void enable_dayfirst(self, bool val) + cpdef void enable_experimental(self, bool val) + cpdef void enable_normalize_single_quotes(self, bool val) + cpdef void enable_normalize_whitespace(self, bool val) + cpdef void set_strict_validation(self, bool val) + cpdef void allow_unquoted_control_chars(self, bool val) + cpdef void allow_numeric_leading_zeros(self, bool val) + cpdef void allow_nonnumeric_numbers(self, bool val) + cpdef void set_na_values(self, list vals) + +cdef class JsonReaderOptionsBuilder: + cdef json_reader_options_builder c_obj + cdef SourceInfo source + cpdef JsonReaderOptionsBuilder compression(self, compression_type compression) + cpdef JsonReaderOptionsBuilder lines(self, bool val) + cpdef JsonReaderOptionsBuilder keep_quotes(self, bool val) + cpdef JsonReaderOptionsBuilder byte_range_offset(self, size_t byte_range_offset) + cpdef JsonReaderOptionsBuilder byte_range_size(self, size_t byte_range_size) + cpdef JsonReaderOptionsBuilder recovery_mode( + self, json_recovery_mode_t recovery_mode + ) + cpdef build(self) + +cpdef TableWithMetadata read_json(JsonReaderOptions options) cdef class JsonWriterOptions: cdef json_writer_options c_obj @@ -36,6 +62,7 @@ cdef class JsonWriterOptions: cpdef void set_rows_per_chunk(self, size_type val) cpdef void set_true_value(self, str val) cpdef void set_false_value(self, str val) + cpdef void set_compression(self, compression_type comptype) cdef class JsonWriterOptionsBuilder: cdef json_writer_options_builder c_obj @@ -45,17 +72,12 @@ cdef class JsonWriterOptionsBuilder: cpdef JsonWriterOptionsBuilder na_rep(self, str val) cpdef JsonWriterOptionsBuilder include_nulls(self, bool val) cpdef JsonWriterOptionsBuilder lines(self, bool val) + cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype) cpdef JsonWriterOptions build(self) cpdef void write_json(JsonWriterOptions options) cpdef tuple chunked_read_json( - SourceInfo source_info, - list dtypes = *, - compression_type compression = *, - bool keep_quotes = *, - bool mixed_types_as_string = *, - bool prune_columns = *, - json_recovery_mode_t recovery_mode = *, + JsonReaderOptions options, int chunk_size= *, ) diff --git a/python/pylibcudf/pylibcudf/io/json.pyi b/python/pylibcudf/pylibcudf/io/json.pyi index e0489742cd0..db4546f138d 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyi +++ b/python/pylibcudf/pylibcudf/io/json.pyi @@ -19,18 +19,40 @@ ChildNameToTypeMap: TypeAlias = Mapping[str, ChildNameToTypeMap] NameAndType: TypeAlias = tuple[str, DataType, list[NameAndType]] -def read_json( - source_info: SourceInfo, - dtypes: list[NameAndType] | None = None, - compression: CompressionType = CompressionType.AUTO, - lines: bool = False, - byte_range_offset: int = 0, - byte_range_size: int = 0, - keep_quotes: bool = False, - mixed_types_as_string: bool = False, - prune_columns: bool = False, - recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, -) -> TableWithMetadata: ... +class JsonReaderOptions: + def set_dtypes( + self, types: list[DataType] | list[NameAndType] + ) -> None: ... + def enable_keep_quotes(self, keep_quotes: bool) -> None: ... + def enable_mixed_types_as_string( + self, mixed_types_as_string: bool + ) -> None: ... 
+ def enable_prune_columns(self, prune_columns: bool) -> None: ... + def set_byte_range_offset(self, offset: int) -> None: ... + def set_byte_range_size(self, size: int) -> None: ... + def enable_lines(self, val: bool) -> None: ... + def set_delimiter(self, val: str) -> None: ... + def enable_dayfirst(self, val: bool) -> None: ... + def enable_experimental(self, val: bool) -> None: ... + def enable_normalize_single_quotes(self, val: bool) -> None: ... + def enable_normalize_whitespace(self, val: bool) -> None: ... + def set_strict_validation(self, val: bool) -> None: ... + def allow_unquoted_control_chars(self, val: bool) -> None: ... + def allow_numeric_leading_zeros(self, val: bool) -> None: ... + def allow_nonnumeric_numbers(self, val: bool) -> None: ... + def set_na_values(self, vals: list[str]) -> None: ... + @staticmethod + def builder(source: SourceInfo) -> JsonReaderOptionsBuilder: ... + +class JsonReaderOptionsBuilder: + def compression(self, compression: CompressionType) -> Self: ... + def lines(self, lines: bool) -> Self: ... + def byte_range_offset(self, byte_range_offset: int) -> Self: ... + def byte_range_size(self, byte_range_size: int) -> Self: ... + def recovery_mode(self, recovery_mode: JSONRecoveryMode) -> Self: ... + def build(self) -> JsonReaderOptions: ... + +def read_json(options: JsonReaderOptions) -> TableWithMetadata: ... class JsonWriterOptions: @staticmethod @@ -38,22 +60,18 @@ class JsonWriterOptions: def set_rows_per_chunk(self, val: int) -> None: ... def set_true_value(self, val: str) -> None: ... def set_false_value(self, val: str) -> None: ... + def set_compression(self, comptype: CompressionType) -> None: ... class JsonWriterOptionsBuilder: def metadata(self, tbl_w_meta: TableWithMetadata) -> Self: ... def na_rep(self, val: str) -> Self: ... def include_nulls(self, val: bool) -> Self: ... def lines(self, val: bool) -> Self: ... + def compression(self, comptype: CompressionType) -> Self: ... def build(self) -> JsonWriterOptions: ... def write_json(options: JsonWriterOptions) -> None: ... def chunked_read_json( - source_info: SourceInfo, - dtypes: list[NameAndType] | None = None, - compression: CompressionType = CompressionType.AUTO, - keep_quotes: bool = False, - mixed_types_as_string: bool = False, - prune_columns: bool = False, - recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, + options: JsonReaderOptions, chunk_size: int = 100_000_000, ) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ... 
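Note (illustrative, not part of the patch): the json.pxd and json.pyi declarations above replace read_json's long keyword-argument signature with a JsonReaderOptions object assembled through a chained builder. A minimal usage sketch of the new interface, using only calls declared above; the file path is hypothetical:

import pylibcudf as plc

# Assemble reader options with the new builder; each builder method
# returns the builder, so the calls chain, and build() produces the
# final JsonReaderOptions.
options = (
    plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo(["data.jsonl"]))
    .lines(True)
    .compression(plc.io.types.CompressionType.AUTO)
    .recovery_mode(plc.io.types.JSONRecoveryMode.FAIL)
    .build()
)

# Finer-grained settings are applied to the built options object.
options.enable_keep_quotes(True)

# read_json now takes only the options object.
tbl_w_meta = plc.io.json.read_json(options)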
diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index 16078b31566..cf286378902 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -25,6 +25,8 @@ __all__ = [ "chunked_read_json", "read_json", "write_json", + "JsonReaderOptions", + "JsonReaderOptionsBuilder", "JsonWriterOptions", "JsonWriterOptionsBuilder" ] @@ -51,23 +53,21 @@ cdef map[string, schema_element] _generate_schema_map(list dtypes): return schema_map -cdef json_reader_options _setup_json_reader_options( +cpdef JsonReaderOptions _setup_json_reader_options( SourceInfo source_info, list dtypes, - compression_type compression, - bool lines, - size_t byte_range_offset, - size_t byte_range_size, - bool keep_quotes, - bool mixed_types_as_string, - bool prune_columns, - json_recovery_mode_t recovery_mode, - dict extra_parameters=None): - - cdef vector[string] na_vec - cdef vector[data_type] types_vec - cdef json_reader_options opts = ( - json_reader_options.builder(source_info.c_obj) + compression_type compression = compression_type.AUTO, + bool lines = False, + size_t byte_range_offset = 0, + size_t byte_range_size = 0, + bool keep_quotes = False, + bool mixed_types_as_string = False, + bool prune_columns = False, + json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, + dict extra_parameters=None, +): + options = ( + JsonReaderOptions.builder(source_info) .compression(compression) .lines(lines) .byte_range_offset(byte_range_offset) @@ -77,88 +77,359 @@ cdef json_reader_options _setup_json_reader_options( ) if dtypes is not None: - if isinstance(dtypes[0], tuple): - opts.set_dtypes(move(_generate_schema_map(dtypes))) - else: - for dtype in dtypes: - types_vec.push_back((dtype).c_obj) - opts.set_dtypes(types_vec) + options.set_dtypes(dtypes) - opts.enable_keep_quotes(keep_quotes) - opts.enable_mixed_types_as_string(mixed_types_as_string) - opts.enable_prune_columns(prune_columns) + options.enable_keep_quotes(keep_quotes) + options.enable_mixed_types_as_string(mixed_types_as_string) + options.enable_prune_columns(prune_columns) # These hidden options are subjected to change without deprecation cycle. # These are used to test libcudf JSON reader features, not used in cuDF. 
if extra_parameters is not None: for key, value in extra_parameters.items(): if key == 'delimiter': - opts.set_delimiter(ord(value)) + options.set_delimiter(value) elif key == 'dayfirst': - opts.enable_dayfirst(value) + options.enable_dayfirst(value) elif key == 'experimental': - opts.enable_experimental(value) + options.enable_experimental(value) elif key == 'normalize_single_quotes': - opts.enable_normalize_single_quotes(value) + options.enable_normalize_single_quotes(value) elif key == 'normalize_whitespace': - opts.enable_normalize_whitespace(value) + options.enable_normalize_whitespace(value) elif key == 'strict_validation': - opts.set_strict_validation(value) + options.set_strict_validation(value) elif key == 'allow_unquoted_control_chars': - opts.allow_unquoted_control_chars(value) + options.allow_unquoted_control_chars(value) elif key == 'allow_numeric_leading_zeros': - opts.allow_numeric_leading_zeros(value) + options.allow_numeric_leading_zeros(value) elif key == 'allow_nonnumeric_numbers': - opts.allow_nonnumeric_numbers(value) + options.allow_nonnumeric_numbers(value) elif key == 'na_values': - for na_val in value: - if isinstance(na_val, str): - na_vec.push_back(na_val.encode()) - opts.set_na_values(na_vec) + options.set_na_values(value) else: raise ValueError( "cudf engine doesn't support the " f"'{key}' keyword argument for read_json" ) - return opts + return options + + +cdef class JsonReaderOptions: + """ + The settings to use for ``read_json`` + + For details, see :cpp:class:`cudf::io::json_reader_options` + """ + @staticmethod + def builder(SourceInfo source): + """ + Create a JsonReaderOptionsBuilder object + + For details, see :cpp:func:`cudf::io::json_reader_options::builder` + + Parameters + ---------- + source : SourceInfo + The source to read the JSON file from. + + Returns + ------- + JsonReaderOptionsBuilder + Builder to build JsonReaderOptions + """ + cdef JsonReaderOptionsBuilder json_builder = ( + JsonReaderOptionsBuilder.__new__(JsonReaderOptionsBuilder) + ) + json_builder.c_obj = json_reader_options.builder(source.c_obj) + json_builder.source = source + return json_builder + + cpdef void set_dtypes(self, list types): + """ + Set data types for columns to be read. + + Parameters + ---------- + types : list + List of dtypes or a list of tuples of + column names, dtypes, and list of tuples + (to support nested column hierarchy) + + Returns + ------- + None + """ + cdef vector[data_type] types_vec + if isinstance(types[0], tuple): + self.c_obj.set_dtypes(_generate_schema_map(types)) + else: + types_vec.reserve(len(types)) + for dtype in types: + types_vec.push_back((<DataType>dtype).c_obj) + self.c_obj.set_dtypes(types_vec) + + cpdef void enable_keep_quotes(self, bool keep_quotes): + """ + Set whether the reader should keep quotes of string values. + + Parameters + ---------- + keep_quotes : bool + Boolean value to indicate whether the reader should + keep quotes of string values + + Returns + ------- + None + """ + self.c_obj.enable_keep_quotes(keep_quotes) + + cpdef void enable_mixed_types_as_string(self, bool mixed_types_as_string): + """ + Set whether to parse mixed types as a string column. + Also enables forcing to read a struct as a string column using schema.
+ + Parameters + ---------- + mixed_types_as_string : bool + Boolean value to enable/disable parsing mixed types + as a string column + + Returns + ------- + None + """ + self.c_obj.enable_mixed_types_as_string(mixed_types_as_string) + + cpdef void enable_prune_columns(self, bool prune_columns): + """ + Set whether to prune columns on read, selected + based on the ``set_dtypes`` option. + + Parameters + ---------- + prune_columns : bool + When set as true, if the reader options include + ``set_dtypes``, then the reader will only return those + columns which are mentioned in ``set_dtypes``. If false, + then all columns are returned, independent of the + ``set_dtypes`` setting. + + Returns + ------- + None + """ + self.c_obj.enable_prune_columns(prune_columns) + + cpdef void set_byte_range_offset(self, size_t offset): + """ + Set number of bytes to skip from source start. + + Parameters + ---------- + offset : size_t + Number of bytes of offset + + Returns + ------- + None + """ + self.c_obj.set_byte_range_offset(offset) + + cpdef void set_byte_range_size(self, size_t size): + """ + Set number of bytes to read. + + Parameters + ---------- + size : size_t + Number of bytes to read + + Returns + ------- + None + """ + self.c_obj.set_byte_range_size(size) + + cpdef void enable_lines(self, bool val): + """ + Set whether to read the file as a json object per line. + + Parameters + ---------- + val : bool + Boolean value to enable/disable the option + to read each line as a json object + + Returns + ------- + None + """ + self.c_obj.enable_lines(val) + + # These hidden options are subjected to change without deprecation cycle. + # These are used to test libcudf JSON reader features, not used in cuDF. + + cpdef void set_delimiter(self, str val): + self.c_obj.set_delimiter(val.encode()) + + cpdef void enable_dayfirst(self, bool val): + self.c_obj.enable_dayfirst(val) + + cpdef void enable_experimental(self, bool val): + self.c_obj.enable_experimental(val) + + cpdef void enable_normalize_single_quotes(self, bool val): + self.c_obj.enable_normalize_single_quotes(val) + + cpdef void enable_normalize_whitespace(self, bool val): + self.c_obj.enable_normalize_whitespace(val) + + cpdef void set_strict_validation(self, bool val): + self.c_obj.set_strict_validation(val) + + cpdef void allow_unquoted_control_chars(self, bool val): + self.c_obj.allow_unquoted_control_chars(val) + + cpdef void allow_numeric_leading_zeros(self, bool val): + self.c_obj.allow_numeric_leading_zeros(val) + + cpdef void allow_nonnumeric_numbers(self, bool val): + self.c_obj.allow_nonnumeric_numbers(val) + + cpdef void set_na_values(self, list vals): + cdef vector[string] vec + for val in vals: + if isinstance(val, str): + vec.push_back(val.encode()) + self.c_obj.set_na_values(vec) + + +cdef class JsonReaderOptionsBuilder: + cpdef JsonReaderOptionsBuilder compression(self, compression_type compression): + """ + Sets compression type. + + Parameters + ---------- + compression : CompressionType + The compression type to use + + Returns + ------- + Self + """ + self.c_obj.compression(compression) + return self + + cpdef JsonReaderOptionsBuilder lines(self, bool val): + """ + Set whether to read the file as a json object per line. 
+ + Parameters + ---------- + val : bool + Boolean value to enable/disable the option + to read each line as a json object + + Returns + ------- + Self + """ + self.c_obj.lines(val) + return self + + cpdef JsonReaderOptionsBuilder keep_quotes(self, bool val): + """ + Set whether the reader should keep quotes of string values. + + Parameters + ---------- + val : bool + Boolean value to indicate whether the + reader should keep quotes of string values + + Returns + ------- + Self + """ + self.c_obj.keep_quotes(val) + return self + + cpdef JsonReaderOptionsBuilder byte_range_offset(self, size_t byte_range_offset): + """ + Set number of bytes to skip from source start. + + Parameters + ---------- + byte_range_offset : size_t + Number of bytes of offset + + Returns + ------- + Self + """ + self.c_obj.byte_range_offset(byte_range_offset) + return self + + cpdef JsonReaderOptionsBuilder byte_range_size(self, size_t byte_range_size): + """ + Set number of bytes to read. + + Parameters + ---------- + byte_range_size : size_t + Number of bytes to read + + Returns + ------- + Self + """ + self.c_obj.byte_range_size(byte_range_size) + return self + + cpdef JsonReaderOptionsBuilder recovery_mode( + self, + json_recovery_mode_t recovery_mode + ): + """ + Specifies the JSON reader's behavior on invalid JSON lines. + + Parameters + ---------- + recovery_mode : json_recovery_mode_t + An enum value to indicate the JSON reader's + behavior on invalid JSON lines. + + Returns + ------- + Self + """ + self.c_obj.recovery_mode(recovery_mode) + return self + + cpdef build(self): + """Create a JsonReaderOptions object""" + cdef JsonReaderOptions json_options = JsonReaderOptions.__new__( + JsonReaderOptions + ) + json_options.c_obj = move(self.c_obj.build()) + json_options.source = self.source + return json_options cpdef tuple chunked_read_json( - SourceInfo source_info, - list dtypes = None, - compression_type compression = compression_type.AUTO, - bool keep_quotes = False, - bool mixed_types_as_string = False, - bool prune_columns = False, - json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, + JsonReaderOptions options, int chunk_size=100_000_000, ): - """Reads an JSON file into a :py:class:`~.types.TableWithMetadata`. + """ + Reads chunks of a JSON file into a :py:class:`~.types.TableWithMetadata`. Parameters ---------- - source_info : SourceInfo - The SourceInfo object to read the JSON file from. - dtypes : list, default None - Set data types for the columns in the JSON file. - - Each element of the list has the format - (column_name, column_dtype, list of child dtypes), where - the list of child dtypes is an empty list if the child is not - a nested type (list or struct dtype), and is of format - (column_child_name, column_child_type, list of grandchild dtypes). - compression: CompressionType, default CompressionType.AUTO - The compression format of the JSON source. - keep_quotes : bool, default False - Whether the reader should keep quotes of string values. - mixed_types_as_string : bool, default False - If True, mixed type columns are returned as string columns. - If `False` parsing mixed type columns will thrown an error. - prune_columns : bool, default False - Whether to only read columns specified in dtypes. - recover_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL - Whether to raise an error or set corresponding values to null - when encountering an invalid JSON line. + options : JsonReaderOptions + Settings for controlling reading behavior chunk_size : int, default 100_000_000 bytes. 
The number of bytes to be read in chunks. The chunk_size should be set to at least row_size. @@ -171,20 +442,6 @@ cpdef tuple chunked_read_json( cdef size_type c_range_size = ( chunk_size if chunk_size is not None else 0 ) - cdef json_reader_options opts = _setup_json_reader_options( - source_info=source_info, - dtypes=dtypes, - compression=compression, - lines=True, - byte_range_offset=0, - byte_range_size=0, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=recovery_mode, - ) - - # Read JSON cdef table_with_metadata c_result final_columns = [] @@ -192,12 +449,13 @@ cpdef tuple chunked_read_json( child_names = None i = 0 while True: - opts.set_byte_range_offset(c_range_size * i) - opts.set_byte_range_size(c_range_size) + options.enable_lines(True) + options.set_byte_range_offset(c_range_size * i) + options.set_byte_range_size(c_range_size) try: with nogil: - c_result = move(cpp_read_json(opts)) + c_result = move(cpp_read_json(options.c_obj)) except (ValueError, OverflowError): break if meta_names is None: @@ -225,75 +483,30 @@ cpdef tuple chunked_read_json( cpdef TableWithMetadata read_json( - SourceInfo source_info, - list dtypes = None, - compression_type compression = compression_type.AUTO, - bool lines = False, - size_t byte_range_offset = 0, - size_t byte_range_size = 0, - bool keep_quotes = False, - bool mixed_types_as_string = False, - bool prune_columns = False, - json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, - dict extra_parameters = None, + JsonReaderOptions options ): - """Reads an JSON file into a :py:class:`~.types.TableWithMetadata`. + """ + Read from JSON format. + + The source to read from and options are encapsulated + by the `options` object. + + For details, see :cpp:func:`read_json`. Parameters ---------- - source_info : SourceInfo - The SourceInfo object to read the JSON file from. - dtypes : list, default None - Set data types for the columns in the JSON file. - - Each element of the list has the format - (column_name, column_dtype, list of child dtypes), where - the list of child dtypes is an empty list if the child is not - a nested type (list or struct dtype), and is of format - (column_child_name, column_child_type, list of grandchild dtypes). - compression: CompressionType, default CompressionType.AUTO - The compression format of the JSON source. - byte_range_offset : size_t, default 0 - Number of bytes to skip from source start. - byte_range_size : size_t, default 0 - Number of bytes to read. By default, will read all bytes. - keep_quotes : bool, default False - Whether the reader should keep quotes of string values. - mixed_types_as_string : bool, default False - If True, mixed type columns are returned as string columns. - If `False` parsing mixed type columns will thrown an error. - prune_columns : bool, default False - Whether to only read columns specified in dtypes. - recover_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL - Whether to raise an error or set corresponding values to null - when encountering an invalid JSON line. - extra_parameters : dict, default None - Additional hidden parameters to pass to the JSON reader. + options: JsonReaderOptions + Settings for controlling reading behavior Returns ------- TableWithMetadata The Table and its corresponding metadata (column names) that were read in. 
""" - cdef json_reader_options opts = _setup_json_reader_options( - source_info=source_info, - dtypes=dtypes, - compression=compression, - lines=lines, - byte_range_offset=byte_range_offset, - byte_range_size=byte_range_size, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=recovery_mode, - extra_parameters=extra_parameters, - ) - - # Read JSON cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_json(opts)) + c_result = move(cpp_read_json(options.c_obj)) return TableWithMetadata.from_libcudf(c_result) @@ -374,6 +587,20 @@ cdef class JsonWriterOptions: """ self.c_obj.set_false_value(val.encode()) + cpdef void set_compression(self, compression_type comptype): + """ + Sets compression type to be used + + Parameters + ---------- + comptype : CompressionType + Compression type for sink + + Returns + ------- + None + """ + self.c_obj.set_compression(comptype) cdef class JsonWriterOptionsBuilder: cpdef JsonWriterOptionsBuilder metadata(self, TableWithMetadata tbl_w_meta): @@ -440,6 +667,22 @@ cdef class JsonWriterOptionsBuilder: self.c_obj.lines(val) return self + cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype): + """ + Sets compression type of output sink. + + Parameters + ---------- + comptype : CompressionType + Compression type used + + Returns + ------- + Self + """ + self.c_obj.compression(comptype) + return self + cpdef JsonWriterOptions build(self): """Create a JsonWriterOptions object""" cdef JsonWriterOptions json_options = JsonWriterOptions.__new__( diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd index 671f0692444..7531608519c 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/io/orc.pxd @@ -1,5 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd
index 671f0692444..7531608519c 100644
--- a/python/pylibcudf/pylibcudf/io/orc.pxd
+++ b/python/pylibcudf/pylibcudf/io/orc.pxd
@@ -1,5 +1,5 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
-from libc.stdint cimport uint64_t
+from libc.stdint cimport uint64_t, int64_t
 from libcpp cimport bool
 from libcpp.optional cimport optional
 from libcpp.string cimport string
@@ -19,6 +19,8 @@ from pylibcudf.libcudf.io.orc_metadata cimport (
 )
 from pylibcudf.libcudf.io.orc cimport (
     orc_chunked_writer,
+    orc_reader_options,
+    orc_reader_options_builder,
     orc_writer_options,
     orc_writer_options_builder,
     chunked_orc_writer_options,
@@ -32,17 +34,23 @@ from pylibcudf.libcudf.io.types cimport (
     statistics_freq,
 )

-cpdef TableWithMetadata read_orc(
-    SourceInfo source_info,
-    list columns = *,
-    list stripes = *,
-    size_type skip_rows = *,
-    size_type nrows = *,
-    bool use_index = *,
-    bool use_np_dtypes = *,
-    DataType timestamp_type = *,
-    list decimal128_columns = *
-)
+cdef class OrcReaderOptions:
+    cdef orc_reader_options c_obj
+    cdef SourceInfo source
+    cpdef void set_num_rows(self, int64_t nrows)
+    cpdef void set_skip_rows(self, int64_t skip_rows)
+    cpdef void set_stripes(self, list stripes)
+    cpdef void set_decimal128_columns(self, list val)
+    cpdef void set_timestamp_type(self, DataType type_)
+    cpdef void set_columns(self, list col_names)
+
+cdef class OrcReaderOptionsBuilder:
+    cdef orc_reader_options_builder c_obj
+    cdef SourceInfo source
+    cpdef OrcReaderOptionsBuilder use_index(self, bool use)
+    cpdef OrcReaderOptions build(self)
+
+cpdef TableWithMetadata read_orc(OrcReaderOptions options)

 cdef class OrcColumnStatistics:
     cdef optional[uint64_t] number_of_values_c
diff --git a/python/pylibcudf/pylibcudf/io/orc.pyi b/python/pylibcudf/pylibcudf/io/orc.pyi
index 516f97981e9..c496b7a2152 100644
--- a/python/pylibcudf/pylibcudf/io/orc.pyi
+++ b/python/pylibcudf/pylibcudf/io/orc.pyi
@@ -1,6 +1,8 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.

-from typing import Any, Self
+from typing import Any
+
+from typing_extensions import Self

 from pylibcudf.io.types import (
     CompressionType,
@@ -11,19 +13,21 @@ from pylibcudf.io.types import (
     TableWithMetadata,
 )
 from pylibcudf.table import Table
-from pylibcudf.types import DataType

-def read_orc(
-    source_info: SourceInfo,
-    columns: list[str] | None = None,
-    stripes: list[list[int]] | None = None,
-    skip_rows: int = 0,
-    nrows: int = -1,
-    use_index: bool = True,
-    use_np_dtypes: bool = True,
-    timestamp_type: DataType | None = None,
-    decimal128_columns: list[str] | None = None,
-) -> TableWithMetadata: ...
+class OrcReaderOptions:
+    def set_num_rows(self, nrows: int) -> None: ...
+    def set_skip_rows(self, skip_rows: int) -> None: ...
+    def set_stripes(self, stripes: list[list[int]]) -> None: ...
+    def set_decimal128_columns(self, val: list[str]) -> None: ...
+    def set_columns(self, col_names: list[str]) -> None: ...
+    @staticmethod
+    def builder(source: SourceInfo) -> OrcReaderOptionsBuilder: ...
+
+class OrcReaderOptionsBuilder:
+    def use_index(self, use: bool) -> Self: ...
+    def build(self) -> OrcReaderOptions: ...
+
+def read_orc(options: OrcReaderOptions) -> TableWithMetadata: ...

 class OrcColumnStatistics:
     def __init__(self): ...
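
The ORC read path follows the same split as JSON: `use_index` is the only knob on the builder, while row bounds, column projection, stripe selection, timestamp casting, and decimal handling are applied to the built `OrcReaderOptions`, matching the setters declared above. A sketch of the intended flow; the file path and the specific values are illustrative:

```python
import pylibcudf as plc

# Illustrative path; any SourceInfo-compatible source works.
options = (
    plc.io.orc.OrcReaderOptions.builder(plc.io.SourceInfo(["example.orc"]))
    .use_index(True)
    .build()
)

# Remaining settings are applied after build(), as in test_read_orc_basic.
options.set_skip_rows(10)        # skip the first 10 rows
options.set_num_rows(100)        # then read at most 100 rows
options.set_columns(["a", "b"])  # project columns by name

tbl_w_meta = plc.io.orc.read_orc(options)
```
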
diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx
index 63eab4a9634..c125d7e76fa 100644
--- a/python/pylibcudf/pylibcudf/io/orc.pyx
+++ b/python/pylibcudf/pylibcudf/io/orc.pyx
@@ -46,6 +46,8 @@ __all__ = [
     "read_orc",
     "read_parsed_orc_statistics",
     "write_orc",
+    "OrcReaderOptions",
+    "OrcReaderOptionsBuilder",
     "OrcWriterOptions",
     "OrcWriterOptionsBuilder",
     "OrcChunkedWriter",
@@ -237,84 +239,190 @@ cdef class ParsedOrcStatistics:
         return out


-cpdef TableWithMetadata read_orc(
-    SourceInfo source_info,
-    list columns = None,
-    list stripes = None,
-    size_type skip_rows = 0,
-    size_type nrows = -1,
-    bool use_index = True,
-    bool use_np_dtypes = True,
-    DataType timestamp_type = None,
-    list decimal128_columns = None,
-):
-    """Reads an ORC file into a :py:class:`~.types.TableWithMetadata`.
-
-    Parameters
-    ----------
-    source_info : SourceInfo
-        The SourceInfo object to read the Parquet file from.
-    columns : list, default None
-        The string names of the columns to be read.
-    stripes : list[list[size_type]], default None
-        List of stripes to be read.
-    skip_rows : int64_t, default 0
-        The number of rows to skip from the start of the file.
-    nrows : size_type, default -1
-        The number of rows to read. By default, read the entire file.
-    use_index : bool, default True
-        Whether to use the row index to speed up reading.
-    use_np_dtypes : bool, default True
-        Whether to use numpy compatible dtypes.
-    timestamp_type : DataType, default None
-        The timestamp type to use for the timestamp columns.
-    decimal128_columns : list, default None
-        List of column names to be read as 128-bit decimals.
+cdef class OrcReaderOptions:
+    """
+    The settings to use for ``read_orc``.

-    Returns
-    -------
-    TableWithMetadata
-        The Table and its corresponding metadata (column names) that were read in.
+    For details, see :cpp:class:`cudf::io::orc_reader_options`
     """
-    cdef orc_reader_options opts
-    cdef vector[vector[size_type]] c_stripes
-    opts = (
-        orc_reader_options.builder(source_info.c_obj)
-        .use_index(use_index)
-        .build()
-    )
-    if nrows >= 0:
-        opts.set_num_rows(nrows)
-    if skip_rows >= 0:
-        opts.set_skip_rows(skip_rows)
-    if stripes is not None:
-        c_stripes = stripes
-        opts.set_stripes(c_stripes)
-    if timestamp_type is not None:
-        opts.set_timestamp_type(timestamp_type.c_obj)
-
-    cdef vector[string] c_decimal128_columns
-    if decimal128_columns is not None and len(decimal128_columns) > 0:
-        c_decimal128_columns.reserve(len(decimal128_columns))
-        for col in decimal128_columns:
+    @staticmethod
+    def builder(SourceInfo source):
+        """
+        Create an OrcReaderOptionsBuilder object
+
+        For details, see :cpp:func:`cudf::io::orc_reader_options::builder`
+
+        Parameters
+        ----------
+        source : SourceInfo
+            The source to read the ORC file from.
+
+        Returns
+        -------
+        OrcReaderOptionsBuilder
+            Builder to build OrcReaderOptions
+        """
+        cdef OrcReaderOptionsBuilder orc_builder = (
+            OrcReaderOptionsBuilder.__new__(OrcReaderOptionsBuilder)
+        )
+        orc_builder.c_obj = orc_reader_options.builder(source.c_obj)
+        orc_builder.source = source
+        return orc_builder
+
+    cpdef void set_num_rows(self, int64_t nrows):
+        """
+        Sets number of rows to read.
+
+        Parameters
+        ----------
+        nrows: int64_t
+            Number of rows
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_num_rows(nrows)
+
+    cpdef void set_skip_rows(self, int64_t skip_rows):
+        """
+        Sets number of rows to skip from the start.
+
+        Parameters
+        ----------
+        skip_rows: int64_t
+            Number of rows
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_skip_rows(skip_rows)
+
+    cpdef void set_stripes(self, list stripes):
+        """
+        Sets list of stripes to read for each input source.
+
+        Parameters
+        ----------
+        stripes: list[list[size_type]]
+            List of lists, mapping stripes to read to input sources
+
+        Returns
+        -------
+        None
+        """
+        cdef vector[vector[size_type]] c_stripes
+        cdef vector[size_type] vec
+        for sub_list in stripes:
+            for x in sub_list:
+                vec.push_back(x)
+            c_stripes.push_back(vec)
+            vec.clear()
+        self.c_obj.set_stripes(c_stripes)
+
+    cpdef void set_decimal128_columns(self, list val):
+        """
+        Sets columns that should be read as 128-bit Decimal.
+
+        Parameters
+        ----------
+        val: list[str]
+            List of fully qualified column names
+
+        Returns
+        -------
+        None
+        """
+        cdef vector[string] c_decimal128_columns
+        c_decimal128_columns.reserve(len(val))
+        for col in val:
             if not isinstance(col, str):
                 raise TypeError("Decimal 128 column names must be strings!")
             c_decimal128_columns.push_back(col.encode())
-        opts.set_decimal128_columns(c_decimal128_columns)
+        self.c_obj.set_decimal128_columns(c_decimal128_columns)
+
+    cpdef void set_timestamp_type(self, DataType type_):
+        """
+        Sets timestamp type to which timestamp columns will be cast.
+
+        Parameters
+        ----------
+        type_: DataType
+            Type of timestamp

-    cdef vector[string] c_column_names
-    if columns is not None and len(columns) > 0:
-        c_column_names.reserve(len(columns))
-        for col in columns:
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_timestamp_type(type_.c_obj)
+
+    cpdef void set_columns(self, list col_names):
+        """
+        Sets names of the columns to read.
+
+        Parameters
+        ----------
+        col_names: list[str]
+            List of column names
+
+        Returns
+        -------
+        None
+        """
+        cdef vector[string] c_column_names
+        c_column_names.reserve(len(col_names))
+        for col in col_names:
             if not isinstance(col, str):
                 raise TypeError("Column names must be strings!")
             c_column_names.push_back(col.encode())
-        opts.set_columns(c_column_names)
+        self.c_obj.set_columns(c_column_names)
+
+
+cdef class OrcReaderOptionsBuilder:
+    cpdef OrcReaderOptionsBuilder use_index(self, bool use):
+        """
+        Enable/Disable use of row index to speed up reading.
+
+        Parameters
+        ----------
+        use : bool
+            Boolean value to enable/disable row index use
+
+        Returns
+        -------
+        OrcReaderOptionsBuilder
+        """
+        self.c_obj.use_index(use)
+        return self
+
+    cpdef OrcReaderOptions build(self):
+        """Create an OrcReaderOptions object"""
+        cdef OrcReaderOptions orc_options = OrcReaderOptions.__new__(
+            OrcReaderOptions
+        )
+        orc_options.c_obj = move(self.c_obj.build())
+        orc_options.source = self.source
+        return orc_options
+
+
+cpdef TableWithMetadata read_orc(OrcReaderOptions options):
+    """
+    Read from ORC format.
+
+    The source to read from and options are encapsulated
+    by the `options` object.
+
+    For details, see :cpp:func:`read_orc`.
+
+    Parameters
+    ----------
+    options: OrcReaderOptions
+        Settings for controlling reading behavior.
+    """
     cdef table_with_metadata c_result
     with nogil:
-        c_result = move(cpp_read_orc(opts))
+        c_result = move(cpp_read_orc(options.c_obj))

     return TableWithMetadata.from_libcudf(c_result)

@@ -503,7 +611,7 @@ cpdef void write_orc(OrcWriterOptions options):
     The table to write, output paths, and options are encapsulated
     by the `options` object.

-    For details, see :cpp:func:`write_csv`.
+    For details, see :cpp:func:`write_orc`.

     Parameters
     ----------
diff --git a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd
index c241c478f25..d23dd0685d1 100644
--- a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd
+++ b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd
@@ -167,6 +167,8 @@ cdef extern from "cudf/io/json.hpp" \
         size_type get_rows_per_chunk() except +libcudf_exception_handler
         string get_true_value() except +libcudf_exception_handler
         string get_false_value() except +libcudf_exception_handler
+        cudf_io_types.compression_type get_compression()\
+            except +libcudf_exception_handler

         # setter
         void set_table(
@@ -181,6 +183,9 @@ cdef extern from "cudf/io/json.hpp" \
         void set_rows_per_chunk(size_type val) except +libcudf_exception_handler
         void set_true_value(string val) except +libcudf_exception_handler
         void set_false_value(string val) except +libcudf_exception_handler
+        void set_compression(
+            cudf_io_types.compression_type comptype
+        ) except +libcudf_exception_handler

         @staticmethod
         json_writer_options_builder builder(
@@ -218,6 +223,9 @@ cdef extern from "cudf/io/json.hpp" \
         json_writer_options_builder& false_value(
             string val
         ) except +libcudf_exception_handler
+        json_writer_options_builder& compression(
+            cudf_io_types.compression_type comptype
+        ) except +libcudf_exception_handler

         json_writer_options build() except +libcudf_exception_handler
diff --git a/python/pylibcudf/pylibcudf/tests/io/test_json.py b/python/pylibcudf/pylibcudf/tests/io/test_json.py
index 9b0c5a29fe8..747bbfa1370 100644
--- a/python/pylibcudf/pylibcudf/tests/io/test_json.py
+++ b/python/pylibcudf/pylibcudf/tests/io/test_json.py
@@ -167,9 +167,12 @@ def test_read_json_basic(
         source.seek(0)

     res = plc.io.json.read_json(
-        plc.io.SourceInfo([source]),
-        compression=compression_type,
-        lines=lines,
+        (
+            plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source]))
+            .compression(compression_type)
+            .lines(lines)
+            .build()
+        )
     )

     # Adjustments to correct for the fact orient=records is lossy
@@ -243,9 +246,14 @@ def get_child_types(typ):

     new_schema = pa.schema(new_fields)

-    res = plc.io.json.read_json(
-        plc.io.SourceInfo([source]), dtypes=dtypes, lines=True
+    options = (
+        plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source]))
+        .lines(True)
+        .build()
     )
+    options.set_dtypes(dtypes)
+
+    res = plc.io.json.read_json(options)

     new_table = pa_table.cast(new_schema)

     # orient=records is lossy
@@ -269,10 +277,15 @@ def test_read_json_lines_byte_range(source_or_sink, chunk_size):
     for chunk_start in range(0, len(json_str.encode("utf-8")), chunk_size):
         tbls_w_meta.append(
             plc.io.json.read_json(
-                plc.io.SourceInfo([source]),
-                lines=True,
-                byte_range_offset=chunk_start,
-                byte_range_size=chunk_start + chunk_size,
+                (
+                    plc.io.json.JsonReaderOptions.builder(
+                        plc.io.SourceInfo([source])
+                    )
+                    .lines(True)
+                    .byte_range_offset(chunk_start)
+                    .byte_range_size(chunk_start + chunk_size)
+                    .build()
+                )
             )
         )

@@ -302,7 +315,12 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink):
     write_source_str(source, json_bytes)

     tbl_w_meta = plc.io.json.read_json(
-        plc.io.SourceInfo([source]), lines=True, keep_quotes=keep_quotes
+        (
+            plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source]))
+            .lines(True)
+            .keep_quotes(keep_quotes)
+            .build()
+        )
     )

     template = "{0}"
@@ -330,20 +348,19 @@ def test_read_json_lines_recovery_mode(recovery_mode, source_or_sink):
     json_str = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n'
     write_source_str(source, json_str)

+    options = (
+        plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source]))
+        .lines(True)
+        .recovery_mode(recovery_mode)
+        .build()
+    )
+
     if recovery_mode == plc.io.types.JSONRecoveryMode.FAIL:
         with pytest.raises(RuntimeError):
-            plc.io.json.read_json(
-                plc.io.SourceInfo([source]),
-                lines=True,
-                recovery_mode=recovery_mode,
-            )
+            plc.io.json.read_json(options)
     else:
         # Recover case (bad values replaced with nulls)
-        tbl_w_meta = plc.io.json.read_json(
-            plc.io.SourceInfo([source]),
-            lines=True,
-            recovery_mode=recovery_mode,
-        )
+        tbl_w_meta = plc.io.json.read_json(options)
         exp = pa.Table.from_arrays(
             [[1, 2, None, 3], [10, 11, None, 12]], names=["a", "b"]
         )
diff --git a/python/pylibcudf/pylibcudf/tests/io/test_orc.py b/python/pylibcudf/pylibcudf/tests/io/test_orc.py
index 2557e40c935..fe35255505c 100644
--- a/python/pylibcudf/pylibcudf/tests/io/test_orc.py
+++ b/python/pylibcudf/pylibcudf/tests/io/test_orc.py
@@ -37,12 +37,17 @@ def test_read_orc_basic(
         binary_source_or_sink, pa_table, **_COMMON_ORC_SOURCE_KWARGS
     )

-    res = plc.io.orc.read_orc(
-        plc.io.SourceInfo([source]),
-        nrows=nrows,
-        skip_rows=skiprows,
-        columns=columns,
-    )
+    options = plc.io.orc.OrcReaderOptions.builder(
+        plc.io.types.SourceInfo([source])
+    ).build()
+    if nrows >= 0:
+        options.set_num_rows(nrows)
+    if skiprows >= 0:
+        options.set_skip_rows(skiprows)
+    if columns is not None and len(columns) > 0:
+        options.set_columns(columns)
+
+    res = plc.io.orc.read_orc(options)

     if columns is not None:
         pa_table = pa_table.select(columns)
diff --git a/python/pylibcudf/pylibcudf/tests/test_interop.py b/python/pylibcudf/pylibcudf/tests/test_interop.py
index af80b6e5978..ca42eacdfdb 100644
--- a/python/pylibcudf/pylibcudf/tests/test_interop.py
+++ b/python/pylibcudf/pylibcudf/tests/test_interop.py
@@ -40,6 +40,28 @@ def test_struct_dtype_roundtrip():
     assert arrow_type == struct_type


+def test_table_with_nested_dtype_to_arrow():
+    pa_array = pa.array([[{"": 1}]])
+    plc_table = plc.Table([plc.interop.from_arrow(pa_array)])
+    result = plc.interop.to_arrow(plc_table)
+    expected_schema = pa.schema(
+        [
+            pa.field(
+                "",
+                pa.list_(
+                    pa.field(
+                        "",
+                        pa.struct([pa.field("", pa.int64(), nullable=False)]),
+                        nullable=False,
+                    )
+                ),
+                nullable=False,
+            )
+        ]
+    )
+    assert result.schema == expected_schema
+
+
 def test_decimal128_roundtrip():
     decimal_type = pa.decimal128(10, 2)
     plc_type = plc.interop.from_arrow(decimal_type)
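
On the writer side of the JSON changes, compression can be requested either on the builder or after the fact via `set_compression`. A sketch under two assumptions not shown in this diff: that `JsonWriterOptions.builder` keeps its existing `(sink, table)` signature, and that GZIP is an accepted sink compression; the table contents are illustrative:

```python
import io

import pyarrow as pa
import pylibcudf as plc

# Illustrative single-column table and an in-memory sink.
table = plc.Table([plc.interop.from_arrow(pa.array([1, 2, 3]))])
sink = io.BytesIO()

options = (
    # builder(sink, table) signature assumed, not shown in this diff
    plc.io.json.JsonWriterOptions.builder(plc.io.SinkInfo([sink]), table)
    .lines(True)
    .compression(plc.io.types.CompressionType.GZIP)  # new builder method
    .build()
)
# Equivalent post-build setter added in this change:
# options.set_compression(plc.io.types.CompressionType.GZIP)

plc.io.json.write_json(options)
```
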