From 67747d01c460a161d2cfb3a1234bbcb691055b79 Mon Sep 17 00:00:00 2001 From: minmingzhu <45281494+minmingzhu@users.noreply.github.com> Date: Mon, 25 Sep 2023 16:45:28 +0800 Subject: [PATCH] [ML-305] Solve the upper limit of the array size as int max limit (#331) * update spark to 3.3.3 Signed-off-by: minmingzhu * 1. implement cNewDoubleArray, cNewFloatArray, cCopyDoubleArrayToNative, cCopyFloatArrayToNative in JNI 2. support float and double for all of algorithms 3. copy array to native ti fix scala array size limit int amx Signed-off-by: minmingzhu * remove float data type, Spark supports data type was double. Signed-off-by: minmingzhu * update Signed-off-by: minmingzhu * update Signed-off-by: minmingzhu * update Signed-off-by: minmingzhu * update Signed-off-by: minmingzhu * update Signed-off-by: minmingzhu --------- Signed-off-by: minmingzhu --- .../intel/oneapi/dal/table/HomogenTable.java | 24 +++++++++++ .../oneapi/dal/table/HomogenTableImpl.java | 26 +++++++++++ mllib-dal/src/main/native/OneDAL.cpp | 43 +++++++++++++++++++ .../javah/com_intel_oap_mllib_OneDAL__.h | 16 +++++++ ..._intel_oneapi_dal_table_HomogenTableImpl.h | 8 ++++ .../native/oneapi/dal/HomogenTableImpl.cpp | 41 ++++++++++++++++++ .../scala/com/intel/oap/mllib/OneDAL.scala | 40 ++++++++++------- 7 files changed, 182 insertions(+), 16 deletions(-) diff --git a/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTable.java b/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTable.java index 60263d483..f95a05cb8 100644 --- a/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTable.java +++ b/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTable.java @@ -88,6 +88,18 @@ public HomogenTable(long rowCount, dataLayout, device); } + public HomogenTable(long rowCount, + long colCount, + long dataAddress, + Common.DataType dataType, + Common.ComputeDevice device){ + super(); + // default + Common.DataLayout dataLayout = Common.DataLayout.ROW_MAJOR; + impl = new HomogenTableImpl(rowCount, colCount, dataAddress, dataType, + dataLayout, device); + } + public HomogenTable(long rowCount, long colCount, double[] data, @@ -97,6 +109,18 @@ public HomogenTable(long rowCount, impl = new HomogenTableImpl(rowCount, colCount, data, dataLayout, device); } + + public HomogenTable(long rowCount, + long colCount, + long dataAddress, + Common.DataType dataType, + Common.DataLayout dataLayout, + Common.ComputeDevice device){ + super(); + impl = new HomogenTableImpl(rowCount, colCount, dataAddress, dataType, + dataLayout, device); + } + @Override public Long getColumnCount() { return impl.getColumnCount(); diff --git a/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTableImpl.java b/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTableImpl.java index 1e8b317aa..d7d46232f 100644 --- a/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTableImpl.java +++ b/mllib-dal/src/main/java/com/intel/oneapi/dal/table/HomogenTableImpl.java @@ -65,6 +65,25 @@ public HomogenTableImpl(long rowCount, } + public HomogenTableImpl(long rowCount, + long colCount, + long dataPtr, + Common.DataType dataType, + Common.DataLayout dataLayout, + Common.ComputeDevice computeDevice) { + this.device = computeDevice; + switch (dataType) { + case FLOAT64: + this.cObject = dPtrInit(rowCount, colCount, dataPtr, dataLayout.ordinal(), + this.device.ordinal()); + break; + default: + System.err.println("OAP MLlib currently only support input data of " + + "double type!"); + System.exit(-1); + } + } + @Override public long getColumnCount() { return cGetColumnCount(this.cObject); @@ -165,6 +184,13 @@ private native long lInit(long rowCount, long[] data, int dataLayoutIndex, int computeDeviceIndex); + + private native long dPtrInit(long rowCount, + long colCount, + long dataPtr, + int dataLayoutIndex, + int computeDeviceIndex); + private native long cGetColumnCount(long cObject); private native long cGetRowCount(long cObject); private native long cGetKind(long cObject); diff --git a/mllib-dal/src/main/native/OneDAL.cpp b/mllib-dal/src/main/native/OneDAL.cpp index ff2323031..4874f895c 100644 --- a/mllib-dal/src/main/native/OneDAL.cpp +++ b/mllib-dal/src/main/native/OneDAL.cpp @@ -26,6 +26,18 @@ using namespace daal::data_management; // Use oneDAL lib function extern bool daal_check_is_intel_cpu(); +// Define a global native array +typedef std::shared_ptr NativeDoubleArrayPtr; + +std::mutex g_amtx; +std::vector g_NativeDoubleArrayPtrVector; + +void saveDoubleArrayPtrToVector(const NativeDoubleArrayPtr &ptr) { + g_amtx.lock(); + g_NativeDoubleArrayPtrVector.push_back(ptr); + g_amtx.unlock(); +} + JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cAddNumericTable( JNIEnv *, jobject, jlong rowMergedNumericTableAddr, jlong numericTableAddr) { @@ -155,3 +167,34 @@ Java_com_intel_oap_mllib_OneDAL_00024_cNewCSRNumericTableDouble( return (jlong)ret; } + +/* + * Class: com_intel_oap_mllib_OneDAL__ + * Method: cNewDoubleArray + * Signature: (J)J + */ +JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cNewDoubleArray( + JNIEnv *env, jobject, jlong size) { + std::cout << "create new native array size : " << size << std::endl; + NativeDoubleArrayPtr arrayPtr(new double[size], + [](double *ptr) { delete[] ptr; }); + saveDoubleArrayPtrToVector(arrayPtr); + return (jlong)arrayPtr.get(); +} + +/* + * Class: com_intel_oap_mllib_OneDAL__ + * Method: cCopyDoubleArrayToNative + * Signature: (J[DJ)V + */ +JNIEXPORT void JNICALL +Java_com_intel_oap_mllib_OneDAL_00024_cCopyDoubleArrayToNative( + JNIEnv *env, jobject, jlong nativeArrayPtr, jdoubleArray sourceArray, + jlong index) { + double *nativeArray = reinterpret_cast(nativeArrayPtr); + jsize sourceLength = env->GetArrayLength(sourceArray); + jdouble *source = static_cast( + env->GetPrimitiveArrayCritical(sourceArray, NULL)); + std::copy(source, source + sourceLength, nativeArray + index); + env->ReleasePrimitiveArrayCritical(sourceArray, source, 0); +} diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneDAL__.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneDAL__.h index bf1a4388d..c3c53c532 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneDAL__.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneDAL__.h @@ -63,6 +63,22 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cNewCSRNumericTabl JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cNewCSRNumericTableDouble (JNIEnv *, jobject, jdoubleArray, jlongArray, jlongArray, jlong, jlong); +/* + * Class: com_intel_oap_mllib_OneDAL__ + * Method: cNewDoubleArray + * Signature: (J)J + */ +JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cNewDoubleArray + (JNIEnv *, jobject, jlong); + +/* + * Class: com_intel_oap_mllib_OneDAL__ + * Method: cCopyDoubleArrayToNative + * Signature: (J[DJ)V + */ +JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cCopyDoubleArrayToNative + (JNIEnv *, jobject, jlong, jdoubleArray, jlong); + #ifdef __cplusplus } #endif diff --git a/mllib-dal/src/main/native/javah/com_intel_oneapi_dal_table_HomogenTableImpl.h b/mllib-dal/src/main/native/javah/com_intel_oneapi_dal_table_HomogenTableImpl.h index eff8517d4..046f8d84e 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oneapi_dal_table_HomogenTableImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oneapi_dal_table_HomogenTableImpl.h @@ -39,6 +39,14 @@ JNIEXPORT jlong JNICALL Java_com_intel_oneapi_dal_table_HomogenTableImpl_dInit JNIEXPORT jlong JNICALL Java_com_intel_oneapi_dal_table_HomogenTableImpl_lInit (JNIEnv *, jobject, jlong, jlong, jlongArray, jint, jint); +/* + * Class: com_intel_oneapi_dal_table_HomogenTableImpl + * Method: dPtrInit + * Signature: (JJJII)J + */ +JNIEXPORT jlong JNICALL Java_com_intel_oneapi_dal_table_HomogenTableImpl_dPtrInit + (JNIEnv *, jobject, jlong, jlong, jlong, jint, jint); + /* * Class: com_intel_oneapi_dal_table_HomogenTableImpl * Method: cGetColumnCount diff --git a/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp b/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp index 69149c106..17a7940af 100644 --- a/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp +++ b/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp @@ -293,6 +293,47 @@ JNIEXPORT jlong JNICALL Java_com_intel_oneapi_dal_table_HomogenTableImpl_lInit( return (jlong)tablePtr.get(); } +/* + * Class: com_intel_oneapi_dal_table_HomogenTableImpl + * Method: Init + * Signature: (JJJIII)J + */ +JNIEXPORT jlong JNICALL Java_com_intel_oneapi_dal_table_HomogenTableImpl_dPtrInit( + JNIEnv *env, jobject, jlong cRowCount, jlong cColCount, jlong dataPtr, jint cLayout, jint computeDeviceOrdinal) { + printf("HomogenTable dPtrInit\n"); + double *fData = reinterpret_cast(dataPtr); + if (fData == NULL) { + std::cout << "Error: unable to obtain critical array" << std::endl; + exit(-1); + } + const std::vector dependencies = {}; + HomogenTablePtr tablePtr; + ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); + switch(device) { + case ComputeDevice::host:{ + tablePtr = std::make_shared(fData, cRowCount, cColCount, + detail::empty_delete(), + getDataLayout(cLayout)); + break; + } + case ComputeDevice::cpu: + case ComputeDevice::gpu:{ + auto queue = getQueue(device); + auto data = sycl::malloc_shared(cRowCount * cColCount, queue); + queue.memcpy(data, fData, sizeof(double) * cRowCount * cColCount).wait(); + tablePtr = std::make_shared(queue, data, cRowCount, cColCount, + detail::make_default_delete(queue), + dependencies, getDataLayout(cLayout)); + break; + } + default: { + deviceError(); + } + } + saveHomogenTablePtrToVector(tablePtr); + return (jlong)tablePtr.get(); +} + /* * Class: com_intel_oneapi_dal_table_HomogenTableImpl * Method: cGetColumnCount diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index 8458b8060..9a925fb92 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -444,26 +444,27 @@ object OneDAL { val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) => val list = it.toList val subRowCount: Int = list.size / numberCores - val labeledPointsList: ListBuffer[Future[(Array[Double], Array[Double])]] = - new ListBuffer[Future[(Array[Double], Array[Double])]]() + val labeledPointsList: ListBuffer[Future[(Array[Double], Long)]] = + new ListBuffer[Future[(Array[Double], Long)]]() val numRows = list.size val numCols = list(0).getAs[Vector](1).toArray.size + val labelsArray = new Array[Double](numRows) - val featuresArray = new Array[Double](numRows * numCols) - for ( i <- 0 until numberCores) { + val featuresAddress= OneDAL.cNewDoubleArray(numRows.toLong * numCols) + for ( i <- 0 until numberCores) { val f = Future { val iter = list.iterator val slice = if (i == numberCores - 1) { - iter.slice(subRowCount * i, numRows * numCols) + iter.slice(subRowCount * i, numRows) } else { iter.slice(subRowCount * i, subRowCount * i + subRowCount) } slice.toArray.zipWithIndex.map { case (row, index) => val length = row.getAs[Vector](1).toArray.length - System.arraycopy(row.getAs[Vector](1).toArray, 0, featuresArray, subRowCount * numCols * i + length * index, length) + OneDAL.cCopyDoubleArrayToNative(featuresAddress, row.getAs[Vector](1).toArray, subRowCount.toLong * numCols * i + length * index) labelsArray(subRowCount * i + index) = row.getAs[Double](0) } - (labelsArray, featuresArray) + (labelsArray, featuresAddress) } labeledPointsList += f @@ -472,7 +473,7 @@ object OneDAL { } val labelsTable = new HomogenTable(numRows.toLong, 1, labelsArray, device) - val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresArray, + val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresAddress, Common.DataType.FLOAT64, device) Iterator((featuresTable.getcObejct(), labelsTable.getcObejct())) @@ -618,31 +619,32 @@ object OneDAL { val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Vector]) => val list = it.toList val subRowCount: Int = list.size / numberCores - val futureList: ListBuffer[Future[Array[Double]]] = new ListBuffer[Future[Array[Double]]]() + val futureList: ListBuffer[Future[Long]] = new ListBuffer[Future[Long]]() val numRows = list.size val numCols = list(0).toArray.size - val targetArray = new Array[Double](numRows * numCols) - for ( i <- 0 until numberCores) { + val size = numRows.toLong * numCols.toLong + val targetArrayAddress = OneDAL.cNewDoubleArray(size) + for ( i <- 0 until numberCores) { val f = Future { val iter = list.iterator val slice = if (i == numberCores - 1) { - iter.slice(subRowCount * i, numRows * numCols) + iter.slice(subRowCount * i, numRows) } else { iter.slice(subRowCount * i, subRowCount * i + subRowCount) } slice.toArray.zipWithIndex.map { case (vector, index) => val length = vector.toArray.length - System.arraycopy(vector.toArray, 0, targetArray, subRowCount * numCols * i + length * index, length) + OneDAL.cCopyDoubleArrayToNative(targetArrayAddress, vector.toArray, subRowCount.toLong * numCols * i + length * index) } - targetArray + targetArrayAddress } futureList += f val result = Future.sequence(futureList) Await.result(result, Duration.Inf) } - val table = new HomogenTable(numRows.toLong, numCols.toLong, targetArray, - device) + val table = new HomogenTable(numRows.toLong, numCols.toLong, targetArrayAddress, + Common.DataType.FLOAT64, device) Iterator(table.getcObejct()) }.setName("coalescedTables").cache() @@ -750,4 +752,10 @@ object OneDAL { @native def cNewCSRNumericTableDouble(data: Array[Double], colIndices: Array[Long], rowOffsets: Array[Long], nFeatures: Long, nVectors: Long): Long + + @native def cNewDoubleArray(size: Long): Long + + @native def cCopyDoubleArrayToNative(arrayAddr: Long, + data: Array[Double], + index: Long): Unit }