Getting Key Not Found Exception while Serializing to a mleap bundle #872
Comments
I suspect something is wrong with the op registry, though I don't see anything obviously wrong with what you've put in the reference.conf. I usually use sbt and merge strategies to handle that; I'm not sure what the Gradle equivalent would be. Is it just PipelineModel that is having issues, or is it everything (and PipelineModel just gets hit first)? For example, try serializing the first stage all by itself. If that fails too, then I suspect nothing is getting registered at all. If the first stage works by itself, then something is wrong with the PipelineOp registration.
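One way to check whether anything is registered at all is to print what the merged Typesafe Config actually contains when the job runs on the cluster. The following is a minimal sketch, not part of the thread. It assumes the Spark registry is resolved from the key `ml.combust.mleap.spark.registry.default.ops` (the key used in the reference.conf posted in this issue) and that each entry under it names another config path holding a list of op class names. `RegistryCheck` and `registeredOps` are hypothetical names.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.JavaConverters._

object RegistryCheck {
  // Key taken from the reference.conf posted in this issue.
  val SparkRegistryKey = "ml.combust.mleap.spark.registry.default.ops"

  // Each entry under `registryKey` is itself a config path that holds a list of op class names.
  // Missing paths are skipped so the check never throws on a broken config.
  def registeredOps(config: Config, registryKey: String = SparkRegistryKey): Seq[String] =
    if (!config.hasPath(registryKey)) Seq.empty
    else config.getStringList(registryKey).asScala.toList.flatMap { opsPath =>
      if (config.hasPath(opsPath)) config.getStringList(opsPath).asScala.toList else Nil
    }

  def main(args: Array[String]): Unit = {
    // ConfigFactory.load() merges every reference.conf visible on the classpath.
    val ops = registeredOps(ConfigFactory.load())
    println(s"${ops.size} Spark ops registered")
    println(s"PipelineOp registered: ${ops.contains("org.apache.spark.ml.bundle.ops.PipelineOp")}")
  }
}
```

If this prints zero ops on the cluster but a long list locally, the registry configuration is not reaching the cluster JVM, which points at packaging rather than at PipelineModel itself.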
I am getting the best model from the CrossValidator before serializing to the MLeap bundle; I have updated the steps above. The reference.conf is under the src/main/resources folder. I am not doing anything in Gradle to include reference.conf in the fat jar; I think Gradle packages the reference.conf from the resources folder into the fat jar by default, and I verified this by unzipping the fat jar. I don't have an application.properties file; I use my own JSON file and parse it in my code. Surprisingly, the unit test passes locally, i.e. I am able to serialize the pipeline model to an MLeap bundle with the same steps. The problem occurs only when the Spark job is submitted to the cluster through SparkApplication.
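Unzipping shows that the jar contains a reference.conf, but not whose it is: a fat jar normally ends up with one entry per path, so when several dependencies ship their own reference.conf, an unmerged fat jar keeps just one of them. The following is a minimal sketch (not from the thread) that lists every copy the class loader can see. `ReferenceConfCheck` and `resourceCopies` are hypothetical names.

```scala
import scala.collection.JavaConverters._

object ReferenceConfCheck {
  // Falls back to the system class loader if the thread has no context class loader.
  private def defaultLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader).getOrElse(ClassLoader.getSystemClassLoader)

  // Lists every copy of `resourceName` visible to the given class loader.
  def resourceCopies(resourceName: String, loader: ClassLoader = defaultLoader): List[String] =
    loader.getResources(resourceName).asScala.map(_.toString).toList

  def main(args: Array[String]): Unit = {
    val copies = resourceCopies("reference.conf")
    println(s"${copies.size} reference.conf file(s) on the classpath:")
    copies.foreach(url => println(s"  $url"))
  }
}
```

Run from a test or IDE classpath, this typically prints one URL per dependency jar. Inside an unmerged fat jar it is likely to print a single URL, which would explain a test that passes locally and a job that fails on the cluster.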
That is extra strange. Can you confirm that Spark's JVM and your local JVM have the same packages installed?
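A quick way to compare the two JVMs is to print where the relevant classes are loaded from in each environment. The following is a minimal sketch, not from the thread. `ClasspathCheck` and `codeSourceOf` are hypothetical names, and the class names in `main` are taken from the stack trace and reference.conf in this issue.

```scala
import scala.util.Try

object ClasspathCheck {
  // Returns the jar or directory a class was loaded from, if the JVM exposes it.
  // The class is looked up without being initialized, so no static initializers run.
  def codeSourceOf(className: String): Option[String] = {
    val loader = Thread.currentThread().getContextClassLoader
    for {
      cls    <- Try(Class.forName(className, false, loader)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      loc    <- Option(source.getLocation)
    } yield loc.toString
  }

  def main(args: Array[String]): Unit =
    Seq(
      "org.apache.spark.ml.PipelineModel",
      "ml.combust.bundle.BundleRegistry",
      "org.apache.spark.ml.bundle.ops.PipelineOp"
    ).foreach { name =>
      println(s"$name -> ${codeSourceOf(name).getOrElse("not found or no code source")}")
    }
}
```

Running this both in the local test and inside the submitted job shows whether the same jars (and versions) supply Spark and MLeap in the two environments.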
The reason for the error was that reference.conf was being overwritten by copies from various dependencies. I used the ShadowJar Gradle plugin to append all the reference.conf entries, and now the model is serialized to an MLeap bundle. Thanks so much for your help.
Now I would like to save the bundle directly to a MinIO bucket. The SparkContext already has properties such as the access key, secret key, and MinIO host set.
implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(transformedDf)
Please let me know whether it is possible to save the bundle directly to a MinIO bucket.
you might need to add
Not sure if direct writing to S3 will work or not. As a workaround, you can always write to the local filesystem and then use the AWS SDK to copy it over.
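The write-locally-then-copy workaround can also be sketched with the Hadoop FileSystem API instead of the AWS SDK, which reuses the S3 settings already present on the SparkContext. This is a swapped-in technique, not what the comment above proposes. It assumes the `hadoop-aws` module is on the classpath and that the `fs.s3a.*` endpoint and credential properties point at the MinIO host. `BundleUpload` and `copyToRemote` are hypothetical names.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object BundleUpload {
  // Copies a local file to any Hadoop-supported filesystem (for example an s3a:// URI).
  def copyToRemote(localFile: String, remoteUri: String, hadoopConf: Configuration): Unit = {
    val dst = new Path(remoteUri)
    // Resolves the filesystem implementation from the URI scheme of the destination.
    val fs = dst.getFileSystem(hadoopConf)
    // delSrc = false keeps the local copy; overwrite = true replaces an existing object.
    fs.copyFromLocalFile(false, true, new Path(localFile), dst)
  }
}
```

After the bundle has been written to the local path, a call such as `BundleUpload.copyToRemote("/tmp/modelu.zip", s"s3a://$bucketName/$minioPath", spark.sparkContext.hadoopConfiguration)` would upload it, where `spark` is assumed to be the active SparkSession.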
Hello Team,
When I try to serialize a Spark PipelineModel to an MLeap bundle, I get the exception below:
java.util.NoSuchElementException: key not found: org.apache.spark.ml.PipelineModel
at scala.collection.immutable.Map$Map2.apply(Map.scala:227)
at ml.combust.bundle.BundleRegistry.opForObj(BundleRegistry.scala:102)
at ml.combust.bundle.BundleWriter.$anonfun$save$1(BundleWriter.scala:28)
at scala.Option.getOrElse(Option.scala:189)
at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:28)
at ml.combust.bundle.BundleWriter.$anonfun$save$3(BundleWriter.scala:41)
at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
at scala.util.control.Exception$Catch.apply(Exception.scala:228)
at scala.util.control.Exception$Catch.either(Exception.scala:252)
at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
at resource.DeferredExtractableManagedResource.$anonfun$tried$1(AbstractManagedResource.scala:33)
at scala.util.Try$.apply(Try.scala:213)
at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:40)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at scala.util.Success.flatMap(Try.scala:251)
at scala.util.Try$WithFilter.flatMap(Try.scala:142)
at scala.util.Success.flatMap(Try.scala:251)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Versions: MLeap 0.23.1, Spark 3.4.0, XGBoost 1.7.6
Following is the code used to serialize the Spark PipelineModel to an MLeap bundle.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.{PipelineModel, Transformer}
// MLeap/Bundle.ML Serialization Libraries
import org.apache.spark.ml.bundle.SparkBundleContext
import ml.combust.mleap.spark.SparkSupport._
import scala.util.{Failure, Success, Try}
import java.net.URI
def saveModelAsMleapBundle(bucketName: String, minioPath: String, bestModel: PipelineModel, transformedDf: DataFrame): Try[Unit] = {
  Try {
    implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(transformedDf)
    val localPath = "/tmp/modelu.zip"
    val bundleURI = new URI(s"jar:file:$localPath")
    // .get rethrows a failed save so that the enclosing Try captures it
    val result = bestModel.writeBundle.save(bundleURI)(sbc).get
  }
}
Following is the code used to create the model
val labelEncoderPipelineStage = new StringIndexer().setInputCols(labelEncoderInputCols).setOutputCols(labelEncoderOutputCols).setHandleInvalid("keep")
val assemblerPipelineStage = new VectorAssembler().setInputCols(allFeatureColumns)
.setOutputCol("features").setHandleInvalid("keep")
import scala.collection.mutable

// Builds the XGBoost parameter map passed to XGBoostClassifier
def get_param(): mutable.HashMap[String, Any] = {
  val params = new mutable.HashMap[String, Any]()
  params += "objective" -> "multi:softprob"
  params += "num_class" -> 7
  params += "tree_method" -> "auto"
  params += "num_workers" -> 3
  params += "num_early_stopping_rounds" -> 3
  params += "maximize_evaluation_metrics" -> false
  params += "verbosity" -> 3
  params += "missing" -> 0.0
  params += "eta" -> 0.2
  params += "seed" -> 50
  params
}
// Create an XGBoost Classifier
val xgb = new XGBoostClassifier(get_param().toMap)
.setFeaturesCol("features")
.setLabelCol()
val xgbParamGrid = (new ParamGridBuilder()
.addGrid(xgb.missing, Array(0.0))
.addGrid(xgb.maxDepth, Array(16))
.addGrid(xgb.eta, Array(0.2))
.addGrid(xgb.gamma, Array(0))
.addGrid(xgb.subSample, Array(0.6, 0.65, 0.7))
.addGrid(xgb.numRound, Array(0.8, 0.9))
.addGrid(xgb.colsampleBytree, Array(1.0))
.addGrid(xgb.minChildWeight, Array(1.0))
.build())
val labelConverterPipelineStage = new IndexToString().setInputCol(predictioncolumn)
.setOutputCol(prediction_indextostringcolumn).setLabels(labelsArray)
val pipeline = new Pipeline().setStages(Array(labelEncoderPipelineStage, assemblerPipelineStage, xgb,labelConverterPipelineStage ))
val evaluator = new MultilabelClassificationEvaluator()
.setLabelCol(label_column)
.setPredictionCol("prediction")
.setMetricName("accuracy")
// Create the cross-validation pipeline, using the pipeline (which contains XGBoost) as the estimator,
// the evaluator defined above, and xgbParamGrid for hyperparameters
val cv = (new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(xgbParamGrid)
.setNumFolds(3))
// Create the model by fitting the training data
val cvModel = cv.fit(trainDF)
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
The model is created successfully, but serializing it to an MLeap bundle fails with the exception above.
Following is the reference.conf entry in my src/resources/reference.conf
ml.combust.mleap.spark.xgboost.ops = [
"ml.dmlc.xgboost4j.scala.spark.mleap.XGBoostClassificationModelOp",
"ml.dmlc.xgboost4j.scala.spark.mleap.XGBoostRegressionModelOp"
]
ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.xgboost.ops"
ml.combust.mleap.spark.registry.builtin-ops = [
"org.apache.spark.ml.bundle.ops.classification.DecisionTreeClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.NaiveBayesClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.MultiLayerPerceptronClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.OneVsRestOp",
"org.apache.spark.ml.bundle.ops.classification.RandomForestClassifierOp",
"org.apache.spark.ml.bundle.ops.clustering.GaussianMixtureOp",
"org.apache.spark.ml.bundle.ops.clustering.KMeansOp",
"org.apache.spark.ml.bundle.ops.clustering.BisectingKMeansOp",
"org.apache.spark.ml.bundle.ops.clustering.LDAModelOp",
"org.apache.spark.ml.bundle.ops.feature.BinarizerOp",
"org.apache.spark.ml.bundle.ops.feature.BucketizerOp",
"org.apache.spark.ml.bundle.ops.feature.ChiSqSelectorOp",
"org.apache.spark.ml.bundle.ops.feature.CountVectorizerOp",
"org.apache.spark.ml.bundle.ops.feature.DCTOp",
"org.apache.spark.ml.bundle.ops.feature.ElementwiseProductOp",
"org.apache.spark.ml.bundle.ops.feature.HashingTermFrequencyOp",
"org.apache.spark.ml.bundle.ops.feature.IDFOp",
"org.apache.spark.ml.bundle.ops.feature.InteractionOp",
"org.apache.spark.ml.bundle.ops.feature.MaxAbsScalerOp",
"org.apache.spark.ml.bundle.ops.feature.MinMaxScalerOp",
"org.apache.spark.ml.bundle.ops.feature.NGramOp",
"org.apache.spark.ml.bundle.ops.feature.NormalizerOp",
"org.apache.spark.ml.bundle.ops.feature.PcaOp",
"org.apache.spark.ml.bundle.ops.feature.PolynomialExpansionOp",
"org.apache.spark.ml.bundle.ops.feature.ReverseStringIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.StandardScalerOp",
"org.apache.spark.ml.bundle.ops.feature.StopWordsRemoverOp",
"org.apache.spark.ml.bundle.ops.feature.TokenizerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorAssemblerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorSlicerOp",
"org.apache.spark.ml.bundle.ops.feature.WordToVectorOp",
"org.apache.spark.ml.bundle.ops.feature.RegexTokenizerOp",
"org.apache.spark.ml.bundle.ops.regression.AFTSurvivalRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.DecisionTreeRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.GBTRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.GeneralizedLinearRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.IsotonicRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.LinearRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.RandomForestRegressionOp",
"org.apache.spark.ml.bundle.ops.recommendation.ALSOp",
"org.apache.spark.ml.bundle.ops.tuning.CrossValidatorOp",
"org.apache.spark.ml.bundle.ops.tuning.TrainValidationSplitOp",
"org.apache.spark.ml.bundle.ops.feature.MinHashLSHOp",
"org.apache.spark.ml.bundle.ops.feature.BucketedRandomProjectionLSHOp",
"org.apache.spark.ml.bundle.ops.classification.GBTClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.LogisticRegressionOp",
"org.apache.spark.ml.classification.bundle.ops.LinearSVCOp",
"org.apache.spark.ml.bundle.ops.feature.FeatureHasherOp",
"org.apache.spark.ml.bundle.ops.feature.StringIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp",
"org.apache.spark.ml.bundle.ops.PipelineOp"
]
ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.registry.builtin-ops"
ml.combust.mleap.spark.extension.ops = [
"org.apache.spark.ml.bundle.extension.ops.classification.OneVsRestOp",
"org.apache.spark.ml.bundle.extension.ops.classification.SupportVectorMachineOp",
"org.apache.spark.ml.bundle.extension.ops.feature.ImputerOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MathBinaryOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MathUnaryOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MultinomialLabelerOp",
"org.apache.spark.ml.bundle.extension.ops.feature.WordLengthFilterOp",
"org.apache.spark.ml.bundle.extension.ops.feature.StringMapOp"
]
ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.extension.ops"
ml.combust.mleap.registry.builtin-ops = [
"ml.combust.mleap.bundle.ops.classification.DecisionTreeClassifierOp",
"ml.combust.mleap.bundle.ops.classification.GBTClassifierOp",
"ml.combust.mleap.bundle.ops.classification.LogisticRegressionOp",
"ml.combust.mleap.bundle.ops.classification.NaiveBayesClassifierOp",
"ml.combust.mleap.bundle.ops.classification.MultiLayerPerceptronClassifierOp",
"ml.combust.mleap.bundle.ops.classification.OneVsRestOp",
"ml.combust.mleap.bundle.ops.classification.RandomForestClassifierOp",
"ml.combust.mleap.bundle.ops.classification.SupportVectorMachineOp",
"ml.combust.mleap.bundle.ops.classification.LinearSVCOp",
"ml.combust.mleap.bundle.ops.clustering.GaussianMixtureOp",
"ml.combust.mleap.bundle.ops.clustering.KMeansOp",
"ml.combust.mleap.bundle.ops.clustering.BisectingKMeansOp",
"ml.combust.mleap.bundle.ops.clustering.LDAModelOp",
"ml.combust.mleap.bundle.ops.feature.BinarizerOp",
"ml.combust.mleap.bundle.ops.sklearn.BinarizerOp",
"ml.combust.mleap.bundle.ops.feature.BucketedRandomProjectionLSHOp",
"ml.combust.mleap.bundle.ops.feature.BucketizerOp",
"ml.combust.mleap.bundle.ops.feature.ChiSqSelectorOp",
"ml.combust.mleap.bundle.ops.feature.CoalesceOp",
"ml.combust.mleap.bundle.ops.feature.CountVectorizerOp",
"ml.combust.mleap.bundle.ops.feature.DCTOp",
"ml.combust.mleap.bundle.ops.feature.ElementwiseProductOp",
"ml.combust.mleap.bundle.ops.feature.FeatureHasherOp",
"ml.combust.mleap.bundle.ops.feature.HashingTermFrequencyOp",
"ml.combust.mleap.bundle.ops.feature.IDFOp",
"ml.combust.mleap.bundle.ops.feature.ImputerOp",
"ml.combust.mleap.bundle.ops.feature.InteractionOp",
"ml.combust.mleap.bundle.ops.feature.MapEntrySelectorOp",
"ml.combust.mleap.bundle.ops.feature.MathBinaryOp",
"ml.combust.mleap.bundle.ops.feature.MathUnaryOp",
"ml.combust.mleap.bundle.ops.feature.MaxAbsScalerOp",
"ml.combust.mleap.bundle.ops.feature.MinHashLSHOp",
"ml.combust.mleap.bundle.ops.feature.MinMaxScalerOp",
"ml.combust.mleap.bundle.ops.feature.MultinomialLabelerOp",
"ml.combust.mleap.bundle.ops.feature.NGramOp",
"ml.combust.mleap.bundle.ops.feature.NormalizerOp",
"ml.combust.mleap.bundle.ops.feature.OneHotEncoderOp",
"ml.combust.mleap.bundle.ops.feature.PcaOp",
"ml.combust.mleap.bundle.ops.feature.PolynomialExpansionOp",
"ml.combust.mleap.bundle.ops.sklearn.PolynomialFeaturesOp",
"ml.combust.mleap.bundle.ops.feature.ReverseStringIndexerOp",
"ml.combust.mleap.bundle.ops.feature.StandardScalerOp",
"ml.combust.mleap.bundle.ops.feature.StopWordsRemoverOp",
"ml.combust.mleap.bundle.ops.feature.StringIndexerOp",
"ml.combust.mleap.bundle.ops.feature.StringMapOp",
"ml.combust.mleap.bundle.ops.feature.TokenizerOp",
"ml.combust.mleap.bundle.ops.feature.VectorAssemblerOp",
"ml.combust.mleap.bundle.ops.feature.VectorIndexerOp",
"ml.combust.mleap.bundle.ops.feature.VectorSlicerOp",
"ml.combust.mleap.bundle.ops.feature.WordToVectorOp",
"ml.combust.mleap.bundle.ops.feature.RegexTokenizerOp",
"ml.combust.mleap.bundle.ops.feature.RegexIndexerOp",
"ml.combust.mleap.bundle.ops.feature.WordLengthFilterOp",
"ml.combust.mleap.bundle.ops.regression.AFTSurvivalRegressionOp",
"ml.combust.mleap.bundle.ops.regression.DecisionTreeRegressionOp",
"ml.combust.mleap.bundle.ops.regression.GBTRegressionOp",
"ml.combust.mleap.bundle.ops.regression.GeneralizedLinearRegressionOp",
"ml.combust.mleap.bundle.ops.regression.IsotonicRegressionOp",
"ml.combust.mleap.bundle.ops.regression.LinearRegressionOp",
"ml.combust.mleap.bundle.ops.regression.RandomForestRegressionOp",
"ml.combust.mleap.bundle.ops.ensemble.CategoricalDrilldownOp",
"ml.combust.mleap.bundle.ops.recommendation.ALSOp",
"ml.combust.mleap.bundle.ops.PipelineOp"
]
ml.combust.mleap.registry.default.ops += "ml.combust.mleap.registry.builtin-ops"
ml.combust.mleap.xgboost.ops = [
"ml.combust.mleap.xgboost.runtime.bundle.ops.XGBoostClassificationOp",
"ml.combust.mleap.xgboost.runtime.bundle.ops.XGBoostRegressionOp"
]
ml.combust.mleap.registry.default.ops += "ml.combust.mleap.xgboost.ops"
I have added the following MLeap dependencies to my Gradle build:
implementation group: 'ml.combust.mleap', name : "mleap-xgboost-spark_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-xgboost-runtime_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name : "mleap-spark_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-spark-extension_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-runtime_${scalaVersion}", version: '0.23.1'
Please let me know if I need to add any additional config to solve this error.