predictionio-commits mailing list archives

From don...@apache.org
Subject incubator-predictionio git commit: [PIO-105] Batch Predictions
Date Tue, 01 Aug 2017 21:58:04 GMT
Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop 965c73f0f -> cfa3f5dab


[PIO-105] Batch Predictions

Implement a new pio batchpredict command to enable fast, large-scale
batch predictions from a trained model. Input is a multi-object JSON
file with one query object per line. Results are likewise written to a
multi-object JSON file, with one prediction result plus its original
query per line.
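
As an illustration (field names below are from a hypothetical recommendation
engine; the actual query and prediction schemas are defined by the engine in
use), two input lines and the matching output lines could look like:

    {"user": "u1", "num": 3}
    {"user": "u2", "num": 3}

    {"query":{"user":"u1","num":3},"prediction":{"itemScores":[{"item":"i4","score":0.59}]}}
    {"query":{"user":"u2","num":3},"prediction":{"itemScores":[{"item":"i7","score":0.81}]}}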

Closes #412


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/cfa3f5da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/cfa3f5da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/cfa3f5da

Branch: refs/heads/develop
Commit: cfa3f5dab533a67688ec2f84182eccedc56fa84e
Parents: 965c73f
Author: Mars Hall <mars@heroku.com>
Authored: Tue Aug 1 14:56:47 2017 -0700
Committer: Donald Szeto <donald@apache.org>
Committed: Tue Aug 1 14:56:47 2017 -0700

----------------------------------------------------------------------
 .../predictionio/workflow/BatchPredict.scala    | 230 +++++++++++++++++++
 .../predictionio/tools/RunBatchPredict.scala    |  72 ++++++
 .../predictionio/tools/commands/Engine.scala    |  55 ++++-
 .../predictionio/tools/console/Console.scala    |  58 ++++-
 .../apache/predictionio/tools/console/Pio.scala |  13 +-
 .../tools/console/batchpredict.scala.txt        |  25 ++
 .../predictionio/tools/console/main.scala.txt   |   1 +
 7 files changed, 450 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
new file mode 100644
index 0000000..2fb0545
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.workflow
+
+import java.io.Serializable
+
+import com.twitter.bijection.Injection
+import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
+import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
+import grizzled.slf4j.Logging
+import org.apache.predictionio.controller.{Engine, Utils}
+import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
+import org.apache.predictionio.data.storage.{EngineInstance, Storage}
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.spark.rdd.RDD
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import scala.language.existentials
+
+case class BatchPredictConfig(
+  inputFilePath: String = "batchpredict-input.json",
+  outputFilePath: String = "batchpredict-output.json",
+  queryPartitions: Option[Int] = None,
+  engineInstanceId: String = "",
+  engineId: Option[String] = None,
+  engineVersion: Option[String] = None,
+  engineVariant: String = "",
+  env: Option[String] = None,
+  verbose: Boolean = false,
+  debug: Boolean = false,
+  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
+
+object BatchPredict extends Logging {
+
+  class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator {
+    override def newKryo(): KryoBase = {
+      val kryo = super.newKryo()
+      kryo.setClassLoader(classLoader)
+      SynchronizedCollectionsSerializer.registerSerializers(kryo)
+      kryo
+    }
+  }
+
+  object KryoInstantiator extends Serializable {
+    def newKryoInjection : Injection[Any, Array[Byte]] = {
+      val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
+      KryoInjection.instance(kryoInstantiator)
+    }
+  }
+
+  val engineInstances = Storage.getMetaDataEngineInstances
+  val modeldata = Storage.getModelDataModels
+
+  def main(args: Array[String]): Unit = {
+    val parser = new scopt.OptionParser[BatchPredictConfig]("BatchPredict") {
+      opt[String]("input") action { (x, c) =>
+        c.copy(inputFilePath = x)
+      } text("Path to file containing input queries; a " +
+        "multi-object JSON file with one object per line.")
+      opt[String]("output") action { (x, c) =>
+        c.copy(outputFilePath = x)
+      } text("Path to file containing output predictions; a " +
+        "multi-object JSON file with one object per line.")
+      opt[Int]("query-partitions") action { (x, c) =>
+        c.copy(queryPartitions = Some(x))
+      } text("Limit concurrency of predictions by setting the number " +
+        "of partitions used internally for the RDD of queries.")
+      opt[String]("engineId") action { (x, c) =>
+        c.copy(engineId = Some(x))
+      } text("Engine ID.")
+      opt[String]("engineVersion") action { (x, c) =>
+        c.copy(engineVersion = Some(x))
+      } text("Engine version.")
+      opt[String]("engine-variant") required() action { (x, c) =>
+        c.copy(engineVariant = x)
+      } text("Engine variant JSON.")
+      opt[String]("env") action { (x, c) =>
+        c.copy(env = Some(x))
+      } text("Comma-separated list of environmental variables (in 'FOO=BAR' " +
+        "format) to pass to the Spark execution environment.")
+      opt[String]("engineInstanceId") required() action { (x, c) =>
+        c.copy(engineInstanceId = x)
+      } text("Engine instance ID.")
+      opt[Unit]("verbose") action { (x, c) =>
+        c.copy(verbose = true)
+      } text("Enable verbose output.")
+      opt[Unit]("debug") action { (x, c) =>
+        c.copy(debug = true)
+      } text("Enable debug output.")
+      opt[String]("json-extractor") action { (x, c) =>
+        c.copy(jsonExtractor = JsonExtractorOption.withName(x))
+      }
+    }
+
+    parser.parse(args, BatchPredictConfig()) map { config =>
+      WorkflowUtils.modifyLogging(config.verbose)
+      engineInstances.get(config.engineInstanceId) map { engineInstance =>
+
+        val engine = getEngine(engineInstance)
+
+        run(config, engineInstance, engine)
+
+      } getOrElse {
+        error(s"Invalid engine instance ID. Aborting batch predict.")
+      }
+    }
+  }
+
+  def getEngine(engineInstance: EngineInstance): Engine[_, _, _, _, _, _] = {
+
+    val engineFactoryName = engineInstance.engineFactory
+
+    val (engineLanguage, engineFactory) =
+      WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
+    val maybeEngine = engineFactory()
+
+    // EngineFactory returns a base engine, which may not be deployable.
+    if (!maybeEngine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
+      throw new NoSuchMethodException(
+        s"Engine $maybeEngine cannot be used for batch predict")
+    }
+
+    maybeEngine.asInstanceOf[Engine[_,_,_,_,_,_]]
+  }
+
+  def run[Q, P](
+    config: BatchPredictConfig,
+    engineInstance: EngineInstance,
+    engine: Engine[_, _, _, Q, P, _]): Unit = {
+
+    val engineParams = engine.engineInstanceToEngineParams(
+      engineInstance, config.jsonExtractor)
+
+    val kryo = KryoInstantiator.newKryoInjection
+
+    val modelsFromEngineInstance =
+      kryo.invert(modeldata.get(engineInstance.id).get.models).get.
+      asInstanceOf[Seq[Any]]
+
+    val prepareSparkContext = WorkflowContext(
+      batch = engineInstance.engineFactory,
+      executorEnv = engineInstance.env,
+      mode = "Batch Predict (model)",
+      sparkEnv = engineInstance.sparkConf)
+
+    val models = engine.prepareDeploy(
+      prepareSparkContext,
+      engineParams,
+      engineInstance.id,
+      modelsFromEngineInstance,
+      params = WorkflowParams()
+    )
+
+    val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
+      Doer(engine.algorithmClassMap(n), p)
+    }
+
+    val servingParamsWithName = engineParams.servingParams
+
+    val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
+      servingParamsWithName._2)
+
+    val runSparkContext = WorkflowContext(
+      batch = engineInstance.engineFactory,
+      executorEnv = engineInstance.env,
+      mode = "Batch Predict (runner)",
+      sparkEnv = engineInstance.sparkConf)
+
+    val inputRDD: RDD[String] = runSparkContext.
+      textFile(config.inputFilePath).
+      filter(_.trim.nonEmpty)
+    val queriesRDD: RDD[String] = config.queryPartitions match {
+      case Some(p) => inputRDD.repartition(p)
+      case None => inputRDD
+    }
+
+    val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
+      val jsonExtractorOption = config.jsonExtractor
+      // Extract Query from Json
+      val query = JsonExtractor.extract(
+        jsonExtractorOption,
+        queryString,
+        algorithms.head.queryClass,
+        algorithms.head.querySerializer,
+        algorithms.head.gsonTypeAdapterFactories
+      )
+      // Deploy logic. First call Serving.supplement, then Algo.predict,
+      // finally Serving.serve.
+      val supplementedQuery = serving.supplementBase(query)
+      // TODO: Parallelize the following.
+      val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
+        a.predictBase(models(ai), supplementedQuery)
+      }
+      // Notice that it is by design to call Serving.serve with the
+      // *original* query.
+      val prediction = serving.serveBase(query, predictions)
+      // Combine query with prediction, so the batch results are
+      // self-descriptive.
+      val predictionJValue = JsonExtractor.toJValue(
+        jsonExtractorOption,
+        Map("query" -> query,
+            "prediction" -> prediction),
+        algorithms.head.querySerializer,
+        algorithms.head.gsonTypeAdapterFactories)
+      // Return JSON string
+      compact(render(predictionJValue))
+    }
+
+    predictionsRDD.saveAsTextFile(config.outputFilePath)
+  }
+}
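
Note that because the results are written with RDD.saveAsTextFile, the
--output path is created as a directory of Hadoop partition files rather
than a single file, e.g. (layout illustrative):

    batchpredict-output.json/
        _SUCCESS
        part-00000
        part-00001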

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
new file mode 100644
index 0000000..35572c9
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.tools
+
+import org.apache.predictionio.tools.Common._
+import org.apache.predictionio.tools.ReturnTypes._
+import org.apache.predictionio.workflow.JsonExtractorOption
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+
+import java.io.File
+import grizzled.slf4j.Logging
+
+import scala.sys.process._
+
+case class BatchPredictArgs(
+  inputFilePath: String = "batchpredict-input.json",
+  outputFilePath: String = "batchpredict-output.json",
+  queryPartitions: Option[Int] = None,
+  variantJson: Option[File] = None,
+  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
+
+
+object RunBatchPredict extends Logging {
+
+  def runBatchPredict(
+    engineInstanceId: String,
+    batchPredictArgs: BatchPredictArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    engineDirPath: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
+
+    val jarFiles = jarFilesForScala(engineDirPath).map(_.toURI) ++
+      Option(new File(pioHome, "plugins").listFiles())
+        .getOrElse(Array.empty[File]).map(_.toURI)
+    val args = Seq[String](
+      "--input",
+      batchPredictArgs.inputFilePath,
+      "--output",
+      batchPredictArgs.outputFilePath,
+      "--engineInstanceId",
+      engineInstanceId,
+      "--engine-variant",
+      batchPredictArgs.variantJson.getOrElse(
+        new File(engineDirPath, "engine.json")).getCanonicalPath) ++
+      (if (batchPredictArgs.queryPartitions.isEmpty) Seq()
+        else Seq("--query-partitions",
+                  batchPredictArgs.queryPartitions.get.toString)) ++
+      (if (verbose) Seq("--verbose") else Seq()) ++
+      Seq("--json-extractor", batchPredictArgs.jsonExtractor.toString)
+
+    Runner.runOnSpark(
+      "org.apache.predictionio.workflow.BatchPredict",
+      args, sparkArgs, jarFiles, pioHome, verbose)
+  }
+}
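
For reference, with the default BatchPredictArgs and a hypothetical engine
instance ID "abc123", the argument list assembled above comes out roughly as:

    --input batchpredict-input.json --output batchpredict-output.json
    --engineInstanceId abc123 --engine-variant /path/to/engine/engine.json
    --json-extractor Both

which Runner.runOnSpark then submits to the
org.apache.predictionio.workflow.BatchPredict driver together with the
engine's jar files.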

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
index e49c3fc..e3460a5 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
@@ -21,8 +21,9 @@ import org.apache.predictionio.core.BuildInfo
 import org.apache.predictionio.controller.Utils
 import org.apache.predictionio.data.storage
 import org.apache.predictionio.tools.EitherLogging
-import org.apache.predictionio.tools.{RunWorkflow, RunServer}
-import org.apache.predictionio.tools.{DeployArgs, WorkflowArgs, SparkArgs, ServerArgs}
+import org.apache.predictionio.tools.{RunWorkflow, RunServer, RunBatchPredict}
+import org.apache.predictionio.tools.{
+  DeployArgs, WorkflowArgs, SparkArgs, ServerArgs, BatchPredictArgs}
 import org.apache.predictionio.tools.console.Console
 import org.apache.predictionio.tools.Common._
 import org.apache.predictionio.tools.ReturnTypes._
@@ -262,6 +263,56 @@ object Engine extends EitherLogging {
     }
   }
 
+  /** Batch predict with an engine.
+    *
+    * @param ea An instance of [[EngineArgs]]
+    * @param engineInstanceId An optional engine instance ID
+    * @param batchPredictArgs An instance of [[BatchPredictArgs]]
+    * @param sparkArgs An instance of [[SparkArgs]]
+    * @param pioHome [[String]] with a path to PIO installation
+    * @param verbose A [[Boolean]] flag to enable verbose output
+    * @return An instance of [[Expected]] containing either [[Left]]
+    *         with an error message or [[Right]] with a handle to the
+    *         process of a running engine and a function () => Unit
+    *         that must be called when the process is complete
+    */
+  def batchPredict(
+    ea: EngineArgs,
+    engineInstanceId: Option[String],
+    batchPredictArgs: BatchPredictArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
+
+    val engineDirPath = getEngineDirPath(ea.engineDir)
+    val verifyResult = Template.verifyTemplateMinVersion(
+      new File(engineDirPath, "template.json"))
+    if (verifyResult.isLeft) {
+      return Left(verifyResult.left.get)
+    }
+    val ei = Console.getEngineInfo(
+      batchPredictArgs.variantJson.getOrElse(new File(engineDirPath, "engine.json")),
+      engineDirPath)
+    val engineInstances = storage.Storage.getMetaDataEngineInstances
+    val engineInstance = engineInstanceId map { eid =>
+      engineInstances.get(eid)
+    } getOrElse {
+      engineInstances.getLatestCompleted(
+        ei.engineId, ei.engineVersion, ei.variantId)
+    }
+    engineInstance map { r =>
+      RunBatchPredict.runBatchPredict(
+        r.id, batchPredictArgs, sparkArgs, pioHome, engineDirPath, verbose)
+    } getOrElse {
+      engineInstanceId map { eid =>
+        logAndFail(s"Invalid engine instance ID ${eid}. Aborting.")
+      } getOrElse {
+        logAndFail(s"No valid engine instance found for engine ${ei.engineId} " +
+          s"${ei.engineVersion}.\nTry running 'train' before 'batchpredict'. Aborting.")
+      }
+    }
+  }
+
   /** Running a driver on spark.
     *  The function starts a process and returns immediately
     *

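Put differently, Engine.batchPredict resolves the engine instance (either the
given ID or the latest completed training) and then launches the BatchPredict
driver on Spark. A rough programmatic sketch, assuming the Args case classes
can be constructed with their defaults as the console layer does (paths and
values are illustrative):

    val result: Expected[(Process, () => Unit)] =
      Engine.batchPredict(
        ea = EngineArgs(),                 // default engine location
        engineInstanceId = None,           // use the latest completed instance
        batchPredictArgs = BatchPredictArgs(
          inputFilePath = "queries.json",
          outputFilePath = "predictions.json"),
        sparkArgs = SparkArgs(),
        pioHome = sys.env("PIO_HOME"),
        verbose = false)
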
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
index 535905a..4a72635 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -27,7 +27,8 @@ import org.apache.predictionio.tools.commands.{
   DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs,
   BuildArgs, EngineArgs}
 import org.apache.predictionio.tools.{
-  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs}
+  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs,
+  DeployArgs, BatchPredictArgs}
 import org.apache.predictionio.workflow.{JsonExtractorOption, WorkflowUtils}
 import org.json4s._
 import org.json4s.native.JsonMethods._
@@ -42,6 +43,7 @@ case class ConsoleArgs(
   workflow: WorkflowArgs = WorkflowArgs(),
   accessKey: AccessKeyArgs = AccessKeyArgs(),
   deploy: DeployArgs = DeployArgs(),
+  batchPredict: BatchPredictArgs = BatchPredictArgs(),
   eventServer: EventServerArgs = EventServerArgs(),
   adminServer: AdminServerArgs = AdminServerArgs(),
   dashboard: DashboardArgs = DashboardArgs(),
@@ -323,6 +325,46 @@ object Console extends Logging {
           } text("Port to unbind from. Default: 8000")
         )
       note("")
+      cmd("batchpredict").
+        text("Use an engine instance to process batch predictions. This\n" +
+              "command will pass all pass-through arguments to its underlying\n" +
+              "spark-submit command. All algorithm classes used in the engine\n" +
+              "must be serializable.").
+        action { (_, c) =>
+          c.copy(commands = c.commands :+ "batchpredict")
+        } children(
+          opt[String]("input") action { (x, c) =>
+            c.copy(batchPredict = c.batchPredict.copy(inputFilePath = x))
+          } text("Path to file containing queries; a multi-object JSON file\n" +
+                  "with one query object per line. Accepts any valid Hadoop\n" +
+                  "file URL. Default: batchpredict-input.json"),
+          opt[String]("output") action { (x, c) =>
+            c.copy(batchPredict = c.batchPredict.copy(outputFilePath = x))
+          } text("Path to file to receive results; a multi-object JSON file\n" +
+                  "with one object per line, the prediction + original query.\n" +
+                  "Accepts any valid Hadoop file URL. Actual output will be\n" +
+                  "written as Hadoop partition files in a directory with the\n" +
+                  "output name. Default: batchpredict-output.json"),
+          opt[Int]("query-partitions") action { (x, c) =>
+            c.copy(batchPredict = c.batchPredict.copy(queryPartitions = Some(x)))
+          } text("Limit concurrency of predictions by setting the number\n" +
+                  "of partitions used internally for the RDD of queries.\n" +
+                  "Default: number created by Spark context's `textFile`"),
+          opt[String]("engine-instance-id") action { (x, c) =>
+            c.copy(engineInstanceId = Some(x))
+          } text("Engine instance ID."),
+          opt[String]("json-extractor") action { (x, c) =>
+            c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x)))
+          } validate { x =>
+            if (JsonExtractorOption.values.map(_.toString).contains(x)) {
+              success
+            } else {
+              val validOptions = JsonExtractorOption.values.mkString("|")
+              failure(s"$x is not a valid json-extractor option [$validOptions]")
+            }
+          }
+        )
+      note("")
       cmd("dashboard").
         text("Launch a dashboard at the specific IP and port.").
         action { (_, c) =>
@@ -644,6 +686,19 @@ object Console extends Logging {
             ca.verbose)
         case Seq("undeploy") =>
           Pio.undeploy(ca.deploy)
+        case Seq("batchpredict") =>
+          Pio.batchPredict(
+            ca.engine,
+            ca.engineInstanceId,
+            BatchPredictArgs(
+              ca.batchPredict.inputFilePath,
+              ca.batchPredict.outputFilePath,
+              ca.batchPredict.queryPartitions,
+              ca.workflow.variantJson,
+              ca.workflow.jsonExtractor),
+            ca.spark,
+            ca.pioHome.get,
+            ca.verbose)
         case Seq("dashboard") =>
           Pio.dashboard(ca.dashboard)
         case Seq("eventserver") =>
@@ -756,6 +811,7 @@ object Console extends Logging {
     "build" -> txt.build().toString,
     "train" -> txt.train().toString,
     "deploy" -> txt.deploy().toString,
+    "batchpredict" -> txt.batchpredict().toString,
     "eventserver" -> txt.eventserver().toString,
     "adminserver" -> txt.adminserver().toString,
     "app" -> txt.app().toString,

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
index dd78717..ef4581b 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
@@ -18,7 +18,8 @@
 package org.apache.predictionio.tools.console
 
 import org.apache.predictionio.tools.{
-  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs}
+  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs,
+  DeployArgs, BatchPredictArgs}
 import org.apache.predictionio.tools.commands.{
   DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs,
   BuildArgs, EngineArgs, Management, Engine, Import, Export,
@@ -104,6 +105,16 @@ object Pio extends Logging {
 
   def undeploy(da: DeployArgs): Int = Engine.undeploy(da)
 
+  def batchPredict(
+    ea: EngineArgs,
+    engineInstanceId: Option[String],
+    batchPredictArgs: BatchPredictArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    verbose: Boolean = false): Int =
+      processAwaitAndClean(Engine.batchPredict(
+        ea, engineInstanceId, batchPredictArgs, sparkArgs, pioHome, verbose))
+
   def dashboard(da: DashboardArgs): Int = {
     Management.dashboard(da).awaitTermination
     0

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt
b/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt
new file mode 100644
index 0000000..d9d5d74
--- /dev/null
+++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt
@@ -0,0 +1,25 @@
+Usage: pio batchpredict [--input <value>]
+                        [--output <value>]
+                        [--query-partitions <value>]
+                        [--engine-instance-id <value>]
+
+Use an engine instance to process batch predictions. This command will pass all
+pass-through arguments to its underlying spark-submit command. All algorithm
+classes used in the engine must be serializable.
+
+  --input <value>
+      Path to file containing queries; a multi-object JSON file with one
+      query object per line. Accepts any valid Hadoop file URL.
+      Default: batchpredict-input.json
+  --output <value>
+      Path to file to receive results; a multi-object JSON file with one
+      object per line, the prediction + original query. Accepts any
+      valid Hadoop file URL. Actual output will be written as Hadoop
+      partition files in a directory with the output name.
+      Default: batchpredict-output.json
+  --query-partitions <value>
+      Limit concurrency of predictions by setting the number of partitions
+      used internally for the RDD of queries.
+      Default: number created by Spark context's `textFile`
+  --engine-instance-id <value>
+      Engine instance ID. Default: the latest trained instance.
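
For example, a typical invocation against an already-trained engine (paths
and the trailing spark-submit pass-through arguments are illustrative) might
be:

    pio batchpredict \
      --input hdfs://my-cluster/queries.json \
      --output hdfs://my-cluster/predictions \
      --query-partitions 100 \
      -- --master yarn --driver-memory 4g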

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
index 5efa4bf..01be96d 100644
--- a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
+++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
@@ -38,6 +38,7 @@ The most commonly used pio commands are:
     build         Build an engine at the current directory
     train         Kick off a training using an engine
     deploy        Deploy an engine as an engine server
+    batchpredict  Process bulk predictions with an engine
     eventserver   Launch an Event Server
     app           Manage apps that are used by the Event Server
     accesskey     Manage app access keys

