Skip to content

Commit

Permalink
atomic global
Browse files Browse the repository at this point in the history
  • Loading branch information
vuule committed Sep 21, 2023
1 parent c2e8ff0 commit da12224
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 14 deletions.
10 changes: 6 additions & 4 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int* error_code)
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16)
Expand Down Expand Up @@ -602,8 +602,10 @@ __global__ void __launch_bounds__(decode_block_size)
}
__syncthreads();
}

if (!t and s->error != 0) { atomicOr(error_code, s->error); }
if (!t and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.store(s->error, cuda::std::memory_order_relaxed);
}
}

struct mask_tform {
Expand All @@ -629,7 +631,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ __global__ void __launch_bounds__(96)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int* error_code)
int32_t* error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
Expand Down Expand Up @@ -151,7 +151,10 @@ __global__ void __launch_bounds__(96)
__syncthreads();
}

if (!t and s->error != 0) { atomicOr(error_code, s->error); }
if (!t and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.store(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
Expand All @@ -164,7 +167,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int* error_code)
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(4) size_type last_offset;
Expand Down Expand Up @@ -748,7 +748,10 @@ __global__ void __launch_bounds__(decode_block_size)
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (!t and s->error != 0) { atomicOr(error_code, s->error); }
if (!t and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.store(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
Expand Down Expand Up @@ -782,7 +785,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -596,7 +596,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -618,7 +618,7 @@ void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunk_nested_valids.host_to_device_async(_stream);
chunk_nested_data.host_to_device_async(_stream);

rmm::device_scalar<int> error_code(0, _stream);
rmm::device_scalar<int32_t> error_code(0, _stream);

// get the number of streams we need from the pool and tell them to wait on the H2D copies
int const nkernels = std::bitset<32>(kernel_mask).count();
Expand Down

0 comments on commit da12224

Please sign in to comment.