predictionio-commits mailing list archives

From don...@apache.org
Subject [23/34] incubator-predictionio git commit: rename all except examples
Date Mon, 18 Jul 2016 20:17:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala b/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala
deleted file mode 100644
index af5aa14..0000000
--- a/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala
+++ /dev/null
@@ -1,274 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import java.net.URI
-
-import com.github.nscala_time.time.Imports._
-import com.google.common.io.ByteStreams
-import grizzled.slf4j.Logging
-import io.prediction.controller.Engine
-import io.prediction.core.BaseEngine
-import io.prediction.data.storage.EngineInstance
-import io.prediction.data.storage.EvaluationInstance
-import io.prediction.data.storage.Storage
-import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.json4s.JValue
-import org.json4s.JString
-import org.json4s.native.JsonMethods.parse
-
-import scala.language.existentials
-
-object CreateWorkflow extends Logging {
-
-  case class WorkflowConfig(
-    deployMode: String = "",
-    batch: String = "",
-    engineId: String = "",
-    engineVersion: String = "",
-    engineVariant: String = "",
-    engineFactory: String = "",
-    engineParamsKey: String = "",
-    evaluationClass: Option[String] = None,
-    engineParamsGeneratorClass: Option[String] = None,
-    env: Option[String] = None,
-    skipSanityCheck: Boolean = false,
-    stopAfterRead: Boolean = false,
-    stopAfterPrepare: Boolean = false,
-    verbosity: Int = 0,
-    verbose: Boolean = false,
-    debug: Boolean = false,
-    logFile: Option[String] = None,
-    jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
-
-  case class AlgorithmParams(name: String, params: JValue)
-
-  private def stringFromFile(filePath: String): String = {
-    try {
-      val uri = new URI(filePath)
-      val fs = FileSystem.get(uri, new Configuration())
-      new String(ByteStreams.toByteArray(fs.open(new Path(uri))).map(_.toChar))
-    } catch {
-      case e: java.io.IOException =>
-        error(s"Error reading from file: ${e.getMessage}. Aborting workflow.")
-        sys.exit(1)
-    }
-  }
-
-  val parser = new scopt.OptionParser[WorkflowConfig]("CreateWorkflow") {
-    override def errorOnUnknownArgument: Boolean = false
-    opt[String]("batch") action { (x, c) =>
-      c.copy(batch = x)
-    } text("Batch label of the workflow run.")
-    opt[String]("engine-id") required() action { (x, c) =>
-      c.copy(engineId = x)
-    } text("Engine's ID.")
-    opt[String]("engine-version") required() action { (x, c) =>
-      c.copy(engineVersion = x)
-    } text("Engine's version.")
-    opt[String]("engine-variant") required() action { (x, c) =>
-      c.copy(engineVariant = x)
-    } text("Engine variant JSON.")
-    opt[String]("evaluation-class") action { (x, c) =>
-      c.copy(evaluationClass = Some(x))
-    } text("Class name of the run's evaluator.")
-    opt[String]("engine-params-generator-class") action { (x, c) =>
-      c.copy(engineParamsGeneratorClass = Some(x))
-    } text("Path to evaluator parameters")
-    opt[String]("env") action { (x, c) =>
-      c.copy(env = Some(x))
-    } text("Comma-separated list of environmental variables (in 'FOO=BAR' " +
-      "format) to pass to the Spark execution environment.")
-    opt[Unit]("verbose") action { (x, c) =>
-      c.copy(verbose = true)
-    } text("Enable verbose output.")
-    opt[Unit]("debug") action { (x, c) =>
-      c.copy(debug = true)
-    } text("Enable debug output.")
-    opt[Unit]("skip-sanity-check") action { (x, c) =>
-      c.copy(skipSanityCheck = true)
-    }
-    opt[Unit]("stop-after-read") action { (x, c) =>
-      c.copy(stopAfterRead = true)
-    }
-    opt[Unit]("stop-after-prepare") action { (x, c) =>
-      c.copy(stopAfterPrepare = true)
-    }
-    opt[String]("deploy-mode") action { (x, c) =>
-      c.copy(deployMode = x)
-    }
-    opt[Int]("verbosity") action { (x, c) =>
-      c.copy(verbosity = x)
-    }
-    opt[String]("engine-factory") action { (x, c) =>
-      c.copy(engineFactory = x)
-    }
-    opt[String]("engine-params-key") action { (x, c) =>
-      c.copy(engineParamsKey = x)
-    }
-    opt[String]("log-file") action { (x, c) =>
-      c.copy(logFile = Some(x))
-    }
-    opt[String]("json-extractor") action { (x, c) =>
-      c.copy(jsonExtractor = JsonExtractorOption.withName(x))
-    }
-  }
-
-  def main(args: Array[String]): Unit = {
-    val wfcOpt = parser.parse(args, WorkflowConfig())
-    if (wfcOpt.isEmpty) {
-      logger.error("WorkflowConfig is empty. Quitting")
-      return
-    }
-
-    val wfc = wfcOpt.get
-
-    WorkflowUtils.modifyLogging(wfc.verbose)
-
-    val evaluation = wfc.evaluationClass.map { ec =>
-      try {
-        WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
-          sys.exit(1)
-      }
-    }
-
-    val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg =>
-      try {
-        WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain engine parameters generator $epg. " +
-            "Aborting workflow.", e)
-          sys.exit(1)
-      }
-    }
-
-    val pioEnvVars = wfc.env.map(e =>
-      e.split(',').flatMap(p =>
-        p.split('=') match {
-          case Array(k, v) => List(k -> v)
-          case _ => Nil
-        }
-      ).toMap
-    ).getOrElse(Map())
-
-    if (evaluation.isEmpty) {
-      val variantJson = parse(stringFromFile(wfc.engineVariant))
-      val engineFactory = if (wfc.engineFactory == "") {
-        variantJson \ "engineFactory" match {
-          case JString(s) => s
-          case _ =>
-            error("Unable to read engine factory class name from " +
-              s"${wfc.engineVariant}. Aborting.")
-            sys.exit(1)
-        }
-      } else wfc.engineFactory
-      val variantId = variantJson \ "id" match {
-        case JString(s) => s
-        case _ =>
-          error("Unable to read engine variant ID from " +
-            s"${wfc.engineVariant}. Aborting.")
-          sys.exit(1)
-      }
-      val (engineLanguage, engineFactoryObj) = try {
-        WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
-          sys.exit(1)
-      }
-
-      val engine: BaseEngine[_, _, _, _] = engineFactoryObj()
-
-      val customSparkConf = WorkflowUtils.extractSparkConf(variantJson)
-      val workflowParams = WorkflowParams(
-        verbose = wfc.verbosity,
-        skipSanityCheck = wfc.skipSanityCheck,
-        stopAfterRead = wfc.stopAfterRead,
-        stopAfterPrepare = wfc.stopAfterPrepare,
-        sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf)
-
-      // Evaluator not specified. Do training.
-      if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
-        throw new NoSuchMethodException(s"Engine $engine is not trainable")
-      }
-
-      val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]]
-
-      val engineParams = if (wfc.engineParamsKey == "") {
-        trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
-      } else {
-        engineFactoryObj.engineParams(wfc.engineParamsKey)
-      }
-
-      val engineInstance = EngineInstance(
-        id = "",
-        status = "INIT",
-        startTime = DateTime.now,
-        endTime = DateTime.now,
-        engineId = wfc.engineId,
-        engineVersion = wfc.engineVersion,
-        engineVariant = variantId,
-        engineFactory = engineFactory,
-        batch = wfc.batch,
-        env = pioEnvVars,
-        sparkConf = workflowParams.sparkEnv,
-        dataSourceParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
-        preparatorParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
-        algorithmsParams =
-          JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
-        servingParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
-
-      val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
-        engineInstance)
-
-      CoreWorkflow.runTrain(
-        env = pioEnvVars,
-        params = workflowParams,
-        engine = trainableEngine,
-        engineParams = engineParams,
-        engineInstance = engineInstance.copy(id = engineInstanceId))
-    } else {
-      val workflowParams = WorkflowParams(
-        verbose = wfc.verbosity,
-        skipSanityCheck = wfc.skipSanityCheck,
-        stopAfterRead = wfc.stopAfterRead,
-        stopAfterPrepare = wfc.stopAfterPrepare,
-        sparkEnv = WorkflowParams().sparkEnv)
-      val evaluationInstance = EvaluationInstance(
-        evaluationClass = wfc.evaluationClass.get,
-        engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get,
-        batch = wfc.batch,
-        env = pioEnvVars,
-        sparkConf = workflowParams.sparkEnv
-      )
-      Workflow.runEvaluation(
-        evaluation = evaluation.get,
-        engineParamsGenerator = engineParamsGenerator.get,
-        evaluationInstance = evaluationInstance,
-        params = workflowParams)
-    }
-  }
-}
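
For context, the scopt parser above maps command-line flags onto WorkflowConfig.
A minimal sketch of how the required flags combine (flag names are taken from
the parser definitions above; the values are hypothetical):

  // Hypothetical invocation using only flags declared by the parser above.
  val args = Array(
    "--engine-id", "someEngineId",        // required
    "--engine-version", "someVersion",    // required
    "--engine-variant", "engine.json",    // required; read via stringFromFile
    "--batch", "nightly-train",           // optional batch label
    "--verbose")                          // flips WorkflowConfig.verbose
  val wfc = CreateWorkflow.parser.parse(args, CreateWorkflow.WorkflowConfig())
  // wfc: Option[WorkflowConfig]; None when a required flag is missing.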

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala b/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala
deleted file mode 100644
index 5b2649c..0000000
--- a/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import io.prediction.data.storage.EngineInstance
-import org.json4s._
-
-trait EngineServerPlugin {
-  val pluginName: String
-  val pluginDescription: String
-  val pluginType: String
-
-  def start(context: EngineServerPluginContext): Unit
-
-  def process(
-    engineInstance: EngineInstance,
-    query: JValue,
-    prediction: JValue,
-    context: EngineServerPluginContext): JValue
-
-  def handleREST(arguments: Seq[String]): String
-}
-
-object EngineServerPlugin {
-  val outputBlocker = "outputblocker"
-  val outputSniffer = "outputsniffer"
-}
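
A minimal sketch of a concrete plugin implementing the trait above (the class
name and behavior are hypothetical; the members mirror the removed trait):

  import io.prediction.data.storage.EngineInstance
  import org.json4s.JValue

  class LoggingSniffer extends EngineServerPlugin {
    val pluginName = "loggingsniffer"
    val pluginDescription = "Logs every query and prediction pair"
    val pluginType = EngineServerPlugin.outputSniffer

    def start(context: EngineServerPluginContext): Unit = ()

    def process(
        engineInstance: EngineInstance,
        query: JValue,
        prediction: JValue,
        context: EngineServerPluginContext): JValue = {
      context.log.info(s"query=$query prediction=$prediction")
      prediction  // pass the prediction through unchanged
    }

    def handleREST(arguments: Seq[String]): String = "{}"
  }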

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala b/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala
deleted file mode 100644
index eb04c6f..0000000
--- a/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import java.net.URI
-import java.util.ServiceLoader
-
-import akka.event.LoggingAdapter
-import com.google.common.io.ByteStreams
-import grizzled.slf4j.Logging
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.json4s.DefaultFormats
-import org.json4s.Formats
-import org.json4s.JObject
-import org.json4s.JValue
-import org.json4s.native.JsonMethods._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-
-class EngineServerPluginContext(
-    val plugins: mutable.Map[String, mutable.Map[String, EngineServerPlugin]],
-    val pluginParams: mutable.Map[String, JValue],
-    val log: LoggingAdapter) {
-  def outputBlockers: Map[String, EngineServerPlugin] =
-    plugins.getOrElse(EngineServerPlugin.outputBlocker, Map()).toMap
-  def outputSniffers: Map[String, EngineServerPlugin] =
-    plugins.getOrElse(EngineServerPlugin.outputSniffer, Map()).toMap
-}
-
-object EngineServerPluginContext extends Logging {
-  implicit val formats: Formats = DefaultFormats
-
-  def apply(log: LoggingAdapter, engineVariant: String): EngineServerPluginContext = {
-    val plugins = mutable.Map[String, mutable.Map[String, EngineServerPlugin]](
-      EngineServerPlugin.outputBlocker -> mutable.Map(),
-      EngineServerPlugin.outputSniffer -> mutable.Map())
-    val pluginParams = mutable.Map[String, JValue]()
-    val serviceLoader = ServiceLoader.load(classOf[EngineServerPlugin])
-    val variantJson = parse(stringFromFile(engineVariant))
-    (variantJson \ "plugins").extractOpt[JObject].foreach { pluginDefs =>
-      pluginDefs.obj.foreach { pluginParams += _ }
-    }
-    serviceLoader foreach { service =>
-      pluginParams.get(service.pluginName) map { params =>
-        if ((params \ "enabled").extractOrElse(false)) {
-          info(s"Plugin ${service.pluginName} is enabled.")
-          plugins(service.pluginType) += service.pluginName -> service
-        } else {
-          info(s"Plugin ${service.pluginName} is disabled.")
-        }
-      } getOrElse {
-        info(s"Plugin ${service.pluginName} is disabled.")
-      }
-    }
-    new EngineServerPluginContext(
-      plugins,
-      pluginParams,
-      log)
-  }
-
-  private def stringFromFile(filePath: String): String = {
-    try {
-      val uri = new URI(filePath)
-      val fs = FileSystem.get(uri, new Configuration())
-      new String(ByteStreams.toByteArray(fs.open(new Path(uri))).map(_.toChar))
-    } catch {
-      case e: java.io.IOException =>
-        error(s"Error reading from file: ${e.getMessage}. Aborting.")
-        sys.exit(1)
-    }
-  }
-}
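
The loader above discovers plugins through java.util.ServiceLoader and enables
one only when its entry under "plugins" in the engine variant JSON carries
"enabled": true. A sketch of such a variant fragment (plugin name hypothetical):

  import org.json4s.native.JsonMethods.parse

  // Minimal engine variant with one enabled plugin; the "enabled" flag is
  // exactly what the loader above extracts.
  val variantJson = parse(
    """{"plugins": {"loggingsniffer": {"enabled": true}}}""")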

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala b/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala
deleted file mode 100644
index a346d8e..0000000
--- a/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import akka.actor.Actor
-import akka.event.Logging
-import io.prediction.data.storage.EngineInstance
-import org.json4s.JValue
-
-class PluginsActor(engineVariant: String) extends Actor {
-  implicit val system = context.system
-  val log = Logging(system, this)
-
-  val pluginContext = EngineServerPluginContext(log, engineVariant)
-
-  def receive: PartialFunction[Any, Unit] = {
-    case (ei: EngineInstance, q: JValue, p: JValue) =>
-      pluginContext.outputSniffers.values.foreach(_.process(ei, q, p, pluginContext))
-    case h: PluginsActor.HandleREST =>
-      try {
-        sender() ! pluginContext.outputSniffers(h.pluginName).handleREST(h.pluginArgs)
-      } catch {
-        case e: Exception =>
-          sender() ! s"""{"message":"${e.getMessage}"}"""
-      }
-    case _ =>
-      log.error("Unknown message sent to the Engine Server output sniffer plugin host.")
-  }
-}
-
-object PluginsActor {
-  case class HandleREST(pluginName: String, pluginArgs: Seq[String])
-}
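
A sketch of how the actor above might be exercised (actor system setup and the
plugin name are assumptions):

  import akka.actor.{ActorSystem, Props}
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.duration._

  implicit val timeout: Timeout = Timeout(5.seconds)
  val system = ActorSystem("engine-server")
  val plugins = system.actorOf(Props(classOf[PluginsActor], "engine.json"))
  // Ask an output sniffer plugin to handle a REST call; the reply is a String
  // (or a JSON error message if the plugin throws).
  val reply = plugins ? PluginsActor.HandleREST("loggingsniffer", Seq("status"))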

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala b/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala
deleted file mode 100644
index ed70d87..0000000
--- a/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import io.prediction.controller.EngineParams
-import io.prediction.controller.Evaluation
-import io.prediction.core.BaseEvaluator
-import io.prediction.core.BaseEvaluatorResult
-import io.prediction.core.BaseEngine
-
-import grizzled.slf4j.Logger
-import org.apache.spark.SparkContext
-
-import scala.language.existentials
-
-object EvaluationWorkflow {
-  @transient lazy val logger = Logger[this.type]
-  def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult](
-      sc: SparkContext,
-      evaluation: Evaluation,
-      engine: BaseEngine[EI, Q, P, A],
-      engineParamsList: Seq[EngineParams],
-      evaluator: BaseEvaluator[EI, Q, P, A, R],
-      params: WorkflowParams)
-    : R = {
-    val engineEvalDataSet = engine.batchEval(sc, engineParamsList, params)
-    evaluator.evaluateBase(sc, evaluation, engineEvalDataSet, params)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala b/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala
deleted file mode 100644
index 350a430..0000000
--- a/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import io.prediction.annotation.Experimental
-// FIXME(yipjustin): Remove wildcard import.
-import io.prediction.core._
-import io.prediction.controller._
-
-import grizzled.slf4j.Logger
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-
-@Experimental
-private[prediction] class FakeEngine
-extends BaseEngine[EmptyParams, EmptyParams, EmptyParams, EmptyParams] {
-  @transient lazy val logger = Logger[this.type]
-
-  def train(
-    sc: SparkContext,
-    engineParams: EngineParams,
-    engineInstanceId: String,
-    params: WorkflowParams): Seq[Any] = {
-    throw new StopAfterReadInterruption()
-  }
-
-  def eval(
-    sc: SparkContext,
-    engineParams: EngineParams,
-    params: WorkflowParams)
-  : Seq[(EmptyParams, RDD[(EmptyParams, EmptyParams, EmptyParams)])] = {
-    Seq[(EmptyParams, RDD[(EmptyParams, EmptyParams, EmptyParams)])]()
-  }
-}
-
-@Experimental
-private[prediction] class FakeRunner(f: (SparkContext => Unit))
-    extends BaseEvaluator[EmptyParams, EmptyParams, EmptyParams, EmptyParams,
-      FakeEvalResult] {
-  @transient private lazy val logger = Logger[this.type]
-  def evaluateBase(
-    sc: SparkContext,
-    evaluation: Evaluation,
-    engineEvalDataSet:
-        Seq[(EngineParams, Seq[(EmptyParams, RDD[(EmptyParams, EmptyParams, EmptyParams)])])],
-    params: WorkflowParams): FakeEvalResult = {
-    f(sc)
-    FakeEvalResult()
-  }
-}
-
-@Experimental
-private[prediction] case class FakeEvalResult() extends BaseEvaluatorResult {
-  override val noSave: Boolean = true
-}
-
-/** FakeRun allows users to run a custom function in the exact environment
-  * used by other PredictionIO workflows.
-  *
-  * Useful for developing new features. Simply extend this trait and
-  * implement a function of type (SparkContext => Unit). For example, the code
-  * below can be run with `pio eval HelloWorld`.
-  *
-  * {{{
-  * object HelloWorld extends FakeRun {
-  *   // func defines the function pio runs; it must have signature (SparkContext => Unit).
-  *   func = f
-  *
-  *   def f(sc: SparkContext): Unit = {
-  *     val logger = Logger[this.type]
-  *     logger.info("HelloWorld")
-  *   }
-  * }
-  * }}}
-  *
-  */
-@Experimental
-trait FakeRun extends Evaluation with EngineParamsGenerator {
-  private[this] var _runner: FakeRunner = _
-
-  def runner: FakeRunner = _runner
-  def runner_=(r: FakeRunner) {
-    engineEvaluator = (new FakeEngine(), r)
-    engineParamsList = Seq(new EngineParams())
-  }
-
-  def func: (SparkContext => Unit) = { (sc: SparkContext) => () }
-  def func_=(f: SparkContext => Unit) {
-    runner = new FakeRunner(f)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala b/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala
deleted file mode 100644
index 7034063..0000000
--- a/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import com.google.gson.Gson
-import com.google.gson.GsonBuilder
-import com.google.gson.TypeAdapterFactory
-import io.prediction.controller.EngineParams
-import io.prediction.controller.Params
-import io.prediction.controller.Utils
-import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption
-import org.json4s.Extraction
-import org.json4s.Formats
-import org.json4s.JsonAST.{JArray, JValue}
-import org.json4s.native.JsonMethods.compact
-import org.json4s.native.JsonMethods.pretty
-import org.json4s.native.JsonMethods.parse
-import org.json4s.native.JsonMethods.render
-import org.json4s.reflect.TypeInfo
-
-object JsonExtractor {
-
-  def toJValue(
-    extractorOption: JsonExtractorOption,
-    o: Any,
-    json4sFormats: Formats = Utils.json4sDefaultFormats,
-    gsonTypeAdapterFactories: Seq[TypeAdapterFactory] = Seq.empty[TypeAdapterFactory]): JValue = {
-
-    extractorOption match {
-      case JsonExtractorOption.Both =>
-
-          val json4sResult = Extraction.decompose(o)(json4sFormats)
-          json4sResult.children.size match {
-            case 0 => parse(gson(gsonTypeAdapterFactories).toJson(o))
-            case _ => json4sResult
-          }
-      case JsonExtractorOption.Json4sNative =>
-        Extraction.decompose(o)(json4sFormats)
-      case JsonExtractorOption.Gson =>
-        parse(gson(gsonTypeAdapterFactories).toJson(o))
-    }
-  }
-
-  def extract[T](
-    extractorOption: JsonExtractorOption,
-    json: String,
-    clazz: Class[T],
-    json4sFormats: Formats = Utils.json4sDefaultFormats,
-    gsonTypeAdapterFactories: Seq[TypeAdapterFactory] = Seq.empty[TypeAdapterFactory]): T = {
-
-    extractorOption match {
-      case JsonExtractorOption.Both =>
-        try {
-          extractWithJson4sNative(json, json4sFormats, clazz)
-        } catch {
-          case e: Exception =>
-            extractWithGson(json, clazz, gsonTypeAdapterFactories)
-        }
-      case JsonExtractorOption.Json4sNative =>
-        extractWithJson4sNative(json, json4sFormats, clazz)
-      case JsonExtractorOption.Gson =>
-        extractWithGson(json, clazz, gsonTypeAdapterFactories)
-    }
-  }
-
-  def paramToJson(extractorOption: JsonExtractorOption, param: (String, Params)): String = {
-    // The to-be-replaced JValue must be produced by Json4s; otherwise the tuple JValue will be wrong
-    val toBeReplacedJValue =
-      JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, (param._1, null))
-    val paramJValue = JsonExtractor.toJValue(extractorOption, param._2)
-
-    compact(render(toBeReplacedJValue.replace(param._1 :: Nil, paramJValue)))
-  }
-
-  def paramsToJson(extractorOption: JsonExtractorOption, params: Seq[(String, Params)]): String = {
-    compact(render(paramsToJValue(extractorOption, params)))
-  }
-
-  def engineParamsToJson(extractorOption: JsonExtractorOption, params: EngineParams) : String = {
-    compact(render(engineParamsToJValue(extractorOption, params)))
-  }
-
-  def engineParamstoPrettyJson(
-    extractorOption: JsonExtractorOption,
-    params: EngineParams) : String = {
-
-    pretty(render(engineParamsToJValue(extractorOption, params)))
-  }
-
-  private def engineParamsToJValue(extractorOption: JsonExtractorOption, params: EngineParams) = {
-    var jValue = toJValue(JsonExtractorOption.Json4sNative, params)
-
-    val dataSourceParamsJValue = toJValue(extractorOption, params.dataSourceParams._2)
-    jValue = jValue.replace(
-      "dataSourceParams" :: params.dataSourceParams._1 :: Nil,
-      dataSourceParamsJValue)
-
-    val preparatorParamsJValue = toJValue(extractorOption, params.preparatorParams._2)
-    jValue = jValue.replace(
-      "preparatorParams" :: params.preparatorParams._1 :: Nil,
-      preparatorParamsJValue)
-
-    val algorithmParamsJValue = paramsToJValue(extractorOption, params.algorithmParamsList)
-    jValue = jValue.replace("algorithmParamsList" :: Nil, algorithmParamsJValue)
-
-    val servingParamsJValue = toJValue(extractorOption, params.servingParams._2)
-    jValue = jValue.replace("servingParams" :: params.servingParams._1 :: Nil, servingParamsJValue)
-
-    jValue
-  }
-
-  private
-  def paramsToJValue(extractorOption: JsonExtractorOption, params: Seq[(String, Params)]) = {
-    val jValues = params.map { case (name, param) =>
-      // The to-be-replaced JValue must be produced by Json4s; otherwise the tuple JValue will be wrong
-      val toBeReplacedJValue =
-        JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, (name, null))
-      val paramJValue = JsonExtractor.toJValue(extractorOption, param)
-
-      toBeReplacedJValue.replace(name :: Nil, paramJValue)
-    }
-
-    JArray(jValues.toList)
-  }
-
-  private def extractWithJson4sNative[T](
-    json: String,
-    formats: Formats,
-    clazz: Class[T]): T = {
-
-    Extraction.extract(parse(json), TypeInfo(clazz, None))(formats).asInstanceOf[T]
-  }
-
-  private def extractWithGson[T](
-    json: String,
-    clazz: Class[T],
-    gsonTypeAdapterFactories: Seq[TypeAdapterFactory]): T = {
-
-    gson(gsonTypeAdapterFactories).fromJson(json, clazz)
-  }
-
-  private def gson(gsonTypeAdapterFactories: Seq[TypeAdapterFactory]): Gson = {
-    val gsonBuilder = new GsonBuilder()
-    gsonTypeAdapterFactories.foreach { typeAdapterFactory =>
-      gsonBuilder.registerTypeAdapterFactory(typeAdapterFactory)
-    }
-
-    gsonBuilder.create()
-  }
-
-}
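
A minimal usage sketch of the extractor above (MyParams is a hypothetical
params class; the Both option tries Json4s first and falls back to Gson):

  import io.prediction.controller.Params

  case class MyParams(n: Int, label: String) extends Params
  val json = """{"n": 10, "label": "x"}"""
  val p = JsonExtractor.extract(JsonExtractorOption.Both, json, classOf[MyParams])
  val jv = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, p)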

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala b/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala
deleted file mode 100644
index 60272fb..0000000
--- a/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-object JsonExtractorOption extends Enumeration {
-  type JsonExtractorOption = Value
-  val Json4sNative = Value
-  val Gson = Value
-  val Both = Value
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala b/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala
deleted file mode 100644
index c1c0a6d..0000000
--- a/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-case class PersistentModelManifest(className: String)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/Workflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/Workflow.scala b/core/src/main/scala/io/prediction/workflow/Workflow.scala
deleted file mode 100644
index c0543ab..0000000
--- a/core/src/main/scala/io/prediction/workflow/Workflow.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import io.prediction.annotation.Experimental
-import io.prediction.controller.EngineParams
-import io.prediction.controller.EngineParamsGenerator
-import io.prediction.controller.Evaluation
-import io.prediction.core.BaseEngine
-import io.prediction.core.BaseEvaluator
-import io.prediction.core.BaseEvaluatorResult
-import io.prediction.data.storage.EvaluationInstance
-
-/** Collection of workflow creation methods.
-  * @group Workflow
-  */
-object Workflow {
-  // The evaluator is already instantiated here.
-  // This is an undocumented way of using an evaluator. Still experimental.
-  // evaluatorParams is written into EngineInstance and will be shown in the
-  // dashboard.
-  /*
-  def runEval[EI, Q, P, A, ER <: AnyRef](
-      engine: BaseEngine[EI, Q, P, A],
-      engineParams: EngineParams,
-      evaluator: BaseEvaluator[EI, Q, P, A, ER],
-      evaluatorParams: Params,
-      env: Map[String, String] = WorkflowUtils.pioEnvVars,
-      params: WorkflowParams = WorkflowParams()) {
-
-    implicit lazy val formats = Utils.json4sDefaultFormats +
-      new NameParamsSerializer
-
-    val engineInstance = EngineInstance(
-      id = "",
-      status = "INIT",
-      startTime = DateTime.now,
-      endTime = DateTime.now,
-      engineId = "",
-      engineVersion = "",
-      engineVariant = "",
-      engineFactory = "FIXME",
-      evaluatorClass = evaluator.getClass.getName(),
-      batch = params.batch,
-      env = env,
-      sparkConf = params.sparkEnv,
-      dataSourceParams = write(engineParams.dataSourceParams),
-      preparatorParams = write(engineParams.preparatorParams),
-      algorithmsParams = write(engineParams.algorithmParamsList),
-      servingParams = write(engineParams.servingParams),
-      evaluatorParams = write(evaluatorParams),
-      evaluatorResults = "",
-      evaluatorResultsHTML = "",
-      evaluatorResultsJSON = "")
-
-    CoreWorkflow.runEval(
-      engine = engine,
-      engineParams = engineParams,
-      engineInstance = engineInstance,
-      evaluator = evaluator,
-      evaluatorParams = evaluatorParams,
-      env = env,
-      params = params)
-  }
-  */
-
-  def runEvaluation(
-      evaluation: Evaluation,
-      engineParamsGenerator: EngineParamsGenerator,
-      env: Map[String, String] = WorkflowUtils.pioEnvVars,
-      evaluationInstance: EvaluationInstance = EvaluationInstance(),
-      params: WorkflowParams = WorkflowParams()) {
-    runEvaluationTypeless(
-      evaluation = evaluation,
-      engine = evaluation.engine,
-      engineParamsList = engineParamsGenerator.engineParamsList,
-      evaluationInstance = evaluationInstance,
-      evaluator = evaluation.evaluator,
-      env = env,
-      params = params
-    )
-  }
-
-  def runEvaluationTypeless[
-      EI, Q, P, A, EEI, EQ, EP, EA, ER <: BaseEvaluatorResult](
-      evaluation: Evaluation,
-      engine: BaseEngine[EI, Q, P, A],
-      engineParamsList: Seq[EngineParams],
-      evaluationInstance: EvaluationInstance,
-      evaluator: BaseEvaluator[EEI, EQ, EP, EA, ER],
-      env: Map[String, String] = WorkflowUtils.pioEnvVars,
-      params: WorkflowParams = WorkflowParams()) {
-    runEvaluationViaCoreWorkflow(
-      evaluation = evaluation,
-      engine = engine,
-      engineParamsList = engineParamsList,
-      evaluationInstance = evaluationInstance,
-      evaluator = evaluator.asInstanceOf[BaseEvaluator[EI, Q, P, A, ER]],
-      env = env,
-      params = params)
-  }
-
-  /** :: Experimental :: */
-  @Experimental
-  def runEvaluationViaCoreWorkflow[EI, Q, P, A, R <: BaseEvaluatorResult](
-      evaluation: Evaluation,
-      engine: BaseEngine[EI, Q, P, A],
-      engineParamsList: Seq[EngineParams],
-      evaluationInstance: EvaluationInstance,
-      evaluator: BaseEvaluator[EI, Q, P, A, R],
-      env: Map[String, String] = WorkflowUtils.pioEnvVars,
-      params: WorkflowParams = WorkflowParams()) {
-    CoreWorkflow.runEvaluation(
-      evaluation = evaluation,
-      engine = engine,
-      engineParamsList = engineParamsList,
-      evaluationInstance = evaluationInstance,
-      evaluator = evaluator,
-      env = env,
-      params = params)
-  }
-}
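
A sketch of the public entry point above (MyEvaluation and
MyEngineParamsGenerator are assumed to be defined by the engine project):

  Workflow.runEvaluation(
    evaluation = MyEvaluation,
    engineParamsGenerator = MyEngineParamsGenerator,
    params = WorkflowParams(batch = "eval-batch", verbose = 3))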

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala b/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala
deleted file mode 100644
index 264c757..0000000
--- a/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import grizzled.slf4j.Logging
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkConf
-
-import scala.language.existentials
-
-// FIXME: move to better location.
-object WorkflowContext extends Logging {
-  def apply(
-      batch: String = "",
-      executorEnv: Map[String, String] = Map(),
-      sparkEnv: Map[String, String] = Map(),
-      mode: String = ""
-    ): SparkContext = {
-    val conf = new SparkConf()
-    val prefix = if (mode == "") "PredictionIO" else s"PredictionIO ${mode}"
-    conf.setAppName(s"${prefix}: ${batch}")
-    debug(s"Executor environment received: ${executorEnv}")
-    executorEnv.foreach(kv => conf.setExecutorEnv(kv._1, kv._2))
-    debug(s"SparkConf executor environment: ${conf.getExecutorEnv}")
-    debug(s"Application environment received: ${sparkEnv}")
-    conf.setAll(sparkEnv)
-    val sparkConfString = conf.getAll.toSeq
-    debug(s"SparkConf environment: $sparkConfString")
-    new SparkContext(conf)
-  }
-}
-
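
A sketch of the helper above; this would yield a SparkContext whose application
name is "PredictionIO train: my-batch" (all values hypothetical):

  val sc = WorkflowContext(
    batch = "my-batch",
    executorEnv = Map("PIO_STORAGE" -> "dummy"),  // hypothetical executor var
    sparkEnv = Map("spark.master" -> "local"),
    mode = "train")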

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala b/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala
deleted file mode 100644
index 88ec54e..0000000
--- a/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-/** Workflow parameters.
-  *
-  * @param batch Batch label of the run.
-  * @param verbose Verbosity level.
-  * @param saveModel Controls whether trained models are persisted.
-  * @param sparkEnv Spark properties that will be set in SparkConf.setAll().
-  * @param skipSanityCheck Skips all data sanity checks.
-  * @param stopAfterRead Stops workflow after reading from data source.
-  * @param stopAfterPrepare Stops workflow after data preparation.
-  * @group Workflow
-  */
-case class WorkflowParams(
-  batch: String = "",
-  verbose: Int = 2,
-  saveModel: Boolean = true,
-  sparkEnv: Map[String, String] =
-    Map[String, String]("spark.executor.extraClassPath" -> "."),
-  skipSanityCheck: Boolean = false,
-  stopAfterRead: Boolean = false,
-  stopAfterPrepare: Boolean = false) {
-  // Temporary workaround for WorkflowParamsBuilder for Java. It doesn't support
-  // custom spark environment yet.
-  def this(batch: String, verbose: Int, saveModel: Boolean)
-  = this(batch, verbose, saveModel, Map[String, String]())
-}
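
A sketch of constructing the params above while keeping the default Spark
property and merging in one extra (the same pattern CreateWorkflow uses):

  val params = WorkflowParams(
    batch = "nightly",
    verbose = 3,
    sparkEnv = WorkflowParams().sparkEnv ++
      Map("spark.executor.memory" -> "4g"))  // hypothetical extra property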

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala b/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala
deleted file mode 100644
index d93b9eb..0000000
--- a/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala
+++ /dev/null
@@ -1,419 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import java.io.File
-import java.io.FileNotFoundException
-
-import io.prediction.controller.EmptyParams
-import io.prediction.controller.EngineFactory
-import io.prediction.controller.EngineParamsGenerator
-import io.prediction.controller.Evaluation
-import io.prediction.controller.Params
-import io.prediction.controller.PersistentModelLoader
-import io.prediction.controller.Utils
-import io.prediction.core.BuildInfo
-
-import com.google.gson.Gson
-import com.google.gson.JsonSyntaxException
-import grizzled.slf4j.Logging
-import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption
-import org.apache.log4j.Level
-import org.apache.log4j.LogManager
-import org.apache.spark.SparkContext
-import org.apache.spark.api.java.JavaRDDLike
-import org.apache.spark.rdd.RDD
-import org.json4s.JsonAST.JValue
-import org.json4s.MappingException
-import org.json4s._
-import org.json4s.native.JsonMethods._
-
-import scala.io.Source
-import scala.language.existentials
-import scala.reflect.runtime.universe
-
-/** Collection of reusable workflow related utilities. */
-object WorkflowUtils extends Logging {
-  @transient private lazy val gson = new Gson
-
-  /** Obtains an Engine object in Scala, or instantiates an Engine in Java.
-    *
-    * @param engine Engine factory name.
-    * @param cl A Java ClassLoader to look for engine-related classes.
-    *
-    * @throws ClassNotFoundException
-    *         Thrown when engine factory class does not exist.
-    * @throws NoSuchMethodException
-    *         Thrown when engine factory's apply() method is not implemented.
-    */
-  def getEngine(engine: String, cl: ClassLoader): (EngineLanguage.Value, EngineFactory) = {
-    val runtimeMirror = universe.runtimeMirror(cl)
-    val engineModule = runtimeMirror.staticModule(engine)
-    val engineObject = runtimeMirror.reflectModule(engineModule)
-    try {
-      (
-        EngineLanguage.Scala,
-        engineObject.instance.asInstanceOf[EngineFactory]
-      )
-    } catch {
-      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
-        (
-          EngineLanguage.Java,
-          Class.forName(engine).newInstance.asInstanceOf[EngineFactory]
-        )
-      }
-    }
-  }
-
-  def getEngineParamsGenerator(epg: String, cl: ClassLoader):
-    (EngineLanguage.Value, EngineParamsGenerator) = {
-    val runtimeMirror = universe.runtimeMirror(cl)
-    val epgModule = runtimeMirror.staticModule(epg)
-    val epgObject = runtimeMirror.reflectModule(epgModule)
-    try {
-      (
-        EngineLanguage.Scala,
-        epgObject.instance.asInstanceOf[EngineParamsGenerator]
-      )
-    } catch {
-      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
-        (
-          EngineLanguage.Java,
-          Class.forName(epg).newInstance.asInstanceOf[EngineParamsGenerator]
-        )
-      }
-    }
-  }
-
-  def getEvaluation(evaluation: String, cl: ClassLoader): (EngineLanguage.Value, Evaluation) = {
-    val runtimeMirror = universe.runtimeMirror(cl)
-    val evaluationModule = runtimeMirror.staticModule(evaluation)
-    val evaluationObject = runtimeMirror.reflectModule(evaluationModule)
-    try {
-      (
-        EngineLanguage.Scala,
-        evaluationObject.instance.asInstanceOf[Evaluation]
-      )
-    } catch {
-      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
-        (
-          EngineLanguage.Java,
-          Class.forName(evaluation).newInstance.asInstanceOf[Evaluation]
-        )
-      }
-    }
-  }
-
-  /** Converts a JSON document to an instance of Params.
-    *
-    * @param language Engine's programming language.
-    * @param json JSON document.
-    * @param clazz Class of the component that is going to receive the resulting
-    *              Params instance as a constructor argument.
-    * @param jsonExtractor JSON extractor option.
-    * @param formats JSON4S serializers for deserialization.
-    *
-    * @throws MappingException Thrown when JSON4S fails to perform conversion.
-    * @throws JsonSyntaxException Thrown when GSON fails to perform conversion.
-    */
-  def extractParams(
-      language: EngineLanguage.Value = EngineLanguage.Scala,
-      json: String,
-      clazz: Class[_],
-      jsonExtractor: JsonExtractorOption,
-      formats: Formats = Utils.json4sDefaultFormats): Params = {
-    implicit val f = formats
-    val pClass = clazz.getConstructors.head.getParameterTypes
-    if (pClass.size == 0) {
-      if (json != "") {
-        warn(s"Non-empty parameters supplied to ${clazz.getName}, but its " +
-          "constructor does not accept any arguments. Stubbing with empty " +
-          "parameters.")
-      }
-      EmptyParams()
-    } else {
-      val apClass = pClass.head
-      try {
-        JsonExtractor.extract(jsonExtractor, json, apClass, f).asInstanceOf[Params]
-      } catch {
-        case e@(_: MappingException | _: JsonSyntaxException) =>
-          error(
-            s"Unable to extract parameters for ${apClass.getName} from " +
-              s"JSON string: $json. Aborting workflow.",
-            e)
-          throw e
-      }
-    }
-  }
-
-  def getParamsFromJsonByFieldAndClass(
-      variantJson: JValue,
-      field: String,
-      classMap: Map[String, Class[_]],
-      engineLanguage: EngineLanguage.Value,
-      jsonExtractor: JsonExtractorOption): (String, Params) = {
-    variantJson findField {
-      case JField(f, _) => f == field
-      case _ => false
-    } map { jv =>
-      implicit lazy val formats = Utils.json4sDefaultFormats + new NameParamsSerializer
-      val np: NameParams = try {
-        jv._2.extract[NameParams]
-      } catch {
-        case e: Exception =>
-          error(s"Unable to extract $field name and params $jv")
-          throw e
-      }
-      val extractedParams = np.params.map { p =>
-        try {
-          if (!classMap.contains(np.name)) {
-            error(s"Unable to find $field class with name '${np.name}'" +
-              " defined in Engine.")
-            sys.exit(1)
-          }
-          WorkflowUtils.extractParams(
-            engineLanguage,
-            compact(render(p)),
-            classMap(np.name),
-            jsonExtractor,
-            formats)
-        } catch {
-          case e: Exception =>
-            error(s"Unable to extract $field params $p")
-            throw e
-        }
-      }.getOrElse(EmptyParams())
-
-      (np.name, extractedParams)
-    } getOrElse("", EmptyParams())
-  }
-
-  /** Grab environment variables that start with 'PIO_'. */
-  def pioEnvVars: Map[String, String] =
-    sys.env.filter(kv => kv._1.startsWith("PIO_"))
-
-  /** Converts Java (non-Scala) objects to a JSON4S JValue.
-    *
-    * @param params The Java object to be converted.
-    */
-  def javaObjectToJValue(params: AnyRef): JValue = parse(gson.toJson(params))
-
-  private[prediction] def checkUpgrade(
-      component: String = "core",
-      engine: String = ""): Unit = {
-    val runner = new Thread(new UpgradeCheckRunner(component, engine))
-    runner.start()
-  }
-
-  // Extract debug string by recursively traversing the data.
-  def debugString[D](data: D): String = {
-    val s: String = data match {
-      case rdd: RDD[_] => {
-        debugString(rdd.collect())
-      }
-      case javaRdd: JavaRDDLike[_, _] => {
-        debugString(javaRdd.collect())
-      }
-      case array: Array[_] => {
-        "[" + array.map(debugString).mkString(",") + "]"
-      }
-      case d: AnyRef => {
-        d.toString
-      }
-      case null => "null"
-    }
-    s
-  }
-
-  /** Detect third party software configuration files to be submitted as
-    * extras to Apache Spark. This makes sure all executors receive the same
-    * configuration.
-    */
-  def thirdPartyConfFiles: Seq[String] = {
-    val thirdPartyFiles = Map(
-      "PIO_CONF_DIR" -> "log4j.properties",
-      "ES_CONF_DIR" -> "elasticsearch.yml",
-      "HADOOP_CONF_DIR" -> "core-site.xml",
-      "HBASE_CONF_DIR" -> "hbase-site.xml")
-
-    thirdPartyFiles.keys.toSeq.map { k: String =>
-      sys.env.get(k) map { x =>
-        val p = Seq(x, thirdPartyFiles(k)).mkString(File.separator)
-        if (new File(p).exists) Seq(p) else Seq[String]()
-      } getOrElse Seq[String]()
-    }.flatten
-  }
-
-  def thirdPartyClasspaths: Seq[String] = {
-    val thirdPartyPaths = Seq(
-      "PIO_CONF_DIR",
-      "ES_CONF_DIR",
-      "POSTGRES_JDBC_DRIVER",
-      "MYSQL_JDBC_DRIVER",
-      "HADOOP_CONF_DIR",
-      "HBASE_CONF_DIR")
-    thirdPartyPaths.map(p =>
-      sys.env.get(p).map(Seq(_)).getOrElse(Seq[String]())
-    ).flatten
-  }
-
-  def modifyLogging(verbose: Boolean): Unit = {
-    val rootLoggerLevel = if (verbose) Level.TRACE else Level.INFO
-    val chattyLoggerLevel = if (verbose) Level.INFO else Level.WARN
-
-    LogManager.getRootLogger.setLevel(rootLoggerLevel)
-
-    LogManager.getLogger("org.elasticsearch").setLevel(chattyLoggerLevel)
-    LogManager.getLogger("org.apache.hadoop").setLevel(chattyLoggerLevel)
-    LogManager.getLogger("org.apache.spark").setLevel(chattyLoggerLevel)
-    LogManager.getLogger("org.eclipse.jetty").setLevel(chattyLoggerLevel)
-    LogManager.getLogger("akka").setLevel(chattyLoggerLevel)
-  }
-
-  def extractNameParams(jv: JValue): NameParams = {
-    implicit val formats = Utils.json4sDefaultFormats
-    val nameOpt = (jv \ "name").extract[Option[String]]
-    val paramsOpt = (jv \ "params").extract[Option[JValue]]
-
-    if (nameOpt.isEmpty && paramsOpt.isEmpty) {
-      error("Unable to find 'name' or 'params' fields in" +
-        s" ${compact(render(jv))}.\n" +
-        "Since 0.8.4, the 'params' field is required in engine.json" +
-        " in order to specify parameters for DataSource, Preparator or" +
-        " Serving.\n" +
-        "Please go to https://docs.prediction.io/resources/upgrade/" +
-        " for detailed instruction of how to change engine.json.")
-      sys.exit(1)
-    }
-
-    if (nameOpt.isEmpty) {
-      info(s"No 'name' is found. Default empty String will be used.")
-    }
-
-    if (paramsOpt.isEmpty) {
-      info(s"No 'params' is found. Default EmptyParams will be used.")
-    }
-
-    NameParams(
-      name = nameOpt.getOrElse(""),
-      params = paramsOpt
-    )
-  }
-
-  def extractSparkConf(root: JValue): List[(String, String)] = {
-    def flatten(jv: JValue): List[(List[String], String)] = {
-      jv match {
-        case JObject(fields) =>
-          for ((namePrefix, childJV) <- fields;
-               (name, value) <- flatten(childJV))
-          yield (namePrefix :: name) -> value
-        case JArray(_) => {
-          error("Arrays are not allowed in the sparkConf section of engine.js.")
-          sys.exit(1)
-        }
-        case JNothing => List()
-        case _ => List(List() -> jv.values.toString)
-      }
-    }
-
-    flatten(root \ "sparkConf").map(x =>
-      (x._1.reduce((a, b) => s"$a.$b"), x._2))
-  }
-}
-
-case class NameParams(name: String, params: Option[JValue])
-
-class NameParamsSerializer extends CustomSerializer[NameParams](format => ( {
-  case jv: JValue => WorkflowUtils.extractNameParams(jv)
-}, {
-  case x: NameParams =>
-    JObject(JField("name", JString(x.name)) ::
-      JField("params", x.params.getOrElse(JNothing)) :: Nil)
-}
-  ))
-
-/** Collection of reusable workflow related utilities that touch on Apache
-  * Spark. They are separated to avoid compilation problems with certain code.
-  */
-object SparkWorkflowUtils extends Logging {
-  def getPersistentModel[AP <: Params, M](
-      pmm: PersistentModelManifest,
-      runId: String,
-      params: AP,
-      sc: Option[SparkContext],
-      cl: ClassLoader): M = {
-    val runtimeMirror = universe.runtimeMirror(cl)
-    val pmmModule = runtimeMirror.staticModule(pmm.className)
-    val pmmObject = runtimeMirror.reflectModule(pmmModule)
-    try {
-      pmmObject.instance.asInstanceOf[PersistentModelLoader[AP, M]](
-        runId,
-        params,
-        sc)
-    } catch {
-      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
-        val loadMethod = Class.forName(pmm.className).getMethod(
-          "load",
-          classOf[String],
-          classOf[Params],
-          classOf[SparkContext])
-        loadMethod.invoke(null, runId, params, sc.orNull).asInstanceOf[M]
-      } catch {
-        case e: ClassNotFoundException =>
-          error(s"Model class ${pmm.className} cannot be found.")
-          throw e
-        case e: NoSuchMethodException =>
-          error(
-            "The load(String, Params, SparkContext) method cannot be found.")
-          throw e
-      }
-    }
-  }
-}
-
-class UpgradeCheckRunner(
-    val component: String,
-    val engine: String) extends Runnable with Logging {
-  val version = BuildInfo.version
-  val versionsHost = "https://direct.prediction.io/"
-
-  def run(): Unit = {
-    val url = if (engine == "") {
-      s"$versionsHost$version/$component.json"
-    } else {
-      s"$versionsHost$version/$component/$engine.json"
-    }
-    try {
-      val upgradeData = Source.fromURL(url)
-    } catch {
-      case e: FileNotFoundException =>
-        debug(s"Update metainfo not found. $url")
-      case e: java.net.UnknownHostException =>
-        debug(s"${e.getClass.getName}: {e.getMessage}")
-    }
-    // TODO: Implement upgrade logic
-  }
-}
-
-class WorkflowInterruption() extends Exception
-
-case class StopAfterReadInterruption() extends WorkflowInterruption
-
-case class StopAfterPrepareInterruption() extends WorkflowInterruption
-
-object EngineLanguage extends Enumeration {
-  val Scala, Java = Value
-}
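
extractSparkConf above flattens nested keys under "sparkConf" into dotted Spark
properties. A minimal sketch of the behavior:

  import org.json4s.native.JsonMethods.parse

  val root = parse("""{"sparkConf": {"spark": {"executor": {"memory": "4g"}}}}""")
  WorkflowUtils.extractSparkConf(root)
  // => List(("spark.executor.memory", "4g"))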

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala b/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala
new file mode 100644
index 0000000..2fa5551
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala
@@ -0,0 +1,37 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseQuerySerializer
+
+/** If your query class cannot be automatically serialized/deserialized to/from
+  * JSON, extend this trait and override the
+  * `querySerializer` member with your
+  * [[https://github.com/json4s/json4s#serializing-non-supported-types custom JSON4S serializer]].
+  * Algorithm and serving classes using your query class need only mix
+  * in the trait to enable the custom serializer.
+  *
+  * @group Helper
+  */
+trait CustomQuerySerializer extends BaseQuerySerializer
+
+/** DEPRECATED. Use [[CustomQuerySerializer]] instead.
+  *
+  * @group Helper
+  */
+@deprecated("Use CustomQuerySerializer instead.", "0.9.2")
+trait WithQuerySerializer extends CustomQuerySerializer
+
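
A sketch of the pattern the scaladoc above describes, assuming querySerializer
in BaseQuerySerializer is a json4s Formats value (all names below hypothetical):

  import org.apache.predictionio.controller.CustomQuerySerializer
  import org.json4s.{CustomSerializer, DefaultFormats}
  import org.json4s.JsonAST.JString

  case class MyQuery(raw: String)

  class MyQueryJson4sSerializer extends CustomSerializer[MyQuery](format => (
    { case JString(s) => MyQuery(s) },
    { case q: MyQuery => JString(q.raw) }))

  trait MyQuerySerializerSupport extends CustomQuerySerializer {
    @transient override lazy val querySerializer =
      DefaultFormats + new MyQueryJson4sSerializer
  }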

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala b/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala
new file mode 100644
index 0000000..76fe0b3
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala
@@ -0,0 +1,56 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseEngine
+
+import scala.language.implicitConversions
+
+/** Defines a deployment that contains an [[Engine]]
+  *
+  * @group Engine
+  */
+trait Deployment extends EngineFactory {
+  protected[this] var _engine: BaseEngine[_, _, _, _] = _
+  protected[this] var engineSet: Boolean = false
+
+  /** Returns the [[Engine]] of this [[Deployment]] */
+  def apply(): BaseEngine[_, _, _, _] = {
+    assert(engineSet, "Engine not set")
+    _engine
+  }
+
+  /** Returns the [[Engine]] contained in this [[Deployment]]. */
+  private[predictionio]
+  def engine: BaseEngine[_, _, _, _] = {
+    assert(engineSet, "Engine not set")
+    _engine
+  }
+
+  /** Sets the [[Engine]] for this [[Deployment]]
+    *
+    * @param engine An implementation of [[Engine]]
+    * @tparam EI Evaluation information class
+    * @tparam Q Query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    */
+  def engine_=[EI, Q, P, A](engine: BaseEngine[EI, Q, P, A]) {
+    assert(!engineSet, "Engine can be set at most once")
+    _engine = engine
+    engineSet = true
+  }
+}
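
A sketch of the trait above in use (MyEngineFactory is an assumed EngineFactory
in the engine project):

  object MyDeployment extends Deployment {
    // The setter may be called at most once; apply() then returns this engine.
    engine = MyEngineFactory()
  }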

