-
Notifications
You must be signed in to change notification settings - Fork 0
/
SparkUtil.sc
52 lines (45 loc) · 1.59 KB
/
SparkUtil.sc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, RegressionMetrics}
import org.apache.spark.mllib.regression.{LabeledPoint, RegressionModel}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
/**
 * Splits `d` into `nFold` (train set, test set) pairs, stratified by label.
 *
 * Each distinct label's records are split independently with `MLUtils.kFold`.
 * The per-label folds at the same index are then unioned. As a result, every
 * fold's test set holds roughly 1/nFold of each label's records.
 *
 * Labels are compared with `java.lang.Double.compare`, the same notion of
 * equality `distinct` uses. This means NaN labels form their own stratum
 * instead of being dropped, and 0.0 / -0.0 are not counted twice.
 *
 * @param d     labeled data to split. It is filtered once per distinct label,
 *              so caching it beforehand avoids recomputing it.
 * @param nFold number of folds
 * @param seed  base random seed. The i-th label (in ascending order) uses
 *              `seed + i`, so a fixed seed reproduces the same folds.
 *              Defaults to a random value.
 * @return array of length `nFold` of (train set, test set) pairs. If `d` has
 *         no records, each pair consists of empty RDDs.
 */
def stratifiedKFold(d: RDD[LabeledPoint],
                    nFold: Int,
                    seed: Int = scala.util.Random.nextInt()): Array[(RDD[LabeledPoint], RDD[LabeledPoint])] = {
  // Total order consistent with the filter below. Sorting makes the
  // label -> sub-seed assignment stable across runs.
  val labels = d.map(_.label).distinct().collect()
    .sortWith((a, b) => java.lang.Double.compare(a, b) < 0)

  if (labels.isEmpty) {
    // `reduce` below would throw on an empty array; plain kFold yields nFold empty pairs.
    MLUtils.kFold(d, nFold, seed)
  } else {
    labels.zipWithIndex.map { case (label, i) =>
      val stratum = d.filter(p => java.lang.Double.compare(p.label, label) == 0)
      MLUtils.kFold(stratum, nFold, seed + i)
    }.reduce { (acc, next) =>
      // Merge fold k of every stratum into a single fold k.
      acc.zip(next).map { case ((trainA, testA), (trainB, testB)) =>
        (trainA.union(trainB), testA.union(testB))
      }
    }
  }
}
/**
 * Cross-validates a binary classifier and returns the area under the ROC curve per fold.
 *
 * For each (train, test) pair:
 *  1. Train a model on the train set with `classifier.run`.
 *  2. Score every test point with `model.predict`.
 *  3. Compute AUC from the (score, label) pairs.
 *
 * @param arr        (train set, test set) pairs, e.g. as produced by `stratifiedKFold`
 * @param classifier any object with `run(RDD[LabeledPoint]): ClassificationModel`.
 *                   This is a structural type, so the call goes through reflection.
 * @return one AUC value per element of `arr`, in the same order
 */
def cvCls(arr: Array[(RDD[LabeledPoint], RDD[LabeledPoint])],
          classifier: { def run(input: RDD[LabeledPoint]): ClassificationModel }): Array[Double] = {
  import scala.language.reflectiveCalls
  import org.apache.spark.mllib.classification.{LogisticRegressionModel, SVMModel}

  arr.map { case (train, test) =>
    val model = classifier.run(train)
    // AUC needs continuous scores. Per the Spark MLlib docs, LR and SVM models
    // return hard 0/1 predictions while a threshold is set (the default). That
    // collapses the ROC curve to one point, so clear it to get raw scores.
    model match {
      case lr: LogisticRegressionModel => lr.clearThreshold()
      case svm: SVMModel => svm.clearThreshold()
      case _ => // other models are scored with predict() as-is
    }
    val scoreAndLabel = test.map { case LabeledPoint(label, features) =>
      (model.predict(features), label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabel)
    // The metrics object caches an internal RDD; release it once AUC is read.
    try metrics.areaUnderROC()
    finally metrics.unpersist()
  }
}
/**
 * Cross-validates a regressor and reports the coefficient of determination (R²) per fold.
 *
 * For each fold, a model is fitted on the training RDD with `regressor.run`.
 * It is then evaluated on the held-out RDD.
 *
 * @param arr       (training RDD, held-out RDD) pairs
 * @param regressor any object with `run(RDD[LabeledPoint]): RegressionModel`
 * @return R² of each fold's model on its held-out RDD, in the order of `arr`
 */
def cvReg(arr: Array[(RDD[LabeledPoint], RDD[LabeledPoint])],
          regressor: { def run(input: RDD[LabeledPoint]): RegressionModel }): Array[Double] =
  arr.map { fold =>
    val (trainSet, heldOut) = fold
    val fitted = regressor.run(trainSet)
    // Pairs are (prediction, observed label), the order RegressionMetrics takes.
    val predictedVsActual = heldOut.map { case LabeledPoint(y, x) =>
      (fitted.predict(x), y)
    }
    new RegressionMetrics(predictedVsActual).r2
  }