Spark Pipeline Operations
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row

// Prepare training data from a list of (id, text, label) tuples.
val training = (spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label"))

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = (new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words"))
val hashingTF = (new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features"))
val lr = (new LogisticRegression()
  .setMaxIter(10))
val pipeline = (new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr)))

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = (new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build())

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = (new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3))

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (12L, "spark i j k"),
  (13L, "l m n"),
  (14L, "mapreduce spark"),
  (15L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents. cvModel applies the best model found during tuning.
(cvModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  })

// Expected output:
/*
(12, spark i j k) --> prob=[0.25806842225846466,0.7419315777415353], prediction=1.0
(13, l m n) --> prob=[0.9185597412653913,0.08144025873460858], prediction=0.0
(14, mapreduce spark) --> prob=[0.43203205663918753,0.5679679433608125], prediction=1.0
(15, apache hadoop) --> prob=[0.6766082856652199,0.32339171433478003], prediction=0.0
*/
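After fitting, it is often useful to see which of the 6 grid points the CrossValidator actually picked. The snippet below is a minimal follow-up sketch, not part of the original example: it assumes the cv and cvModel values defined above, casts cvModel.bestModel to a PipelineModel (the pipeline has three stages: tokenizer at index 0, hashingTF at index 1, the fitted logistic regression at index 2), and prints the selected numFeatures and regParam plus the average areaUnderROC for each parameter setting.

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

// Unpack the winning PipelineModel and its stages (sketch; indices follow the
// three-stage pipeline built above).
val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
val bestHashingTF = bestPipeline.stages(1).asInstanceOf[HashingTF]
val bestLrModel = bestPipeline.stages(2).asInstanceOf[LogisticRegressionModel]

println(s"chosen numFeatures = ${bestHashingTF.getNumFeatures}")
println(s"chosen regParam    = ${bestLrModel.getRegParam}")

// Average areaUnderROC across the 3 folds for each of the 6 parameter maps,
// in the same order as cv.getEstimatorParamMaps.
cv.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach { case (params, metric) =>
  println(s"$params -> $metric")
}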
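The tuned model can also be saved and reloaded for later scoring. A minimal sketch, again assuming the cvModel and test DataFrame from above; the path "/tmp/spark-cv-model" is only a placeholder, not something from the original article.

import org.apache.spark.ml.tuning.CrossValidatorModel

// Persist the fitted CrossValidatorModel (placeholder path).
cvModel.write.overwrite().save("/tmp/spark-cv-model")

// Reload it later, e.g. in a separate scoring job, and use it as a Transformer.
val reloaded = CrossValidatorModel.load("/tmp/spark-cv-model")
reloaded.transform(test)
  .select("id", "text", "prediction")
  .show(false)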