From 5ab4cec6dee1021a368ad08eb6288bed884d9ac7 Mon Sep 17 00:00:00 2001
From: LinGeLin <1057445597@qq.com>
Date: Thu, 2 Nov 2023 10:39:30 +0800
Subject: [PATCH] Replace sparse_fill_empty_rows_op with the tf native interface

---
 .../dynamic_embedding/core/BUILD              |   4 -
 .../core/kernels/sparse_fill_empty_rows_op.cc |  62 ----
 .../kernels/sparse_fill_empty_rows_op.cu.cc   | 274 ------------------
 .../core/kernels/sparse_fill_empty_rows_op.h  |  38 ---
 .../dynamic_embedding/python/ops/math_ops.py  |  54 +---
 5 files changed, 1 insertion(+), 431 deletions(-)
 delete mode 100644 tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cc
 delete mode 100644 tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cu.cc
 delete mode 100644 tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.h

diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/BUILD b/tensorflow_recommenders_addons/dynamic_embedding/core/BUILD
index 08e32fe32..a2e030135 100644
--- a/tensorflow_recommenders_addons/dynamic_embedding/core/BUILD
+++ b/tensorflow_recommenders_addons/dynamic_embedding/core/BUILD
@@ -76,8 +76,6 @@ custom_op_library(
         "kernels/segment_reduction_ops.h",
         "kernels/segment_reduction_ops_impl.cc",
         "kernels/segment_reduction_ops_impl.h",
-        "kernels/sparse_fill_empty_rows_op.cc",
-        "kernels/sparse_fill_empty_rows_op.h",
         "ops/math_ops.cc",
         "utils/utils.h",
     ],
@@ -89,8 +87,6 @@ custom_op_library(
     cuda_srcs = [
         "kernels/segment_reduction_ops.h",
         "kernels/segment_reduction_ops_gpu.cu.cc",
-        "kernels/sparse_fill_empty_rows_op.h",
-        "kernels/sparse_fill_empty_rows_op.cu.cc",
     ],
 )
 
diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cc
deleted file mode 100644
index eed78bf2c..000000000
--- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cc
+++ /dev/null
@@ -1,62 +0,0 @@
-/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#define EIGEN_USE_THREADS
-
-#include "sparse_fill_empty_rows_op.h"
-
-#include
-#include
-#include
-#include
-#include
-
-#include "tensorflow/core/framework/op_kernel.h"
-#include "tensorflow/core/framework/register_types.h"
-#include "tensorflow/core/framework/tensor.h"
-#include "tensorflow/core/framework/tensor_util.h"
-#include "tensorflow/core/framework/types.h"
-#include "tensorflow/core/lib/gtl/inlined_vector.h"
-#include "tensorflow/core/util/sparse/sparse_tensor.h"
-
-namespace tensorflow {
-
-using GPUDevice = Eigen::GpuDevice;
-
-template <typename Device, typename T>
-class SparseFillEmptyRowsOp : public OpKernel {
- public:
-  explicit SparseFillEmptyRowsOp(OpKernelConstruction* context)
-      : OpKernel(context) {}
-
-  void Compute(OpKernelContext* context) override {
-    functor::SparseFillEmptyRowsFunctor<Device, T>()(context);
-  }
-};
-
-#if GOOGLE_CUDA
-#define REGISTER_KERNELS(type)                              \
-  REGISTER_KERNEL_BUILDER(Name("TfraSparseFillEmptyRows")   \
-                              .Device(DEVICE_GPU)           \
-                              .TypeConstraint<type>("T"),   \
-                          SparseFillEmptyRowsOp<GPUDevice, type>)
-TF_CALL_int8(REGISTER_KERNELS);
-TF_CALL_int32(REGISTER_KERNELS);
-TF_CALL_half(REGISTER_KERNELS);
-TF_CALL_float(REGISTER_KERNELS);
-TF_CALL_int64(REGISTER_KERNELS);
-#undef REGISTER_KERNELS
-#endif  // GOOGLE_CUDA
-}  // namespace tensorflow
diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cu.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cu.cc
deleted file mode 100644
index e12c79163..000000000
--- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.cu.cc
+++ /dev/null
@@ -1,274 +0,0 @@
-/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#if GOOGLE_CUDA
-
-#define EIGEN_USE_GPU
-
-#include
-#include
-#include
-#include
-#include
-
-#include "tensorflow/core/framework/op_kernel.h"
-#include "tensorflow/core/framework/register_types.h"
-#include "tensorflow/core/framework/tensor.h"
-#include "tensorflow/core/framework/tensor_util.h"
-#include "tensorflow/core/framework/types.h"
-#include "tensorflow/core/lib/gtl/inlined_vector.h"
-#include "tensorflow/core/util/gpu_kernel_helper.h"
-#include "tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.h"
-#include "tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/cub/cub/device/device_scan.cuh"
-
-namespace tensorflow {
-
-using GPUDevice = Eigen::GpuDevice;
-
-// Count how many entries each row has; empty rows are located from these counts.
-__global__ void SparseFillEmptyRowCountKernel(
-    const int64* indices, const int nnz, const int64* input_shape,
-    int* row_nnz_count,       // size: num_rows
-    int64* input_row_offset,  // size: num_rows + 1
-    int64* output_row_offset  // size: num_rows + 1
-) {
-  GPU_1D_KERNEL_LOOP(idx, nnz) {
-    int64 _row = indices[idx * 2];
-    atomicAdd(row_nnz_count + _row, 1);
-  }
-}
-
-__global__ void SparseFillEmptyRowAddOneKernel(const int64* input_shape,
-                                               int* row_nnz_count) {
-  const int64 num_rows = input_shape[0];
-  GPU_1D_KERNEL_LOOP(id_row, num_rows) {
-    if (row_nnz_count[id_row] == 0) {
-      row_nnz_count[id_row] += 1;
-    }
-  }
-}
-
-// Copy the original data to the output buffer and fill the default value into
-// empty rows.
-template <typename T>
-__global__ void SparseFillEmptyRowFillKernel(
-    // inputs
-    const int64* input_indices, const T* input_values, const int64* input_shape,
-    const T* default_value, const int64* input_row_offset,
-    const int64* output_row_offset,
-    // outputs
-    int64* output_indices, T* output_values, bool* empty_row_indicator,
-    int64* reverse_index_map) {
-  const int64 num_rows = input_shape[0];
-  GPU_1D_KERNEL_LOOP(id_row, num_rows) {
-#pragma unroll
-    for (int i = 0; i < input_row_offset[id_row + 1] - input_row_offset[id_row];
-         i++) {
-      output_values[output_row_offset[id_row] + i] =
-          input_values[input_row_offset[id_row] + i];
-      output_indices[2 * (output_row_offset[id_row] + i) + 0] =
-          id_row;  // no need to read the row index from the input again
-      output_indices[2 * (output_row_offset[id_row] + i) + 1] =
-          input_indices[2 * (input_row_offset[id_row] + i) + 1];
-      if (reverse_index_map) {
-        reverse_index_map[input_row_offset[id_row] + i] =
-            output_row_offset[id_row] + i;
-      }
-    }
-
-    // for empty rows
-    if (input_row_offset[id_row + 1] == input_row_offset[id_row]) {
-      // insert the default value
-      output_values[output_row_offset[id_row]] = *default_value;
-      output_indices[2 * output_row_offset[id_row] + 0] = id_row;
-      output_indices[2 * output_row_offset[id_row] + 1] = 0;
-
-      // mark as empty
-      if (empty_row_indicator) {
-        empty_row_indicator[id_row] = true;
-      }
-    }
-  }
-  return;
-}
-
-namespace functor {
-template <typename T>
-void SparseFillEmptyRowsGpuImpl(OpKernelContext* context,
-                                const int64* input_indices,
-                                const T* input_values, const int64 nnz,
-                                const int64* input_shape,
-                                const T* default_value) {
-  auto d = context->eigen_gpu_device();
-  auto OpStream = d.stream();
-  int64 dense_row_number;
-
-  // Get the dense shape, which is stored on the GPU.
-  // If the dense shape were already on the CPU, this copy would not be needed.
-  cudaMemcpyAsync(&dense_row_number, input_shape, sizeof(int64),
-                  cudaMemcpyDeviceToHost, OpStream);
-  cudaStreamSynchronize(OpStream);
-
-  // temp vectors to store the start index of each row
-  Tensor input_row_offset;
-  Tensor output_row_offset;
-  Tensor row_nnz_count;  // temp buffer for the count kernel: the number of
-                         // non-zero values in each row.
-
-  // The size of input_row_offset and output_row_offset is dense_row_number + 1,
-  // because one extra slot is needed to store the initial offset value 0.
-  OP_REQUIRES_OK(context, context->allocate_temp(
-                              DT_INT64, TensorShape({dense_row_number + 1}),
-                              &input_row_offset));
-
-  OP_REQUIRES_OK(context, context->allocate_temp(
-                              DT_INT64, TensorShape({dense_row_number + 1}),
-                              &output_row_offset));
-
-  OP_REQUIRES_OK(
-      context, context->allocate_temp(
-                   // use DT_INT32 instead of DT_INT64, because CUDA atomicAdd
-                   // only supports int32
-                   DT_INT32, TensorShape({dense_row_number}), &row_nnz_count));
-
-  cudaMemset(row_nnz_count.flat<int>().data(), 0,
-             sizeof(int) * dense_row_number);
-  cudaMemset(input_row_offset.flat<int64>().data(), 0, sizeof(int64));
-  cudaMemset(output_row_offset.flat<int64>().data(), 0, sizeof(int64));
-
-  // Count the number of non-zero values in each row.
-  GpuLaunchConfig count_kernel_config = GetGpuLaunchConfig(nnz, d);
-  TF_CHECK_OK(GpuLaunchKernel(
-      SparseFillEmptyRowCountKernel, count_kernel_config.block_count,
-      count_kernel_config.thread_per_block, 0, d.stream(), input_indices, nnz,
-      input_shape, row_nnz_count.flat<int>().data(),
-      input_row_offset.flat<int64>().data(),
-      output_row_offset.flat<int64>().data()));
-
-  /* Calculate the offset of each row of the input.
-   * example: the number of non-zero values in each row: [3, 4, 0, 0, 6]
-   *          the offset of each row of the input: [0, 3, 7, 7, 7, 13]
-   */
-  // Determine temporary device storage requirements for the inclusive prefix sum.
-  size_t temp_storage_bytes = 0;
-  cub::DeviceScan::InclusiveSum(
-      NULL, temp_storage_bytes, row_nnz_count.flat<int>().data(),
-      input_row_offset.flat<int64>().data() + 1, dense_row_number);
-
-  // Allocate temporary storage for the inclusive prefix sum.
-  Tensor temp_storage;
-  OP_REQUIRES_OK(
-      context,
-      context->allocate_temp(
-          DT_INT8, TensorShape({static_cast<int64>(temp_storage_bytes)}),
-          &temp_storage));
-  void* d_temp_storage = temp_storage.flat<int8>().data();
-
-  // Run the inclusive prefix sum.
-  cub::DeviceScan::InclusiveSum(
-      d_temp_storage, temp_storage_bytes, row_nnz_count.flat<int>().data(),
-      input_row_offset.flat<int64>().data() + 1, dense_row_number);
-
-  /* Add 1 to each row whose count is 0.
-   * example: the number of non-zero values in each row (row_nnz_count): [3, 4, 0, 0, 6]
-   *          row_nnz_count after the kernel: [3, 4, 1, 1, 6]
-   */
-  GpuLaunchConfig add_kernel_config = GetGpuLaunchConfig(nnz, d);
-  TF_CHECK_OK(GpuLaunchKernel(
-      SparseFillEmptyRowAddOneKernel, count_kernel_config.block_count,
-      count_kernel_config.thread_per_block, 0, d.stream(), input_shape,
-      row_nnz_count.flat<int>().data()));
-
-  // Calculate the offset of each row of the output.
-  cub::DeviceScan::InclusiveSum(
-      d_temp_storage, temp_storage_bytes, row_nnz_count.flat<int>().data(),
-      output_row_offset.flat<int64>().data() + 1, dense_row_number);
-
-  // Read the output size back from the GPU (the result of the prefix sums):
-  // copy nnz + num_of_empty_rows = output_nnz to the CPU.
-  int64 output_nnz;
-  cudaMemcpyAsync(&output_nnz,
-                  output_row_offset.flat<int64>().data() + dense_row_number,
-                  sizeof(int64), cudaMemcpyDeviceToHost, OpStream);
-  cudaStreamSynchronize(OpStream);
-
-  // Allocate output tensors.
-  Tensor* output_indices;
-  Tensor* output_values;
-  OP_REQUIRES_OK(context,
-                 context->allocate_output(0, TensorShape({output_nnz, 2}),
-                                          &output_indices));
-  OP_REQUIRES_OK(context, context->allocate_output(1, TensorShape({output_nnz}),
-                                                   &output_values));
-
-  bool* empty_row_indicator = nullptr;
-  if (context->output_required(2)) {
-    Tensor* empty_row_indicator_t = nullptr;
-    OP_REQUIRES_OK(context,
-                   context->allocate_output(2, TensorShape({dense_row_number}),
-                                            &empty_row_indicator_t));
-    empty_row_indicator = empty_row_indicator_t->vec<bool>().data();
-    // assume rows are non-empty by default
-    cudaMemset(empty_row_indicator, false, sizeof(bool) * dense_row_number);
-  }
-
-  int64* reverse_index_map = nullptr;
-  if (context->output_required(3)) {
-    Tensor* reverse_index_map_t = nullptr;
-    OP_REQUIRES_OK(context, context->allocate_output(3, TensorShape({nnz}),
-                                                     &reverse_index_map_t));
-    reverse_index_map = reverse_index_map_t->vec<int64>().data();
-  }
-
-  // Launch the second kernel to move the data and insert the default value
-  // into empty rows.
-  GpuLaunchConfig config = GetGpuLaunchConfig(dense_row_number, d);
-  TF_CHECK_OK(GpuLaunchKernel(
-      SparseFillEmptyRowFillKernel<T>, config.block_count,
-      config.thread_per_block, 0, d.stream(), input_indices, input_values,
-      input_shape, default_value, input_row_offset.flat<int64>().data(),
-      output_row_offset.flat<int64>().data(),
-      output_indices->flat<int64>().data(), output_values->flat<T>().data(),
-      empty_row_indicator, reverse_index_map));
-}
-
-template <typename T>
-struct SparseFillEmptyRowsFunctor<GPUDevice, T> {
-  void operator()(OpKernelContext* context) {
-    auto input_indices = context->input(0);
-    auto input_values = context->input(1);
-    auto input_shape = context->input(2);
-    auto default_value = context->input(3);
-
-    const int64 nnz = input_indices.shape().dim_size(0);
-
-    SparseFillEmptyRowsGpuImpl(context, input_indices.flat<int64>().data(),
-                               input_values.flat<T>().data(), nnz,
-                               input_shape.flat<int64>().data(),
-                               default_value.flat<T>().data());
-  }
-};
-
-#define DEFINE_GPU_KERNELS(type) \
-  template struct SparseFillEmptyRowsFunctor<GPUDevice, type>;
-
-TF_CALL_int8(DEFINE_GPU_KERNELS);
-TF_CALL_int32(DEFINE_GPU_KERNELS);
-TF_CALL_half(DEFINE_GPU_KERNELS);
-TF_CALL_float(DEFINE_GPU_KERNELS);
-TF_CALL_int64(DEFINE_GPU_KERNELS);
-}  // namespace functor
-}  // namespace tensorflow
-
-#endif  // GOOGLE_CUDA
diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.h b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.h
deleted file mode 100644
index 08b0bf1f9..000000000
--- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/sparse_fill_empty_rows_op.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include
-#include
-#include
-#include
-#include
-
-#include "tensorflow/core/framework/op_kernel.h"
-#include "tensorflow/core/framework/register_types.h"
-#include "tensorflow/core/framework/tensor.h"
-#include "tensorflow/core/framework/tensor_util.h"
-#include "tensorflow/core/framework/types.h"
-#include "tensorflow/core/lib/gtl/inlined_vector.h"
-
-namespace tensorflow {
-namespace functor {
-
-template <typename Device, typename T>
-struct SparseFillEmptyRowsFunctor {
-  void operator()(OpKernelContext* ctx);
-};
-
-}  // namespace functor
-}  // namespace tensorflow
diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/ops/math_ops.py b/tensorflow_recommenders_addons/dynamic_embedding/python/ops/math_ops.py
index c9db07a44..ee693a849 100644
--- a/tensorflow_recommenders_addons/dynamic_embedding/python/ops/math_ops.py
+++ b/tensorflow_recommenders_addons/dynamic_embedding/python/ops/math_ops.py
@@ -186,59 +186,7 @@ def sparse_fill_empty_rows(sp_input, default_value, name=None):
     input row was empty.
   """
   gpu_devices = config.list_physical_devices('GPU')
-  if gpu_devices:
-    if context.executing_eagerly():
-      try:
-        return _sparse_fill_empty_rows_gpu(sp_input, default_value, name=name)
-      except errors.NotFoundError:
-        tf_logging.warn(
-            '`tfra.dynamic_embedding.math.sparse_fill_empty_rows` is not'
-            ' found. Use tf.sparse.fill_empty_rows instead.')
-        return tf.sparse.fill_empty_rows(sp_input, default_value, name=name)
-
-    else:
-      predef = _sparse_fill_empty_rows_gpu(sp_input, default_value, name=name)
-
-      use_origin = False
-      if predef[0].values.device == '':
-        tf_logging.warn(
-            'SparseFillEmptyRows({}) has not been assigned device, '
-            'while GPU are available: {}, so use GPU by default.'.format(
-                predef[0].values.name, gpu_devices))
-      else:
-        device_type = predef[0].values.device.split(':')[-2][-3:].lower()
-        if 'gpu' in device_type:
-          use_origin = True
-
-      if use_origin:
-        return tf.sparse.fill_empty_rows(sp_input, default_value, name=name)
-      return predef
-
-  else:
-    return tf.sparse.fill_empty_rows(sp_input, default_value, name=name)
-
-
-def _sparse_fill_empty_rows_gpu(sp_input, default_value, name=None):
-  if not hasattr(tfra_math_ops, 'tfra_sparse_fill_empty_rows'):
-    tf_logging.warn(
-        '`tfra.dynamic_embedding.math.sparse_fill_empty_rows` is not'
-        ' found. Use tf.sparse.fill_empty_rows instead.')
-    return tf.sparse.fill_empty_rows(sp_input, default_value, name=name)
-
-  sp_input = _convert_to_sparse_tensor(sp_input)
-  with ops.name_scope(name, "SparseFillEmptyRows", [sp_input]):
-    default_value = ops.convert_to_tensor(default_value,
-                                          dtype=sp_input.values.dtype)
-    (output_indices, output_values, empty_row_indicator,
-     unused_reverse_index_map) = tfra_math_ops.tfra_sparse_fill_empty_rows(
-         indices=sp_input.indices,
-         values=sp_input.values,
-         dense_shape=sp_input.dense_shape,
-         default_value=default_value)
-    return (sparse_tensor.SparseTensor(indices=output_indices,
-                                       values=output_values,
-                                       dense_shape=sp_input.dense_shape),
-            empty_row_indicator)
+  return tf.sparse.fill_empty_rows(sp_input, default_value, name=name)
 
 
 def sparse_reshape(sp_input, shape, name=None):
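
For reference, a minimal usage sketch of the native API the wrapper now delegates to (assuming TensorFlow 2.x; the example tensor values are illustrative and not taken from this patch):

    import tensorflow as tf

    # Rows 1 and 3 of this 4-row SparseTensor are empty.
    sp = tf.sparse.SparseTensor(indices=[[0, 0], [2, 1]],
                                values=[10, 20],
                                dense_shape=[4, 3])

    # tf.sparse.fill_empty_rows returns the filled SparseTensor plus a boolean
    # vector marking which input rows were empty -- the same
    # (SparseTensor, empty_row_indicator) pair the removed wrapper produced.
    filled, empty_row_indicator = tf.sparse.fill_empty_rows(sp, default_value=0)

    print(filled.indices.numpy())       # [[0 0] [1 0] [2 1] [3 0]]
    print(filled.values.numpy())        # [10  0 20  0]
    print(empty_row_indicator.numpy())  # [False  True False  True]

The custom TfraSparseFillEmptyRows kernel appears removable presumably because the native op now provides equivalent behavior (including a GPU kernel in recent TF releases), so the Python wrapper can call it directly.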