[ML-305] Solve the upper limit of the array size as int max limit (#331)
* update spark to 3.3.3

Signed-off-by: minmingzhu <[email protected]>

* 1. implement cNewDoubleArray, cNewFloatArray, cCopyDoubleArrayToNative, and cCopyFloatArrayToNative in JNI
2. support float and double for all algorithms
3. copy arrays to native memory to work around the Scala array size limit of Int.MaxValue (see the usage sketch after the file summary below)

Signed-off-by: minmingzhu <[email protected]>

* remove the float data type; the data type Spark supports is double.

Signed-off-by: minmingzhu <[email protected]>

* update

Signed-off-by: minmingzhu <[email protected]>

* update

Signed-off-by: minmingzhu <[email protected]>

* update

Signed-off-by: minmingzhu <[email protected]>

* update

Signed-off-by: minmingzhu <[email protected]>

* update

Signed-off-by: minmingzhu <[email protected]>

---------

Signed-off-by: minmingzhu <[email protected]>
minmingzhu authored Sep 25, 2023
1 parent 0b981a8 commit 67747d0
Showing 7 changed files with 182 additions and 16 deletions.
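The change replaces on-heap Array[Double] buffers, whose length is an Int and therefore capped at Int.MaxValue elements, with buffers allocated in native memory and addressed by a Long. The sketch below is illustrative only, not code from this commit: it strings together the new JNI entry points (OneDAL.cNewDoubleArray, OneDAL.cCopyDoubleArrayToNative) and the new address-based HomogenTable constructor; the helper name buildLargeTable, the chunking scheme, and the import paths (inferred from the JNI class names) are assumptions.

import com.intel.oap.mllib.OneDAL
import com.intel.oneapi.dal.table.{Common, HomogenTable}

// Hypothetical helper: stage row-major data in native memory so that the total
// element count may exceed Int.MaxValue, then wrap the buffer in a HomogenTable.
def buildLargeTable(chunks: Seq[Array[Double]],
                    numRows: Long,
                    numCols: Long,
                    device: Common.ComputeDevice): HomogenTable = {
  // Allocate numRows * numCols doubles off-heap; the JNI call returns the address as a Long.
  val address: Long = OneDAL.cNewDoubleArray(numRows * numCols)

  // Copy each on-heap chunk into the native buffer at its element offset.
  var offset: Long = 0L
  chunks.foreach { chunk =>
    OneDAL.cCopyDoubleArrayToNative(address, chunk, offset)
    offset += chunk.length
  }

  // Hand the raw address to the new constructor; the layout defaults to ROW_MAJOR.
  new HomogenTable(numRows, numCols, address, Common.DataType.FLOAT64, device)
}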
@@ -88,6 +88,18 @@ public HomogenTable(long rowCount,
dataLayout, device);
}

public HomogenTable(long rowCount,
long colCount,
long dataAddress,
Common.DataType dataType,
Common.ComputeDevice device){
super();
// default
Common.DataLayout dataLayout = Common.DataLayout.ROW_MAJOR;
impl = new HomogenTableImpl(rowCount, colCount, dataAddress, dataType,
dataLayout, device);
}

public HomogenTable(long rowCount,
long colCount,
double[] data,
@@ -97,6 +109,18 @@ public HomogenTable(long rowCount,
impl = new HomogenTableImpl(rowCount, colCount, data,
dataLayout, device);
}

public HomogenTable(long rowCount,
long colCount,
long dataAddress,
Common.DataType dataType,
Common.DataLayout dataLayout,
Common.ComputeDevice device){
super();
impl = new HomogenTableImpl(rowCount, colCount, dataAddress, dataType,
dataLayout, device);
}

@Override
public Long getColumnCount() {
return impl.getColumnCount();
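For reference, the two constructors added above differ only in whether the data layout is passed explicitly; the five-argument form defaults to ROW_MAJOR. A brief illustrative call from Scala, assuming numRows, numCols, address, and device are already in scope:

// Five-argument overload: layout defaults to Common.DataLayout.ROW_MAJOR.
val defaultLayoutTable = new HomogenTable(numRows, numCols, address,
  Common.DataType.FLOAT64, device)

// Six-argument overload: the same table with the layout spelled out.
val explicitLayoutTable = new HomogenTable(numRows, numCols, address,
  Common.DataType.FLOAT64, Common.DataLayout.ROW_MAJOR, device)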
@@ -65,6 +65,25 @@ public HomogenTableImpl(long rowCount,

}

public HomogenTableImpl(long rowCount,
long colCount,
long dataPtr,
Common.DataType dataType,
Common.DataLayout dataLayout,
Common.ComputeDevice computeDevice) {
this.device = computeDevice;
switch (dataType) {
case FLOAT64:
this.cObject = dPtrInit(rowCount, colCount, dataPtr, dataLayout.ordinal(),
this.device.ordinal());
break;
default:
System.err.println("OAP MLlib currently only support input data of " +
"double type!");
System.exit(-1);
}
}

@Override
public long getColumnCount() {
return cGetColumnCount(this.cObject);
@@ -165,6 +184,13 @@ private native long lInit(long rowCount,
long[] data,
int dataLayoutIndex,
int computeDeviceIndex);

private native long dPtrInit(long rowCount,
long colCount,
long dataPtr,
int dataLayoutIndex,
int computeDeviceIndex);

private native long cGetColumnCount(long cObject);
private native long cGetRowCount(long cObject);
private native long cGetKind(long cObject);
43 changes: 43 additions & 0 deletions mllib-dal/src/main/native/OneDAL.cpp
@@ -26,6 +26,18 @@ using namespace daal::data_management;
// Use oneDAL lib function
extern bool daal_check_is_intel_cpu();

// Define a global native array
typedef std::shared_ptr<double[]> NativeDoubleArrayPtr;

std::mutex g_amtx;
std::vector<NativeDoubleArrayPtr> g_NativeDoubleArrayPtrVector;

void saveDoubleArrayPtrToVector(const NativeDoubleArrayPtr &ptr) {
g_amtx.lock();
g_NativeDoubleArrayPtrVector.push_back(ptr);
g_amtx.unlock();
}

JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cAddNumericTable(
JNIEnv *, jobject, jlong rowMergedNumericTableAddr,
jlong numericTableAddr) {
@@ -155,3 +167,34 @@ Java_com_intel_oap_mllib_OneDAL_00024_cNewCSRNumericTableDouble(

return (jlong)ret;
}

/*
* Class: com_intel_oap_mllib_OneDAL__
* Method: cNewDoubleArray
* Signature: (J)J
*/
JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cNewDoubleArray(
JNIEnv *env, jobject, jlong size) {
std::cout << "create new native array size : " << size << std::endl;
NativeDoubleArrayPtr arrayPtr(new double[size],
[](double *ptr) { delete[] ptr; });
saveDoubleArrayPtrToVector(arrayPtr);
return (jlong)arrayPtr.get();
}

/*
* Class: com_intel_oap_mllib_OneDAL__
* Method: cCopyDoubleArrayToNative
* Signature: (J[DJ)V
*/
JNIEXPORT void JNICALL
Java_com_intel_oap_mllib_OneDAL_00024_cCopyDoubleArrayToNative(
JNIEnv *env, jobject, jlong nativeArrayPtr, jdoubleArray sourceArray,
jlong index) {
double *nativeArray = reinterpret_cast<double *>(nativeArrayPtr);
jsize sourceLength = env->GetArrayLength(sourceArray);
jdouble *source = static_cast<jdouble *>(
env->GetPrimitiveArrayCritical(sourceArray, NULL));
std::copy(source, source + sourceLength, nativeArray + index);
env->ReleasePrimitiveArrayCritical(sourceArray, source, 0);
}
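Note that the index argument of cCopyDoubleArrayToNative is an element offset, not a byte offset: the native side copies into nativeArray + index. A small illustrative Scala snippet (the sizes and values are made up) laying two chunks end to end:

// Allocate room for 6 doubles and fill it from two 3-element JVM arrays.
val addr = OneDAL.cNewDoubleArray(6L)
OneDAL.cCopyDoubleArrayToNative(addr, Array(1.0, 2.0, 3.0), 0L) // elements 0..2
OneDAL.cCopyDoubleArrayToNative(addr, Array(4.0, 5.0, 6.0), 3L) // elements 3..5
// The native buffer now holds 1.0 through 6.0 contiguously.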
16 changes: 16 additions & 0 deletions mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneDAL__.h

Some generated files are not rendered by default.

41 changes: 41 additions & 0 deletions mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp
@@ -293,6 +293,47 @@ JNIEXPORT jlong JNICALL Java_com_intel_oneapi_dal_table_HomogenTableImpl_lInit(
return (jlong)tablePtr.get();
}

/*
* Class: com_intel_oneapi_dal_table_HomogenTableImpl
* Method: dPtrInit
* Signature: (JJJIII)J
*/
JNIEXPORT jlong JNICALL Java_com_intel_oneapi_dal_table_HomogenTableImpl_dPtrInit(
JNIEnv *env, jobject, jlong cRowCount, jlong cColCount, jlong dataPtr, jint cLayout, jint computeDeviceOrdinal) {
printf("HomogenTable dPtrInit\n");
double *fData = reinterpret_cast<double *>(dataPtr);
if (fData == NULL) {
std::cout << "Error: unable to obtain critical array" << std::endl;
exit(-1);
}
const std::vector<sycl::event> dependencies = {};
HomogenTablePtr tablePtr;
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch(device) {
case ComputeDevice::host:{
tablePtr = std::make_shared<homogen_table>(fData, cRowCount, cColCount,
detail::empty_delete<const double>(),
getDataLayout(cLayout));
break;
}
case ComputeDevice::cpu:
case ComputeDevice::gpu:{
auto queue = getQueue(device);
auto data = sycl::malloc_shared<double>(cRowCount * cColCount, queue);
queue.memcpy(data, fData, sizeof(double) * cRowCount * cColCount).wait();
tablePtr = std::make_shared<homogen_table>(queue, data, cRowCount, cColCount,
detail::make_default_delete<const double>(queue),
dependencies, getDataLayout(cLayout));
break;
}
default: {
deviceError();
}
}
saveHomogenTablePtrToVector(tablePtr);
return (jlong)tablePtr.get();
}

/*
* Class: com_intel_oneapi_dal_table_HomogenTableImpl
* Method: cGetColumnCount
40 changes: 24 additions & 16 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -444,26 +444,27 @@ object OneDAL {
val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) =>
val list = it.toList
val subRowCount: Int = list.size / numberCores
- val labeledPointsList: ListBuffer[Future[(Array[Double], Array[Double])]] =
-   new ListBuffer[Future[(Array[Double], Array[Double])]]()
+ val labeledPointsList: ListBuffer[Future[(Array[Double], Long)]] =
+   new ListBuffer[Future[(Array[Double], Long)]]()
val numRows = list.size
val numCols = list(0).getAs[Vector](1).toArray.size

val labelsArray = new Array[Double](numRows)
- val featuresArray = new Array[Double](numRows * numCols)
- for ( i <- 0 until numberCores) {
+ val featuresAddress= OneDAL.cNewDoubleArray(numRows.toLong * numCols)
+ for ( i <- 0 until numberCores) {
val f = Future {
val iter = list.iterator
val slice = if (i == numberCores - 1) {
- iter.slice(subRowCount * i, numRows * numCols)
+ iter.slice(subRowCount * i, numRows)
} else {
iter.slice(subRowCount * i, subRowCount * i + subRowCount)
}
slice.toArray.zipWithIndex.map { case (row, index) =>
val length = row.getAs[Vector](1).toArray.length
- System.arraycopy(row.getAs[Vector](1).toArray, 0, featuresArray, subRowCount * numCols * i + length * index, length)
+ OneDAL.cCopyDoubleArrayToNative(featuresAddress, row.getAs[Vector](1).toArray, subRowCount.toLong * numCols * i + length * index)
labelsArray(subRowCount * i + index) = row.getAs[Double](0)
}
- (labelsArray, featuresArray)
+ (labelsArray, featuresAddress)
}
labeledPointsList += f

@@ -472,7 +473,7 @@
}
val labelsTable = new HomogenTable(numRows.toLong, 1, labelsArray,
device)
- val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresArray,
+ val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresAddress, Common.DataType.FLOAT64,
device)

Iterator((featuresTable.getcObejct(), labelsTable.getcObejct()))
@@ -618,31 +619,32 @@ object OneDAL {
val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Vector]) =>
val list = it.toList
val subRowCount: Int = list.size / numberCores
- val futureList: ListBuffer[Future[Array[Double]]] = new ListBuffer[Future[Array[Double]]]()
+ val futureList: ListBuffer[Future[Long]] = new ListBuffer[Future[Long]]()
val numRows = list.size
val numCols = list(0).toArray.size
- val targetArray = new Array[Double](numRows * numCols)
- for ( i <- 0 until numberCores) {
+ val size = numRows.toLong * numCols.toLong
+ val targetArrayAddress = OneDAL.cNewDoubleArray(size)
+ for ( i <- 0 until numberCores) {
val f = Future {
val iter = list.iterator
val slice = if (i == numberCores - 1) {
- iter.slice(subRowCount * i, numRows * numCols)
+ iter.slice(subRowCount * i, numRows)
} else {
iter.slice(subRowCount * i, subRowCount * i + subRowCount)
}
slice.toArray.zipWithIndex.map { case (vector, index) =>
val length = vector.toArray.length
- System.arraycopy(vector.toArray, 0, targetArray, subRowCount * numCols * i + length * index, length)
+ OneDAL.cCopyDoubleArrayToNative(targetArrayAddress, vector.toArray, subRowCount.toLong * numCols * i + length * index)
}
- targetArray
+ targetArrayAddress
}
futureList += f

val result = Future.sequence(futureList)
Await.result(result, Duration.Inf)
}
- val table = new HomogenTable(numRows.toLong, numCols.toLong, targetArray,
-   device)
+ val table = new HomogenTable(numRows.toLong, numCols.toLong, targetArrayAddress,
+   Common.DataType.FLOAT64, device)

Iterator(table.getcObejct())
}.setName("coalescedTables").cache()
@@ -750,4 +752,10 @@ object OneDAL {
@native def cNewCSRNumericTableDouble(data: Array[Double],
colIndices: Array[Long], rowOffsets: Array[Long],
nFeatures: Long, nVectors: Long): Long

+ @native def cNewDoubleArray(size: Long): Long

+ @native def cCopyDoubleArrayToNative(arrayAddr: Long,
+ data: Array[Double],
+ index: Long): Unit
}
