Skip to content

Commit

Permalink
Merge pull request #6378 from NVIDIA/branch-22.08
Browse files Browse the repository at this point in the history
Merge Branch 22.08 to main [skip ci]
  • Loading branch information
pxLi authored Aug 22, 2022
2 parents a2f8627 + a078b94 commit 082f792
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 1,018 deletions.
8 changes: 3 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Change log
Generated on 2022-08-18
Generated on 2022-08-20

## Release 22.08

Expand Down Expand Up @@ -50,10 +50,6 @@ Generated on 2022-08-18
|||
|:---|:---|
|[#6060](https://github.com/NVIDIA/spark-rapids/issues/6060)|[FEA] Add experimental multi-threaded BypassMergeSortShuffleWriter|
|[#5636](https://github.com/NVIDIA/spark-rapids/issues/5636)|[FEA] Update GeneratedInternalRowToCudfRowIterator for string transitions|
|[#5633](https://github.com/NVIDIA/spark-rapids/issues/5633)|[FEA] Enable Strings as a supported type for GpuColumnarToRow transitions|
|[#5634](https://github.com/NVIDIA/spark-rapids/issues/5634)|[FEA] Update CudfUnsafeRow to include size estimates for strings and implementation for getUTF8String|
|[#5635](https://github.com/NVIDIA/spark-rapids/issues/5635)|[FEA] Update AcceleratedColumnarToRowIterator to support strings|
|[#5453](https://github.com/NVIDIA/spark-rapids/issues/5453)|[FEA] Support runtime filters for BatchScanExec|
|[#5075](https://github.com/NVIDIA/spark-rapids/issues/5075)|Performance can be very slow when reading just a few columns out of many on parquet|
|[#5624](https://github.com/NVIDIA/spark-rapids/issues/5624)|[FEA] Let CPU handle Delta table's metadata related queries|
Expand Down Expand Up @@ -140,6 +136,8 @@ Generated on 2022-08-18
### PRs
|||
|:---|:---|
|[#6367](https://github.com/NVIDIA/spark-rapids/pull/6367)|Revert "Enable Strings as a supported type for GpuColumnarToRow transitions"|
|[#6354](https://github.com/NVIDIA/spark-rapids/pull/6354)|Update 22.08 changelog to latest [skip ci]|
|[#6348](https://github.com/NVIDIA/spark-rapids/pull/6348)|Update plugin jni version to released 22.08.0|
|[#6234](https://github.com/NVIDIA/spark-rapids/pull/6234)|[Doc] Add 22.08 docs' links [skip ci]|
|[#6288](https://github.com/NVIDIA/spark-rapids/pull/6288)|CPU fallback for Map scalars with key vectors|
Expand Down
45 changes: 0 additions & 45 deletions integration_tests/src/main/python/row_conversion_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,48 +88,3 @@ def test_host_columnar_transition(spark_tmp_path, data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path).filter("a IS NOT NULL"),
conf={ 'spark.rapids.sql.exec.FileSourceScanExec' : 'false'})

# This is one of the most basic tests where we verify that we can
# move data onto and off of the GPU when the schema is variable width (no nulls).
def test_row_conversions_var_width_basic():
def do_it(spark):
schema = StructType([StructField("col_00_int", IntegerType(), nullable=False),
StructField("col_01_str", StringType(), nullable=False),
StructField("col_02_int", IntegerType(), nullable=False),
StructField("col_03_str", StringType(), nullable=False)])
df = spark.createDataFrame([(1, "string_val_00", 2, "string_val_01"),
(3, "string_val_10", 4, "string_val_11")],
schema=schema).selectExpr("*", "col_00_int as 1st_column")
return df
assert_gpu_and_cpu_are_equal_collect(lambda spark : do_it(spark))

# This is one of the tests where we verify that we can move data onto and off of the GPU when the
# schema is variable width. Note that the supported variable width types (i.e., string)
# are scattered so that the test covers packing, which is where columns are reordered for smaller
# data size by placing columns with the same alignment requirements next to each other.
def test_row_conversions_var_width():
gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen],
["e", float_gen], ["f", double_gen], ["g", boolean_gen], ["h", string_gen],
["i", timestamp_gen], ["j", date_gen], ["k", string_gen], ["l", decimal_gen_64bit],
["m", decimal_gen_32bit], ["n", string_gen]]
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again"))

def test_row_conversions_var_width_wide():
gens = [["a{}".format(i), ByteGen(nullable=True)] for i in range(10)] + \
[["b{}".format(i), ShortGen(nullable=True)] for i in range(10)] + \
[["c{}".format(i), IntegerGen(nullable=True)] for i in range(10)] + \
[["d{}".format(i), LongGen(nullable=True)] for i in range(10)] + \
[["e{}".format(i), FloatGen(nullable=True)] for i in range(10)] + \
[["f{}".format(i), DoubleGen(nullable=True)] for i in range(10)] + \
[["g{}".format(i), StringGen(nullable=True)] for i in range(5)] + \
[["h{}".format(i), BooleanGen(nullable=True)] for i in range(10)] + \
[["i{}".format(i), StringGen(nullable=True)] for i in range(5)] + \
[["j{}".format(i), TimestampGen(nullable=True)] for i in range(10)] + \
[["k{}".format(i), DateGen(nullable=True)] for i in range(10)] + \
[["l{}".format(i), DecimalGen(precision=12, scale=2, nullable=True)] for i in range(10)] + \
[["m{}".format(i), DecimalGen(precision=7, scale=3, nullable=True)] for i in range(10)]
def do_it(spark):
df=gen_df(spark, gens, length=1).selectExpr("*", "a0 as a_again")
return df
assert_gpu_and_cpu_are_equal_collect(do_it)
85 changes: 44 additions & 41 deletions sql-plugin/src/main/java/com/nvidia/spark/rapids/CudfUnsafeRow.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,7 +44,26 @@
* UnsafeRow works.
*/
public final class CudfUnsafeRow extends InternalRow {
public static int alignOffset(int offset, int alignment) {
return (offset + alignment - 1) & -alignment;
}

public static int calculateBitSetWidthInBytes(int numFields) {
return (numFields + 7)/ 8;
}

public static int getRowSizeEstimate(Attribute[] attributes) {
// This needs to match what is in cudf and what is in the constructor.
int offset = 0;
for (Attribute attr : attributes) {
int length = GpuColumnVector.getNonNestedRapidsType(attr.dataType()).getSizeInBytes();
offset = alignOffset(offset, length);
offset += length;
}
int bitSetWidthInBytes = calculateBitSetWidthInBytes(attributes.length);
// Each row is 64-bit aligned
return alignOffset(offset + bitSetWidthInBytes, 8);
}

//////////////////////////////////////////////////////////////////////////////
// Private fields and methods
Expand All @@ -56,15 +75,15 @@ public final class CudfUnsafeRow extends InternalRow {
private long address;

/**
* For each column the starting location to read from. The index is the position in
* the row bytes, not the user facing ordinal.
* For each column the starting location to read from. The index to the is the position in
* the row bytes, not the user faceing ordinal.
*/
private int[] startOffsets;

/**
* At what point validity data starts from the beginning of a row's data.
* At what point validity data starts.
*/
private int validityOffsetInBytes;
private int fixedWidthSizeInBytes;

/**
* The size of this row's backing data, in bytes.
Expand All @@ -76,8 +95,6 @@ public final class CudfUnsafeRow extends InternalRow {
*/
private int[] remapping;

private boolean variableWidthSchema;

/**
* Get the address where a field is stored.
* @param ordinal the user facing ordinal.
Expand Down Expand Up @@ -114,11 +131,17 @@ private void assertIndexIsValid(int index) {
* backing row.
*/
public CudfUnsafeRow(Attribute[] attributes, int[] remapping) {
int offset = 0;
startOffsets = new int[attributes.length];
JCudfUtil.RowOffsetsCalculator jCudfBuilder =
JCudfUtil.getRowOffsetsCalculator(attributes, startOffsets);
this.validityOffsetInBytes = jCudfBuilder.getValidityBytesOffset();
this.variableWidthSchema = jCudfBuilder.hasVarSizeData();
for (int i = 0; i < attributes.length; i++) {
Attribute attr = attributes[i];
int length = GpuColumnVector.getNonNestedRapidsType(attr.dataType()).getSizeInBytes();
assert length > 0 : "Only fixed width types are currently supported.";
offset = alignOffset(offset, length);
startOffsets[i] = offset;
offset += length;
}
fixedWidthSizeInBytes = offset;
this.remapping = remapping;
assert startOffsets.length == remapping.length;
}
Expand Down Expand Up @@ -160,7 +183,7 @@ public boolean isNullAt(int ordinal) {
assertIndexIsValid(i);
int validByteIndex = i / 8;
int validBitIndex = i % 8;
byte b = Platform.getByte(null, address + validityOffsetInBytes + validByteIndex);
byte b = Platform.getByte(null, address + fixedWidthSizeInBytes + validByteIndex);
return ((1 << validBitIndex) & b) == 0;
}

Expand All @@ -170,9 +193,9 @@ public void setNullAt(int ordinal) {
assertIndexIsValid(i);
int validByteIndex = i / 8;
int validBitIndex = i % 8;
byte b = Platform.getByte(null, address + validityOffsetInBytes + validByteIndex);
byte b = Platform.getByte(null, address + fixedWidthSizeInBytes + validByteIndex);
b = (byte)((b & ~(1 << validBitIndex)) & 0xFF);
Platform.putByte(null, address + validityOffsetInBytes + validByteIndex, b);
Platform.putByte(null, address + fixedWidthSizeInBytes + validByteIndex, b);
}

@Override
Expand Down Expand Up @@ -230,15 +253,12 @@ public Decimal getDecimal(int ordinal, int precision, int scale) {

@Override
public UTF8String getUTF8String(int ordinal) {
if (isNullAt(ordinal)) {
return null;
}
final long columnOffset = getFieldAddressFromOrdinal(ordinal);
// data format for the fixed-width portion of variable-width data is 4 bytes of offset from the
// start of the row followed by 4 bytes of length.
final int offset = Platform.getInt(null, columnOffset);
final int size = Platform.getInt(null, columnOffset + 4);
return UTF8String.fromAddress(null, address + offset, size);
// if (isNullAt(ordinal)) return null;
// final long offsetAndSize = getLong(ordinal);
// final int offset = (int) (offsetAndSize >> 32);
// final int size = (int) offsetAndSize;
// return UTF8String.fromAddress(null, address + offset, size);
throw new IllegalArgumentException("NOT IMPLEMENTED YET");
}

@Override
Expand Down Expand Up @@ -377,21 +397,4 @@ public boolean anyNull() {
throw new IllegalArgumentException("NOT IMPLEMENTED YET");
// return BitSetMethods.anySet(baseObject, address, bitSetWidthInBytes / 8);
}

public boolean isVariableWidthSchema() {
return variableWidthSchema;
}

public int getValidityOffsetInBytes() {
return validityOffsetInBytes;
}

/**
* Calculates the offset of the variable width section.
* This can be used to get the offset of the variable-width data. Note that the data-offset is 1-byte aligned.
* @return Total bytes used by the fixed width offsets and the validity bytes without row-alignment.
*/
public int getFixedWidthInBytes() {
return getValidityOffsetInBytes() + JCudfUtil.calculateBitSetWidthInBytes(numFields());
}
}
}
Loading

0 comments on commit 082f792

Please sign in to comment.