predictionio-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From don...@apache.org
Subject [03/34] incubator-predictionio git commit: rename all except examples
Date Mon, 18 Jul 2016 20:17:34 GMT
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala
deleted file mode 100644
index 3d2c888..0000000
--- a/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.tools.dashboard
-
-// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea
-
-import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins}
-import spray.http.HttpHeaders._
-import spray.http.HttpMethods._
-import spray.http.HttpEntity
-import spray.routing._
-import spray.http.StatusCodes
-import spray.http.ContentTypes
-
-// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS
-trait CORSSupport {
-  this: HttpService =>
-
-  private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins)
-  private val optionsCorsHeaders = List(
-    `Access-Control-Allow-Headers`("""Origin,
-                                      |X-Requested-With,
-                                      |Content-Type,
-                                      |Accept,
-                                      |Accept-Encoding,
-                                      |Accept-Language,
-                                      |Host,
-                                      |Referer,
-                                      |User-Agent""".stripMargin.replace("\n", " ")),
-    `Access-Control-Max-Age`(1728000)
-  )
-
-  def cors[T]: Directive0 = mapRequestContext { ctx =>
-    ctx.withRouteResponseHandling {
-      // OPTION request for a resource that responds to other methods
-      case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) &&
-          x.exists(_.isInstanceOf[MethodRejection])) => {
-        val allowedMethods: List[HttpMethod] = x.collect {
-          case rejection: MethodRejection => rejection.supported
-        }
-        ctx.complete {
-          HttpResponse().withHeaders(
-            `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) ::
-            allowOriginHeader ::
-            optionsCorsHeaders
-          )
-        }
-      }
-    }.withHttpResponseHeadersMapped { headers =>
-      allowOriginHeader :: headers
-    }
-  }
-
-  override def timeoutRoute: StandardRoute = complete {
-    HttpResponse(
-      StatusCodes.InternalServerError,
-      HttpEntity(ContentTypes.`text/plain(UTF-8)`,
-          "The server was not able to produce a timely response to your request."),
-      List(allowOriginHeader)
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala b/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala
deleted file mode 100644
index 154ba4e..0000000
--- a/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.tools.dashboard
-
-import com.typesafe.config.ConfigFactory
-import io.prediction.authentication.KeyAuthentication
-import io.prediction.configuration.SSLConfiguration
-import io.prediction.data.storage.Storage
-import spray.can.server.ServerSettings
-import spray.routing.directives.AuthMagnet
-import scala.concurrent.{Future, ExecutionContext}
-import akka.actor.{ActorContext, Actor, ActorSystem, Props}
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
-import com.github.nscala_time.time.Imports.DateTime
-import grizzled.slf4j.Logging
-import spray.can.Http
-import spray.http._
-import spray.http.MediaTypes._
-import spray.routing._
-import spray.routing.authentication.{Authentication, UserPass, BasicAuth}
-
-import scala.concurrent.duration._
-
-case class DashboardConfig(
-  ip: String = "localhost",
-  port: Int = 9000)
-
-object Dashboard extends Logging with SSLConfiguration{
-  def main(args: Array[String]): Unit = {
-    val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") {
-      opt[String]("ip") action { (x, c) =>
-        c.copy(ip = x)
-      } text("IP to bind to (default: localhost).")
-      opt[Int]("port") action { (x, c) =>
-        c.copy(port = x)
-      } text("Port to bind to (default: 9000).")
-    }
-
-    parser.parse(args, DashboardConfig()) map { dc =>
-      createDashboard(dc)
-    }
-  }
-
-  def createDashboard(dc: DashboardConfig): Unit = {
-    implicit val system = ActorSystem("pio-dashboard")
-    val service =
-      system.actorOf(Props(classOf[DashboardActor], dc), "dashboard")
-    implicit val timeout = Timeout(5.seconds)
-    val settings = ServerSettings(system)
-    IO(Http) ? Http.Bind(
-      service,
-      interface = dc.ip,
-      port = dc.port,
-      settings = Some(settings.copy(sslEncryption = true)))
-    system.awaitTermination
-  }
-}
-
-class DashboardActor(
-    val dc: DashboardConfig)
-  extends Actor with DashboardService {
-  def actorRefFactory: ActorContext = context
-  def receive: Actor.Receive = runRoute(dashboardRoute)
-}
-
-trait DashboardService extends HttpService with KeyAuthentication with CORSSupport {
-
-  implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
-  val dc: DashboardConfig
-  val evaluationInstances = Storage.getMetaDataEvaluationInstances
-  val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
-  val serverStartTime = DateTime.now
-  val dashboardRoute =
-    path("") {
-      authenticate(withAccessKeyFromFile) { request =>
-        get {
-          respondWithMediaType(`text/html`) {
-            complete {
-              val completedInstances = evaluationInstances.getCompleted
-              html.index(
-                dc,
-                serverStartTime,
-                pioEnvVars,
-                completedInstances).toString
-            }
-          }
-        }
-      }
-    } ~
-    pathPrefix("engine_instances" / Segment) { instanceId =>
-      path("evaluator_results.txt") {
-        get {
-          respondWithMediaType(`text/plain`) {
-            evaluationInstances.get(instanceId).map { i =>
-              complete(i.evaluatorResults)
-            } getOrElse {
-              complete(StatusCodes.NotFound)
-            }
-          }
-        }
-      } ~
-      path("evaluator_results.html") {
-        get {
-          respondWithMediaType(`text/html`) {
-            evaluationInstances.get(instanceId).map { i =>
-              complete(i.evaluatorResultsHTML)
-            } getOrElse {
-              complete(StatusCodes.NotFound)
-            }
-          }
-        }
-      } ~
-      path("evaluator_results.json") {
-        get {
-          respondWithMediaType(`application/json`) {
-            evaluationInstances.get(instanceId).map { i =>
-              complete(i.evaluatorResultsJSON)
-            } getOrElse {
-              complete(StatusCodes.NotFound)
-            }
-          }
-        }
-      } ~
-      cors {
-        path("local_evaluator_results.json") {
-          get {
-            respondWithMediaType(`application/json`) {
-              evaluationInstances.get(instanceId).map { i =>
-                complete(i.evaluatorResultsJSON)
-              } getOrElse {
-                complete(StatusCodes.NotFound)
-              }
-            }
-          }
-        }
-      }
-    } ~
-    pathPrefix("assets") {
-      getFromResourceDirectory("assets")
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala b/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala
deleted file mode 100644
index 743d57a..0000000
--- a/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.tools.export
-
-import io.prediction.controller.Utils
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.Storage
-import io.prediction.tools.Runner
-import io.prediction.workflow.WorkflowContext
-import io.prediction.workflow.WorkflowUtils
-
-import grizzled.slf4j.Logging
-import org.apache.spark.sql.SQLContext
-import org.json4s.native.Serialization._
-
-case class EventsToFileArgs(
-  env: String = "",
-  logFile: String = "",
-  appId: Int = 0,
-  channel: Option[String] = None,
-  outputPath: String = "",
-  format: String = "parquet",
-  verbose: Boolean = false,
-  debug: Boolean = false)
-
-object EventsToFile extends Logging {
-  def main(args: Array[String]): Unit = {
-    val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") {
-      opt[String]("env") action { (x, c) =>
-        c.copy(env = x)
-      }
-      opt[String]("log-file") action { (x, c) =>
-        c.copy(logFile = x)
-      }
-      opt[Int]("appid") action { (x, c) =>
-        c.copy(appId = x)
-      }
-      opt[String]("channel") action { (x, c) =>
-        c.copy(channel = Some(x))
-      }
-      opt[String]("format") action { (x, c) =>
-        c.copy(format = x)
-      }
-      opt[String]("output") action { (x, c) =>
-        c.copy(outputPath = x)
-      }
-      opt[Unit]("verbose") action { (x, c) =>
-        c.copy(verbose = true)
-      }
-      opt[Unit]("debug") action { (x, c) =>
-        c.copy(debug = true)
-      }
-    }
-    parser.parse(args, EventsToFileArgs()) map { args =>
-      // get channelId
-      val channels = Storage.getMetaDataChannels
-      val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
-
-      val channelId: Option[Int] = args.channel.map { ch =>
-        if (!channelMap.contains(ch)) {
-          error(s"Channel ${ch} doesn't exist in this app.")
-          sys.exit(1)
-        }
-
-        channelMap(ch)
-      }
-
-      val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
-
-      WorkflowUtils.modifyLogging(verbose = args.verbose)
-      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
-        new EventJson4sSupport.APISerializer
-      val sc = WorkflowContext(
-        mode = "Export",
-        batch = "App ID " + args.appId + channelStr,
-        executorEnv = Runner.envStringToMap(args.env))
-      val sqlContext = new SQLContext(sc)
-      val events = Storage.getPEvents()
-      val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
-      val jsonStringRdd = eventsRdd.map(write(_))
-      if (args.format == "json") {
-        jsonStringRdd.saveAsTextFile(args.outputPath)
-      } else {
-        val jsonRdd = sqlContext.jsonRDD(jsonStringRdd)
-        jsonRdd.saveAsParquetFile(args.outputPath)
-      }
-      info(s"Events are exported to ${args.outputPath}/.")
-      info("Done.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala b/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala
deleted file mode 100644
index 9a19a33..0000000
--- a/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.tools.imprt
-
-import io.prediction.controller.Utils
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.Storage
-import io.prediction.tools.Runner
-import io.prediction.workflow.WorkflowContext
-import io.prediction.workflow.WorkflowUtils
-
-import grizzled.slf4j.Logging
-import org.json4s.native.Serialization._
-
-import scala.util.{Failure, Try}
-
-case class FileToEventsArgs(
-  env: String = "",
-  logFile: String = "",
-  appId: Int = 0,
-  channel: Option[String] = None,
-  inputPath: String = "",
-  verbose: Boolean = false,
-  debug: Boolean = false)
-
-object FileToEvents extends Logging {
-  def main(args: Array[String]): Unit = {
-    val parser = new scopt.OptionParser[FileToEventsArgs]("FileToEvents") {
-      opt[String]("env") action { (x, c) =>
-        c.copy(env = x)
-      }
-      opt[String]("log-file") action { (x, c) =>
-        c.copy(logFile = x)
-      }
-      opt[Int]("appid") action { (x, c) =>
-        c.copy(appId = x)
-      }
-      opt[String]("channel") action { (x, c) =>
-        c.copy(channel = Some(x))
-      }
-      opt[String]("input") action { (x, c) =>
-        c.copy(inputPath = x)
-      }
-      opt[Unit]("verbose") action { (x, c) =>
-        c.copy(verbose = true)
-      }
-      opt[Unit]("debug") action { (x, c) =>
-        c.copy(debug = true)
-      }
-    }
-    parser.parse(args, FileToEventsArgs()) map { args =>
-      // get channelId
-      val channels = Storage.getMetaDataChannels
-      val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
-
-      val channelId: Option[Int] = args.channel.map { ch =>
-        if (!channelMap.contains(ch)) {
-          error(s"Channel ${ch} doesn't exist in this app.")
-          sys.exit(1)
-        }
-
-        channelMap(ch)
-      }
-
-      val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
-
-      WorkflowUtils.modifyLogging(verbose = args.verbose)
-      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
-        new EventJson4sSupport.APISerializer
-      val sc = WorkflowContext(
-        mode = "Import",
-        batch = "App ID " + args.appId + channelStr,
-        executorEnv = Runner.envStringToMap(args.env))
-      val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
-        Try(read[Event](json)).recoverWith {
-          case e: Throwable =>
-            error(s"\nmalformed json => $json")
-            Failure(e)
-        }.get
-      }
-      val events = Storage.getPEvents()
-      events.write(events = rdd,
-        appId = args.appId,
-        channelId = channelId)(sc)
-      info("Events are imported.")
-      info("Done.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
new file mode 100644
index 0000000..1640d55
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
@@ -0,0 +1,84 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifestSerializer
+import org.apache.predictionio.data.storage.Storage
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.native.Serialization.read
+
+import scala.io.Source
+
+/** Registers engines in, and removes them from, the engine manifest meta-data
+  * store, driven by an engine manifest JSON file.
+  */
+object RegisterEngine extends Logging {
+  val engineManifests = Storage.getMetaDataEngineManifests
+  implicit val formats = DefaultFormats + new EngineManifestSerializer
+
+  /** Reads the whole manifest file as a string, closing the file afterwards.
+    *
+    * Logs an error and terminates the JVM with exit status 1 when the file
+    * does not exist.
+    *
+    * @param jsonManifest engine manifest JSON file
+    * @return the file content
+    */
+  private def readManifestJson(jsonManifest: File): String =
+    try {
+      val source = Source.fromFile(jsonManifest)
+      // Source keeps the underlying file handle open until closed explicitly.
+      try source.mkString finally source.close()
+    } catch {
+      case e: java.io.FileNotFoundException =>
+        error(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
+        sys.exit(1)
+    }
+
+  /** Stores the manifest read from `jsonManifest`, with its file list replaced
+    * by the URIs of `engineFiles`.
+    *
+    * @param jsonManifest engine manifest JSON file
+    * @param engineFiles files that make up the engine
+    * @param copyLocal not used by this method
+    */
+  def registerEngine(
+      jsonManifest: File,
+      engineFiles: Seq[File],
+      copyLocal: Boolean = false): Unit = {
+    val engineManifest = read[EngineManifest](readManifestJson(jsonManifest))
+
+    info(s"Registering engine ${engineManifest.id} ${engineManifest.version}")
+    // NOTE(review): the trailing `true` is presumably an upsert flag - confirm
+    // against the EngineManifests storage interface.
+    engineManifests.update(
+      engineManifest.copy(files = engineFiles.map(_.toURI.toString)), true)
+  }
+
+  /** Removes the engine described by `jsonManifest` from the meta-data store
+    * and deletes the files listed in its stored manifest.
+    *
+    * Logs an error (and changes nothing) when no manifest with the same id and
+    * version is registered.
+    *
+    * @param jsonManifest engine manifest JSON file
+    */
+  def unregisterEngine(jsonManifest: File): Unit = {
+    val fileEngineManifest = read[EngineManifest](readManifestJson(jsonManifest))
+    val engineManifest = engineManifests.get(
+      fileEngineManifest.id,
+      fileEngineManifest.version)
+
+    engineManifest map { em =>
+      val fs = FileSystem.get(new Configuration)
+
+      em.files foreach { f =>
+        val path = new Path(f)
+        info(s"Removing ${f}")
+        // Non-recursive delete: each entry is removed as a single path.
+        fs.delete(path, false)
+      }
+
+      engineManifests.delete(em.id, em.version)
+      info(s"Unregistered engine ${em.id} ${em.version}")
+    } getOrElse {
+      error(s"${fileEngineManifest.id} ${fileEngineManifest.version} is not " +
+        "registered.")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
new file mode 100644
index 0000000..5dae46b
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
@@ -0,0 +1,178 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+import java.net.URI
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.tools.console.ConsoleArgs
+import org.apache.predictionio.workflow.WorkflowUtils
+
+import scala.sys.process._
+
+/** Launches the engine server (`CreateServer`) for an engine instance, either
+  * by building and running a `spark-submit` command line directly or by
+  * delegating to `Runner.runOnSpark`.
+  */
+object RunServer extends Logging {
+  /** Returns `args` when `condition` holds and no arguments otherwise. */
+  private def optionalArgs(condition: Boolean, args: String*): Seq[String] =
+    if (condition) args else Nil
+
+  /** Picks the first engine file accepted by `accept`.
+    *
+    * @param files engine files to search
+    * @param wanted human-readable description used in the error message
+    * @throws NoSuchElementException if no engine file matches
+    */
+  private def firstFile(files: Seq[String], wanted: String)(
+      accept: String => Boolean): String =
+    files.find(accept).getOrElse(throw new NoSuchElementException(
+      s"No $wanted found among engine files: ${files.mkString(",")}"))
+
+  /** Lists the files in `<pioHome>/plugins`; empty when the directory cannot
+    * be listed (`listFiles()` returned null).
+    */
+  private def pluginFiles(ca: ConsoleArgs): Seq[File] =
+    // NOTE(review): `pioHome.get` throws if pioHome is unset, as before;
+    // presumably the console guarantees it is set - confirm with callers.
+    Option(new File(ca.common.pioHome.get, "plugins").listFiles())
+      .map(_.toList).getOrElse(Nil)
+
+  /** Builds the arguments handed to `CreateServer`, in a fixed order.
+    *
+    * @param engineVariant when defined, emitted as `--engine-variant` right
+    *                      after the engine instance id
+    */
+  private def serverArgs(
+      ca: ConsoleArgs,
+      engineInstanceId: String,
+      engineVariant: Option[String]): Seq[String] =
+    Seq("--engineInstanceId", engineInstanceId) ++
+    engineVariant.toList.flatMap(v => Seq("--engine-variant", v)) ++
+    Seq(
+      "--ip",
+      ca.deploy.ip,
+      "--port",
+      ca.deploy.port.toString,
+      "--event-server-ip",
+      ca.eventServer.ip,
+      "--event-server-port",
+      ca.eventServer.port.toString) ++
+    optionalArgs(ca.accessKey.accessKey != "", "--accesskey", ca.accessKey.accessKey) ++
+    optionalArgs(ca.eventServer.enabled, "--feedback") ++
+    optionalArgs(ca.common.batch != "", "--batch", ca.common.batch) ++
+    optionalArgs(ca.common.verbose, "--verbose") ++
+    ca.deploy.logUrl.toList.flatMap(x => Seq("--log-url", x)) ++
+    ca.deploy.logPrefix.toList.flatMap(x => Seq("--log-prefix", x)) ++
+    Seq("--json-extractor", ca.common.jsonExtractor.toString)
+
+  /** Runs `CreateServer` through a `spark-submit` child process and waits for
+    * it to finish.
+    *
+    * @param ca parsed console arguments
+    * @param core file used as the main jar in client mode without an uber jar
+    * @param em manifest of the engine to serve
+    * @param engineInstanceId id of the engine instance to deploy
+    * @return exit value of the `spark-submit` process
+    * @throws NoSuchElementException if the required main jar is not among the
+    *                                engine files
+    */
+  def runServer(
+      ca: ConsoleArgs,
+      core: File,
+      em: EngineManifest,
+      engineInstanceId: String): Int = {
+    // PIO_* variables are handed to the child via SPARK_YARN_USER_ENV below.
+    val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
+      s"${kv._1}=${kv._2}"
+    ).mkString(",")
+
+    val sparkHome = ca.common.sparkHome.getOrElse(
+      sys.env.getOrElse("SPARK_HOME", "."))
+
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    // Runner.argumentValue yields None when a flag is missing or is the last
+    // token, so a dangling flag no longer raises IndexOutOfBoundsException.
+    val passThrough: Seq[String] = ca.common.sparkPassThrough
+    val extraClasspaths =
+      Runner.argumentValue(passThrough, "--driver-class-path").toList ++
+        WorkflowUtils.thirdPartyClasspaths
+
+    val deployMode =
+      Runner.argumentValue(passThrough, "--deploy-mode").getOrElse("client")
+    val inCluster = deployMode == "cluster"
+
+    val mainJar =
+      if (ca.build.uberJar) {
+        if (inCluster) {
+          firstFile(em.files, "hdfs engine file")(_.startsWith("hdfs"))
+        } else {
+          firstFile(em.files, "non-hdfs engine file")(f => !f.startsWith("hdfs"))
+        }
+      } else if (inCluster) {
+        firstFile(em.files, "pio-assembly file")(_.contains("pio-assembly"))
+      } else {
+        core.getCanonicalPath
+      }
+
+    val jarFiles =
+      (em.files ++ pluginFiles(ca).map(_.getAbsolutePath)).mkString(",")
+
+    val sparkSubmit =
+      Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
+      ca.common.sparkPassThrough ++
+      Seq(
+        "--class",
+        "org.apache.predictionio.workflow.CreateServer",
+        "--name",
+        s"PredictionIO Engine Instance: ${engineInstanceId}") ++
+      optionalArgs(!ca.build.uberJar, "--jars", jarFiles) ++
+      optionalArgs(extraFiles.size > 0, "--files", extraFiles.mkString(",")) ++
+      optionalArgs(
+        extraClasspaths.size > 0,
+        "--driver-class-path",
+        extraClasspaths.mkString(":")) ++
+      optionalArgs(
+        ca.common.sparkKryo,
+        "--conf",
+        "spark.serializer=org.apache.spark.serializer.KryoSerializer") ++
+      Seq(mainJar) ++
+      serverArgs(ca, engineInstanceId, None)
+
+    info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+
+    val proc =
+      Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
+    // Make sure the child process does not outlive this JVM.
+    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+      def run(): Unit = {
+        proc.destroy()
+      }
+    }))
+    proc.exitValue()
+  }
+
+  /** Runs `CreateServer` through `Runner.runOnSpark`, passing the engine files
+    * and the plugin files as jar URIs.
+    *
+    * @param ca parsed console arguments
+    * @param em manifest of the engine to serve
+    * @param engineInstanceId id of the engine instance to deploy
+    * @return the value returned by `Runner.runOnSpark`
+    */
+  def newRunServer(
+    ca: ConsoleArgs,
+    em: EngineManifest,
+    engineInstanceId: String): Int = {
+    val jarFiles = em.files.map(new URI(_)) ++ pluginFiles(ca).map(_.toURI)
+    val args = serverArgs(
+      ca,
+      engineInstanceId,
+      Some(ca.common.variantJson.toURI.toString))
+
+    Runner.runOnSpark("org.apache.predictionio.workflow.CreateServer", args, ca, jarFiles)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
new file mode 100644
index 0000000..4b42f40
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -0,0 +1,212 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+import java.net.URI
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.tools.console.ConsoleArgs
+import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+import scala.sys.process._
+
+/** Launches the training / evaluation workflow (`CreateWorkflow`), either by
+  * building and running a `spark-submit` command line directly or by
+  * delegating to `Runner.runOnSpark`.
+  */
+object RunWorkflow extends Logging {
+  /** Returns `args` when `condition` holds and no arguments otherwise. */
+  private def optionalArgs(condition: Boolean, args: String*): Seq[String] =
+    if (condition) args else Nil
+
+  /** Picks the first engine file accepted by `accept`.
+    *
+    * @param files engine files to search
+    * @param wanted human-readable description used in the error message
+    * @throws NoSuchElementException if no engine file matches
+    */
+  private def firstFile(files: Seq[String], wanted: String)(
+      accept: String => Boolean): String =
+    files.find(accept).getOrElse(throw new NoSuchElementException(
+      s"No $wanted found among engine files: ${files.mkString(",")}"))
+
+  /** Arguments that identify the engine: id, version, variant and verbosity,
+    * followed by the optional engine factory and engine params key.
+    */
+  private def engineArgs(
+      ca: ConsoleArgs,
+      em: EngineManifest,
+      engineVariant: String): Seq[String] =
+    Seq(
+      "--engine-id",
+      em.id,
+      "--engine-version",
+      em.version,
+      "--engine-variant",
+      engineVariant,
+      "--verbosity",
+      ca.common.verbosity.toString) ++
+    ca.common.engineFactory.toList.flatMap(x => Seq("--engine-factory", x)) ++
+    ca.common.engineParamsKey.toList.flatMap(x => Seq("--engine-params-key", x))
+
+  /** Arguments that control the run itself. `--batch` is emitted once; it
+    * used to be appended a second time just before `--json-extractor`.
+    */
+  private def workflowArgs(ca: ConsoleArgs): Seq[String] =
+    optionalArgs(ca.common.batch != "", "--batch", ca.common.batch) ++
+    optionalArgs(ca.common.verbose, "--verbose") ++
+    optionalArgs(ca.common.skipSanityCheck, "--skip-sanity-check") ++
+    optionalArgs(ca.common.stopAfterRead, "--stop-after-read") ++
+    optionalArgs(ca.common.stopAfterPrepare, "--stop-after-prepare") ++
+    ca.common.evaluation.toList.flatMap(x => Seq("--evaluation-class", x)) ++
+    // If engineParamsGenerator is specified, it overrides the evaluation.
+    ca.common.engineParamsGenerator.orElse(ca.common.evaluation).toList
+      .flatMap(x => Seq("--engine-params-generator-class", x)) ++
+    Seq("--json-extractor", ca.common.jsonExtractor.toString)
+
+  /** Runs `CreateWorkflow` through a `spark-submit` child process and waits
+    * for it to finish.
+    *
+    * In cluster deploy mode the variant JSON is first copied to
+    * `PIO_FS_ENGINESDIR/<engine id>/<engine version>` on the Hadoop file
+    * system and the qualified remote path is passed as the engine variant.
+    *
+    * @param ca parsed console arguments
+    * @param core file used as the main jar in client mode without an uber jar
+    * @param em manifest of the engine to run
+    * @param variantJson local engine variant JSON file
+    * @return exit value of the `spark-submit` process
+    * @throws NoSuchElementException if `PIO_FS_ENGINESDIR` is not set or the
+    *                                required main jar is not among the engine
+    *                                files
+    */
+  def runWorkflow(
+      ca: ConsoleArgs,
+      core: File,
+      em: EngineManifest,
+      variantJson: File): Int = {
+    // Collect and serialize PIO_* environmental variables
+    val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
+      s"${kv._1}=${kv._2}"
+    ).mkString(",")
+
+    val sparkHome = ca.common.sparkHome.getOrElse(
+      sys.env.getOrElse("SPARK_HOME", "."))
+
+    val hadoopConf = new Configuration
+    val hdfs = FileSystem.get(hadoopConf)
+
+    // Runner.argumentValue yields None when a flag is missing or is the last
+    // token, so a dangling flag no longer raises IndexOutOfBoundsException.
+    val passThrough: Seq[String] = ca.common.sparkPassThrough
+    val extraClasspaths =
+      Runner.argumentValue(passThrough, "--driver-class-path").toList ++
+        WorkflowUtils.thirdPartyClasspaths
+
+    val deployMode =
+      Runner.argumentValue(passThrough, "--deploy-mode").getOrElse("client")
+    val inCluster = deployMode == "cluster"
+
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    val mainJar =
+      if (ca.build.uberJar) {
+        if (inCluster) {
+          firstFile(em.files, "hdfs engine file")(_.startsWith("hdfs"))
+        } else {
+          firstFile(em.files, "non-hdfs engine file")(f => !f.startsWith("hdfs"))
+        }
+      } else if (inCluster) {
+        firstFile(em.files, "pio-assembly file")(_.contains("pio-assembly"))
+      } else {
+        core.getCanonicalPath
+      }
+
+    val workMode =
+      ca.common.evaluation.map(_ => "Evaluation").getOrElse("Training")
+
+    val engineLocation = Seq(
+      sys.env("PIO_FS_ENGINESDIR"),
+      em.id,
+      em.version)
+
+    val engineVariant =
+      if (inCluster) {
+        val dstPath = new Path(engineLocation.mkString(Path.SEPARATOR))
+        info("Cluster deploy mode detected. Trying to copy " +
+          s"${variantJson.getCanonicalPath} to " +
+          s"${hdfs.makeQualified(dstPath).toString}.")
+        hdfs.copyFromLocalFile(new Path(variantJson.toURI), dstPath)
+        hdfs.makeQualified(new Path(
+          (engineLocation :+ variantJson.getName).mkString(Path.SEPARATOR))).
+          toString
+      } else {
+        variantJson.getCanonicalPath
+      }
+
+    val sparkSubmit =
+      Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
+      ca.common.sparkPassThrough ++
+      Seq(
+        "--class",
+        "org.apache.predictionio.workflow.CreateWorkflow",
+        "--name",
+        s"PredictionIO $workMode: ${em.id} ${em.version} (${ca.common.batch})") ++
+      optionalArgs(!ca.build.uberJar, "--jars", em.files.mkString(",")) ++
+      optionalArgs(extraFiles.size > 0, "--files", extraFiles.mkString(",")) ++
+      optionalArgs(
+        extraClasspaths.size > 0,
+        "--driver-class-path",
+        extraClasspaths.mkString(":")) ++
+      optionalArgs(
+        ca.common.sparkKryo,
+        "--conf",
+        "spark.serializer=org.apache.spark.serializer.KryoSerializer") ++
+      Seq(mainJar, "--env", pioEnvVars) ++
+      engineArgs(ca, em, engineVariant) ++
+      optionalArgs(inCluster, "--deploy-mode", "cluster") ++
+      workflowArgs(ca)
+
+    info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+    Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).!
+  }
+
+  /** Runs `CreateWorkflow` through `Runner.runOnSpark`, passing the engine
+    * files as jar URIs and the console's variant JSON as the engine variant.
+    *
+    * @param ca parsed console arguments
+    * @param em manifest of the engine to run
+    * @return the value returned by `Runner.runOnSpark`
+    */
+  def newRunWorkflow(ca: ConsoleArgs, em: EngineManifest): Int = {
+    val jarFiles = em.files.map(new URI(_))
+    val args =
+      engineArgs(ca, em, ca.common.variantJson.toURI.toString) ++
+      workflowArgs(ca)
+
+    Runner.runOnSpark(
+      "org.apache.predictionio.workflow.CreateWorkflow",
+      args,
+      ca,
+      jarFiles)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
new file mode 100644
index 0000000..3a8fed5
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
@@ -0,0 +1,211 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+import java.net.URI
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.tools.console.ConsoleArgs
+import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+import scala.sys.process._
+
+object Runner extends Logging {
+  /** Parses a comma-separated list of `KEY=VALUE` pairs into a map.
+    *
+    * Each pair is split on its first `=` only, so a value may itself contain
+    * `=` characters (e.g. `PIO_OPTS=a=b`) and may be empty (`KEY=`). Pairs
+    * that contain no `=` at all are skipped. When a key appears more than
+    * once, the last occurrence wins.
+    *
+    * @param env serialized environment, e.g. `"A=1,B=2"`
+    * @return the parsed key/value pairs
+    */
+  def envStringToMap(env: String): Map[String, String] =
+    env.split(',').flatMap { pair =>
+      // Limit 2: split on the first '=' only and keep a trailing empty value.
+      pair.split("=", 2) match {
+        case Array(key, value) => List(key -> value)
+        case _ => Nil
+      }
+    }.toMap
+
+  /** Looks up the value that follows a named command-line argument.
+    *
+    * @param arguments argument list, e.g. `Seq("--master", "yarn")`
+    * @param argumentName flag to search for, e.g. `"--master"`
+    * @return the element right after the first occurrence of `argumentName`,
+    *         or `None` when the flag is absent or is the last element
+    */
+  def argumentValue(arguments: Seq[String], argumentName: String): Option[String] = {
+    val argumentIndex = arguments.indexOf(argumentName)
+    // `lift` yields None for an out-of-range index instead of throwing.
+    if (argumentIndex < 0) None else arguments.lift(argumentIndex + 1)
+  }
+
+  /** Resolves a local file to the URI string that is handed to Spark.
+    *
+    * When both a file system and a scratch URI are given, the file is copied
+    * to the scratch location (scratch URI merged with the file's canonical
+    * path) and the qualified destination URI is returned. Otherwise the
+    * file's own local URI is returned and nothing is copied.
+    *
+    * @param fileSystem file system used for the copy, if any
+    * @param uri scratch location, if any
+    * @param localFile file on the local disk
+    * @return URI string of the copied file, or of the local file
+    */
+  def handleScratchFile(
+      fileSystem: Option[FileSystem],
+      uri: Option[URI],
+      localFile: File): String = {
+    // Resolved up front, whether or not a scratch location is configured.
+    val canonicalPath = localFile.getCanonicalPath
+    val uploaded = for {
+      fs <- fileSystem
+      scratchBase <- uri
+    } yield {
+      val target = fs.makeQualified(Path.mergePaths(
+        new Path(scratchBase),
+        new Path(canonicalPath)))
+      info(s"Copying $localFile to ${target.toString}")
+      fs.copyFromLocalFile(new Path(canonicalPath), target)
+      target.toUri.toString
+    }
+    uploaded.getOrElse(localFile.toURI.toString)
+  }
+
+  /** Closes the scratch file system, if one was opened.
+    *
+    * The file system is closed only when both `fs` and `uri` are defined;
+    * in every other case this is a no-op.
+    *
+    * @param fs file system opened for the scratch URI, if any
+    * @param uri scratch URI, if any
+    */
+  def cleanup(fs: Option[FileSystem], uri: Option[URI]): Unit = {
+    (fs, uri) match {
+      case (Some(f), Some(_)) => f.close()
+      // `()` is the unit value; `Unit` would be the companion object.
+      case _ => ()
+    }
+  }
+
+  /** Rewrites every argument that names an existing local file.
+    *
+    * Each argument is interpreted first as a file URI and, failing that, as
+    * a plain path. If the resulting file exists it is passed through
+    * `handleScratchFile` and replaced by the returned URI string; all other
+    * arguments are returned unchanged.
+    *
+    * @param fileSystem file system used for scratch copies, if any
+    * @param uri scratch location, if any
+    * @param args arguments to inspect
+    * @return the arguments, with existing files replaced by their URIs
+    */
+  def detectFilePaths(
+      fileSystem: Option[FileSystem],
+      uri: Option[URI],
+      args: Seq[String]): Seq[String] = {
+    args map { arg =>
+      val f = try {
+        new File(new URI(arg))
+      } catch {
+        // Not a usable file URI: fall back to treating the argument as a path.
+        // Only non-fatal errors are handled so that fatal ones still propagate.
+        case scala.util.control.NonFatal(_) => new File(arg)
+      }
+      if (f.exists()) {
+        handleScratchFile(fileSystem, uri, f)
+      } else {
+        arg
+      }
+    }
+  }
+
+  /** Builds a `spark-submit` command line, launches it and waits for it.
+    *
+    * @param className main class passed to `spark-submit --class`
+    * @param classArgs arguments for `className`; arguments naming existing
+    *                  local files are rewritten by `detectFilePaths`
+    * @param ca parsed console arguments (Spark pass-through options, scratch
+    *           URI, Spark/PIO home, Kryo and verbosity switches)
+    * @param extraJars additional JARs passed via `--jars`
+    * @return 1 when the deploy mode / master / scratch URI combination is
+    *         rejected, otherwise the exit value of the spawned process
+    */
+  def runOnSpark(
+      className: String,
+      classArgs: Seq[String],
+      ca: ConsoleArgs,
+      extraJars: Seq[URI]): Int = {
+    // Return error for unsupported cases
+    // Both values are read from the Spark pass-through options; they default
+    // to "client" and "local" when not given there.
+    val deployMode =
+      argumentValue(ca.common.sparkPassThrough, "--deploy-mode").getOrElse("client")
+    val master =
+      argumentValue(ca.common.sparkPassThrough, "--master").getOrElse("local")
+
+    (ca.common.scratchUri, deployMode, master) match {
+      case (Some(u), "client", m) if m != "yarn-cluster" =>
+        error("--scratch-uri cannot be set when deploy mode is client")
+        return 1
+      case (_, "cluster", m) if m.startsWith("spark://") =>
+        error("Using cluster deploy mode with Spark standalone cluster is not supported")
+        return 1
+      case _ => Unit
+    }
+
+    // Initialize HDFS API for scratch URI
+    val fs = ca.common.scratchUri map { uri =>
+      FileSystem.get(uri, new Configuration())
+    }
+
+    // Collect and serialize PIO_* environmental variables
+    // Serialized as comma-separated KEY=VALUE pairs.
+    val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
+      s"${kv._1}=${kv._2}"
+    ).mkString(",")
+
+    // Location of Spark
+    // Falls back to the SPARK_HOME environment variable, then to ".".
+    val sparkHome = ca.common.sparkHome.getOrElse(
+      sys.env.getOrElse("SPARK_HOME", "."))
+
+    // Local path to PredictionIO assembly JAR
+    // NOTE(review): `pioHome.get` presumably throws if pioHome is empty;
+    // confirm callers always populate it.
+    val mainJar = handleScratchFile(
+      fs,
+      ca.common.scratchUri,
+      console.Console.coreAssembly(ca.common.pioHome.get))
+
+    // Extra JARs that are needed by the driver
+    val driverClassPathPrefix =
+      argumentValue(ca.common.sparkPassThrough, "--driver-class-path") map { v =>
+        Seq(v)
+      } getOrElse {
+        Nil
+      }
+
+    val extraClasspaths =
+      driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
+
+    // Extra files that are needed to be passed to --files
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles map { f =>
+      handleScratchFile(fs, ca.common.scratchUri, new File(f))
+    }
+
+    val deployedJars = extraJars map { j =>
+      handleScratchFile(fs, ca.common.scratchUri, new File(j))
+    }
+
+    val sparkSubmitCommand =
+      Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator))
+
+    val sparkSubmitJars = if (extraJars.nonEmpty) {
+      Seq("--jars", deployedJars.map(_.toString).mkString(","))
+    } else {
+      Nil
+    }
+
+    val sparkSubmitFiles = if (extraFiles.nonEmpty) {
+      Seq("--files", extraFiles.mkString(","))
+    } else {
+      Nil
+    }
+
+    val sparkSubmitExtraClasspaths = if (extraClasspaths.nonEmpty) {
+      Seq("--driver-class-path", extraClasspaths.mkString(":"))
+    } else {
+      Nil
+    }
+
+    val sparkSubmitKryo = if (ca.common.sparkKryo) {
+      Seq(
+        "--conf",
+        "spark.serializer=org.apache.spark.serializer.KryoSerializer")
+    } else {
+      Nil
+    }
+
+    val verbose = if (ca.common.verbose) Seq("--verbose") else Nil
+
+    // Assemble the final command line; empty strings are dropped at the end.
+    // NOTE(review): when no PIO_* variable is set, pioEnvVars is "" and is
+    // filtered out while the preceding "--env" flag is kept.
+    val sparkSubmit = Seq(
+      sparkSubmitCommand,
+      ca.common.sparkPassThrough,
+      Seq("--class", className),
+      sparkSubmitJars,
+      sparkSubmitFiles,
+      sparkSubmitExtraClasspaths,
+      sparkSubmitKryo,
+      Seq(mainJar),
+      detectFilePaths(fs, ca.common.scratchUri, classArgs),
+      Seq("--env", pioEnvVars),
+      verbose).flatten.filter(_ != "")
+    info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+    // The child process gets an empty CLASSPATH and the serialized PIO_*
+    // variables in SPARK_YARN_USER_ENV.
+    val proc = Process(
+      sparkSubmit,
+      None,
+      "CLASSPATH" -> "",
+      "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
+    // On JVM shutdown: close the scratch file system and kill the child.
+    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+      def run(): Unit = {
+        cleanup(fs, ca.common.scratchUri)
+        proc.destroy()
+      }
+    }))
+    // The file system is closed right after launch (all scratch copies were
+    // made above); exitValue() then blocks until the process terminates.
+    cleanup(fs, ca.common.scratchUri)
+    proc.exitValue()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
new file mode 100644
index 0000000..b70cb7e
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
@@ -0,0 +1,156 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools.admin
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.event.Logging
+import akka.io.IO
+import akka.util.Timeout
+import org.apache.predictionio.data.api.StartServer
+import org.apache.predictionio.data.storage.Storage
+import org.json4s.{Formats, DefaultFormats}
+
+import java.util.concurrent.TimeUnit
+
+import spray.can.Http
+import spray.http.{MediaTypes, StatusCodes}
+import spray.httpx.Json4sSupport
+import spray.routing._
+
+import scala.concurrent.ExecutionContext
+
+/** Spray HTTP service actor exposing the admin REST routes.
+  *
+  * Every `cmd` route delegates to `commandClient` and responds with media
+  * type `application/json`.
+  *
+  * @param commandClient client that executes the app commands
+  */
+class AdminServiceActor(val commandClient: CommandClient)
+  extends HttpServiceActor {
+
+  // json4s (un)marshalling with the default formats.
+  object Json4sProtocol extends Json4sSupport {
+    implicit def json4sFormats: Formats = DefaultFormats
+  }
+
+  import Json4sProtocol._
+
+  val log = Logging(context.system, this)
+
+  // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
+  // Futures
+  implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
+  implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
+
+  // for better message response
+  // NOTE(review): this handler is neither implicit nor referenced by `route`
+  // in this class, so it looks unused here; confirm whether the routes were
+  // meant to be wrapped in it.
+  val rejectionHandler = RejectionHandler {
+    case MalformedRequestContentRejection(msg, _) :: _ =>
+      complete(StatusCodes.BadRequest, Map("message" -> msg))
+    case MissingQueryParamRejection(msg) :: _ =>
+      complete(StatusCodes.NotFound,
+        Map("message" -> s"missing required query parameter ${msg}."))
+    case AuthenticationFailedRejection(cause, challengeHeaders) :: _ =>
+      complete(StatusCodes.Unauthorized, challengeHeaders,
+        Map("message" -> s"Invalid accessKey."))
+  }
+
+  // NOTE(review): not referenced anywhere in this class.
+  val jsonPath = """(.+)\.json$""".r
+
+  // Routes:
+  //   GET    /                   -> {"status": "alive"}
+  //   DELETE /cmd/app/<name>/data -> commandClient.futureAppDataDelete
+  //   DELETE /cmd/app/<name>      -> commandClient.futureAppDelete
+  //   GET    /cmd/app            -> commandClient.futureAppList
+  //   POST   /cmd/app            -> commandClient.futureAppNew (AppRequest body)
+  val route: Route =
+    pathSingleSlash {
+      get {
+        respondWithMediaType(MediaTypes.`application/json`) {
+          complete(Map("status" -> "alive"))
+        }
+      }
+    } ~
+      path("cmd" / "app" / Segment / "data") {
+        appName => {
+          delete {
+            respondWithMediaType(MediaTypes.`application/json`) {
+              complete(commandClient.futureAppDataDelete(appName))
+            }
+          }
+        }
+      } ~
+      path("cmd" / "app" / Segment) {
+        appName => {
+          delete {
+            respondWithMediaType(MediaTypes.`application/json`) {
+              complete(commandClient.futureAppDelete(appName))
+            }
+          }
+        }
+      } ~
+      path("cmd" / "app") {
+        get {
+          respondWithMediaType(MediaTypes.`application/json`) {
+            complete(commandClient.futureAppList())
+          }
+        } ~
+          post {
+            entity(as[AppRequest]) {
+              appArgs => respondWithMediaType(MediaTypes.`application/json`) {
+                complete(commandClient.futureAppNew(appArgs))
+              }
+            }
+          }
+      }
+  def receive: Actor.Receive = runRoute(route)
+}
+
+/** Actor that binds the admin HTTP service to a network interface.
+  *
+  * On `StartServer(host, port)` it asks the spray-can HTTP extension to bind
+  * its `AdminServiceActor` child to that address; the bind outcome and any
+  * unexpected message are only logged.
+  *
+  * @param commandClient client handed to the `AdminServiceActor` child
+  */
+class AdminServerActor(val commandClient: CommandClient) extends Actor {
+  val log = Logging(context.system, this)
+  val child = context.actorOf(
+    Props(classOf[AdminServiceActor], commandClient),
+    "AdminServiceActor")
+
+  implicit val system = context.system
+
+  def receive: PartialFunction[Any, Unit] = {
+    case StartServer(host, portNum) =>
+      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
+    case _: Http.Bound =>
+      log.info("Bound received. AdminServer is ready.")
+    case _: Http.CommandFailed =>
+      log.error("Command failed.")
+    case _ =>
+      log.error("Unknown message.")
+  }
+}
+
+/** Network settings for the admin server.
+  *
+  * @param ip interface to bind to; defaults to `"localhost"`
+  * @param port TCP port to bind to; defaults to 7071
+  */
+case class AdminServerConfig(
+  ip: String = "localhost",
+  port: Int = 7071
+)
+
+/** Entry point for starting the admin server. */
+object AdminServer {
+  /** Starts the admin server and blocks until its actor system terminates.
+    *
+    * Creates the "AdminServerSystem" actor system, builds a `CommandClient`
+    * from the `Storage` clients, and sends `StartServer` with the configured
+    * address to a new `AdminServerActor`.
+    *
+    * @param config interface and port to bind to
+    */
+  def createAdminServer(config: AdminServerConfig): Unit = {
+    implicit val system = ActorSystem("AdminServerSystem")
+
+    val client = new CommandClient(
+      appClient = Storage.getMetaDataApps,
+      accessKeyClient = Storage.getMetaDataAccessKeys,
+      eventClient = Storage.getLEvents())
+    val serverProps = Props(classOf[AdminServerActor], client)
+
+    system.actorOf(serverProps, "AdminServerActor") ! StartServer(config.ip, config.port)
+    system.awaitTermination()
+  }
+}
+
+/** Stand-alone launcher that starts the admin server on localhost:7071. */
+object AdminRun {
+  /** Starts the admin server; `args` is ignored.
+    *
+    * Declared with an explicit `: Unit =` instead of procedure syntax.
+    */
+  def main(args: Array[String]): Unit = {
+    AdminServer.createAdminServer(AdminServerConfig(
+      ip = "localhost",
+      port = 7071))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
new file mode 100644
index 0000000..143023e
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
@@ -0,0 +1,160 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools.admin
+
+import org.apache.predictionio.data.storage._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Common base type of the responses returned by `CommandClient`. */
+abstract class BaseResponse()
+
+/** Generic status/message response.
+  *
+  * `CommandClient` uses status 1 for success and 0 for failure.
+  *
+  * @param status outcome code
+  * @param message human-readable description of the outcome
+  */
+case class GeneralResponse(
+  status: Int = 0,
+  message: String = ""
+) extends BaseResponse()
+
+/** Request body for creating an app.
+  *
+  * @param id requested app ID; defaults to 0
+  * @param name app name
+  * @param description free-text description
+  */
+case class AppRequest(
+  id: Int = 0,
+  name: String = "",
+  description: String = ""
+)
+
+/** Request for a training run.
+  *
+  * @param enginePath presumably the location of the engine to train; TODO confirm
+  */
+case class TrainRequest(
+  enginePath: String = ""
+)
+/** One app entry of an app listing.
+  *
+  * @param id app ID
+  * @param name app name
+  * @param keys access keys belonging to the app
+  */
+case class AppResponse(
+  id: Int = 0,
+  name: String = "",
+  keys: Seq[AccessKey]
+) extends BaseResponse()
+
+/** Response for a successfully created app.
+  *
+  * @param status outcome code
+  * @param message human-readable description of the outcome
+  * @param id ID of the new app
+  * @param name name of the new app
+  * @param key access key created for the new app
+  */
+case class AppNewResponse(
+  status: Int = 0,
+  message: String = "",
+  id: Int = 0,
+  name: String = "",
+  key: String
+) extends BaseResponse()
+
+/** Response carrying a list of apps.
+  *
+  * @param status outcome code
+  * @param message human-readable description of the outcome
+  * @param apps the listed apps
+  */
+case class AppListResponse(
+  status: Int = 0,
+  message: String = "",
+  apps: Seq[AppResponse]
+) extends BaseResponse()
+
+class CommandClient(
+  val appClient: Apps,
+  val accessKeyClient: AccessKeys,
+  val eventClient: LEvents
+) {
+
+  /** Creates a new app, its event store and an initial access key.
+    *
+    * Fails (status 0) when an app with the requested name or ID already
+    * exists, or when inserting the app, initializing the event store or
+    * inserting the access key does not succeed.
+    *
+    * @param req name, optional ID and description of the app to create
+    * @param ec execution context the work runs on
+    * @return an `AppNewResponse` with status 1 on success, otherwise a
+    *         `GeneralResponse` with status 0 and the reason
+    */
+  def futureAppNew(req: AppRequest)(implicit ec: ExecutionContext): Future[BaseResponse] = Future {
+    val response = appClient.getByName(req.name) map { app =>
+      GeneralResponse(0, s"App ${req.name} already exists. Aborting.")
+    } getOrElse {
+      appClient.get(req.id) map {
+        app2 =>
+          GeneralResponse(0,
+              s"App ID ${app2.id} already exists and maps to the app '${app2.name}'. " +
+              "Aborting.")
+      } getOrElse {
+        // `req.id` is a primitive Int and can never be null, so it is passed as is.
+        val appid = appClient.insert(App(
+          id = req.id,
+          name = req.name,
+          description = Option(req.description)))
+        appid map { id =>
+          if (eventClient.init(id)) {
+            // An empty key is passed in; the stored key comes back from `insert`.
+            val insertedKey = accessKeyClient.insert(AccessKey(
+              key = "",
+              appid = id,
+              events = Seq()))
+            insertedKey map { k =>
+              new AppNewResponse(1, "App created successfully.", id, req.name, k)
+            } getOrElse {
+              GeneralResponse(0, "Unable to create new access key.")
+            }
+          } else {
+            GeneralResponse(0, s"Unable to initialize Event Store for this app ID: ${id}.")
+          }
+        } getOrElse {
+          GeneralResponse(0, "Unable to create new app.")
+        }
+      }
+    }
+    response
+  }
+
+  /** Lists all apps, sorted by name, together with their access keys.
+    *
+    * @param ec execution context the work runs on
+    * @return an `AppListResponse` with status 1 and one entry per app
+    */
+  def futureAppList()(implicit ec: ExecutionContext): Future[AppListResponse] = Future {
+    val summaries =
+      for (app <- appClient.getAll().sortBy(_.name))
+        yield new AppResponse(app.id, app.name, accessKeyClient.getByAppid(app.id))
+    new AppListResponse(1, "Successful retrieved app list.", summaries)
+  }
+
+  /** Deletes all event data of an app and re-initializes its event store.
+    *
+    * The event store is re-initialized even when the removal reported a
+    * failure. The returned status is 1 only if both steps succeeded, and
+    * the message contains the messages of both steps separated by a space.
+    *
+    * @param appName name of the app whose data is deleted
+    * @param ec execution context the work runs on
+    * @return combined outcome, or status 0 if the app does not exist
+    */
+  def futureAppDataDelete(appName: String)
+      (implicit ec: ExecutionContext): Future[GeneralResponse] = Future {
+    val response = appClient.getByName(appName) map { app =>
+      val removed = if (eventClient.remove(app.id)) {
+        GeneralResponse(1, s"Removed Event Store for this app ID: ${app.id}")
+      } else {
+        GeneralResponse(0, s"Error removing Event Store for this app.")
+      }
+
+      val initialized = if (eventClient.init(app.id)) {
+        GeneralResponse(1, s"Initialized Event Store for this app ID: ${app.id}.")
+      } else {
+        GeneralResponse(0, s"Unable to initialize Event Store for this appId:" +
+          s" ${app.id}.")
+      }
+      // Status is the product, so either failure (0) fails the whole call.
+      // The messages are joined with a space so they do not run together.
+      GeneralResponse(
+        removed.status * initialized.status,
+        s"${removed.message} ${initialized.message}")
+    } getOrElse {
+      GeneralResponse(0, s"App ${appName} does not exist.")
+    }
+    response
+  }
+
+  /** Deletes an app after removing its event store.
+    *
+    * The app record is deleted only when the event store was removed
+    * successfully. The deletion goes through the injected `appClient`, so
+    * this method uses the same apps client as the rest of the class.
+    *
+    * @param appName name of the app to delete
+    * @param ec execution context the work runs on
+    * @return status 1 on success; status 0 if the app does not exist or its
+    *         event store could not be removed
+    */
+  def futureAppDelete(appName: String)
+      (implicit ec: ExecutionContext): Future[GeneralResponse] = Future {
+    appClient.getByName(appName) map { app =>
+      if (eventClient.remove(app.id)) {
+        appClient.delete(app.id)
+        GeneralResponse(1, s"App successfully deleted")
+      } else {
+        GeneralResponse(0, s"Error removing Event Store for app ${app.name}.")
+      }
+    } getOrElse {
+      GeneralResponse(0, s"App ${appName} does not exist.")
+    }
+  }
+
+  /** Placeholder for the training command, which is not implemented yet.
+    *
+    * Returns an explicit failure response instead of completing the future
+    * with `null`, so callers never receive a null payload.
+    *
+    * @param req training request (currently ignored)
+    * @param ec execution context the work runs on
+    * @return a `GeneralResponse` with status 0
+    */
+  def futureTrain(req: TrainRequest)
+      (implicit ec: ExecutionContext): Future[GeneralResponse] = Future {
+    GeneralResponse(0, "Training is not implemented yet.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
new file mode 100644
index 0000000..475a3de
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
@@ -0,0 +1,161 @@
+## Admin API (under development)
+
+### Start Admin HTTP Server without bin/pio (for development)
+
+NOTE: elasticsearch and hbase should be running first.
+
+```
+$ sbt/sbt "tools/compile"
+$ set -a
+$ source conf/pio-env.sh
+$ set +a
+$ sbt/sbt "tools/run-main org.apache.predictionio.tools.admin.AdminRun"
+```
+
+### Unit test (Very minimal)
+
+```
+$ set -a
+$ source conf/pio-env.sh
+$ set +a
+$ sbt/sbt "tools/test-only org.apache.predictionio.tools.admin.AdminAPISpec"
+```
+
+### Start with pio command adminserver
+
+```
+$ pio adminserver
+```
+
+Admin Server url defaults to `http://localhost:7071`
+
+The host and port can be specified by using the 'ip' and 'port' parameters
+
+```
+$ pio adminserver --ip 127.0.0.1 --port 7080
+```
+
+### Current Supported Commands
+
+#### Check status
+
+```
+$ curl -i http://localhost:7071/
+
+{"status":"alive"}
+```
+
+#### Get list of apps
+
+```
+$ curl -i -X GET http://localhost:7071/cmd/app
+
+{"status":1,"message":"Successful retrieved app list.","apps":[{"id":12,"name":"scratch","keys":[{"key":"gtPgVMIr3uthus1QJWFBcIjNf6d1SNuhaOWQAgdLbOBP1eRWMNIJWl6SkHgI1OoN","appid":12,"events":[]}]},{"id":17,"name":"test-ecommercerec","keys":[{"key":"zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4","appid":17,"events":[]}]}]}
+```
+
+#### Create a new app
+
+```
+$ curl -i -X POST http://localhost:7071/cmd/app \
+-H "Content-Type: application/json" \
+-d '{ "name" : "my_new_app" }'
+
+{"status":1,"message":"App created successfully.","id":19,"name":"my_new_app","keys":[{"key":"","appid":19,"events":[]}]}
+```
+
+#### Delete data of app
+
+```
+$ curl -i -X DELETE http://localhost:7071/cmd/app/my_new_app/data
+```
+
+#### Delete app
+
+```
+$ curl -i -X DELETE http://localhost:7071/cmd/app/my_new_app
+
+{"status":1,"message":"App successfully deleted"}
+```
+
+
+## API Doc (To be updated)
+
+### app list:
+GET http://localhost:7071/cmd/app
+
+OK Response:
+{
+  "status": <STATUS>,
+  "message": <MESSAGE>,
+  "apps" : [
+    { "name": "<APP_NAME>",
+      "id": <APP_ID>,
+      "accessKey" : "<ACCESS_KEY>" },
+    { "name": "<APP_NAME>",
+      "id": <APP_ID>,
+      "accessKey" : "<ACCESS_KEY>" }, ... ]
+}
+
+Error Response:
+{"status": <STATUS>, "message" : "<MESSAGE>"}
+
+### app new
+POST http://localhost:7071/cmd/app
+Request Body:
+{ "name": "<APP_NAME>", // required
+  "id": <APP_ID>, // optional
+  "description": "<DESCRIPTION>" } // optional
+
+OK Response:
+{ "status": <STATUS>,
+  "message": <MESSAGE>,
+  "app" : {
+    "name": "<APP_NAME>",
+    "id": <APP_ID>,
+    "accessKey" : "<ACCESS_KEY>" }
+}
+
+Error Response:
+{ "status": <STATUS>, "message" : "<MESSAGE>"}
+
+### app delete
+DELETE http://localhost:7071/cmd/app/{appName}
+
+OK Response:
+{ "status": <STATUS>, "message" : "<MESSAGE>"}
+
+Error Response:
+{ "status": <STATUS>, "message" : "<MESSAGE>"}
+
+### app data-delete
+DELETE http://localhost:7071/cmd/app/{appName}/data
+
+OK Response:
+{ "status": <STATUS>, "message" : "<MESSAGE>"}
+
+Error Response:
+{ "status": <STATUS>, "message" : "<MESSAGE>" }
+
+
+### train TBD
+
+#### Training request:
+POST http://localhost:7071/cmd/train
+Request body: TBD
+
+OK Response: TBD
+
+Error Response: TBD
+
+#### Get training status:
+GET http://localhost:7071/cmd/train/{engineInstanceId}
+
+OK Response: TBD
+INIT
+TRAINING
+DONE
+ERROR
+
+Error Response: TBD
+
+### deploy TBD

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
new file mode 100644
index 0000000..a6ab83c
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
@@ -0,0 +1,83 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools.console
+
+import org.apache.predictionio.data.storage
+
+import grizzled.slf4j.Logging
+
+/** Console arguments for the access key commands.
+  *
+  * @param accessKey the access key to create or delete; defaults to `""`
+  * @param events event names stored with a newly created key; an empty list
+  *               is displayed as "(all)" by `AccessKey.list`
+  */
+case class AccessKeyArgs(
+  accessKey: String = "",
+  events: Seq[String] = Seq())
+
+object AccessKey extends Logging {
+  /** Creates an access key for the app named in the console arguments.
+    *
+    * @param ca console arguments carrying the app name, the key and its events
+    * @return 0 on success; 1 if the app does not exist or the key could not
+    *         be created
+    */
+  def create(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps
+    apps.getByName(ca.app.name) match {
+      case Some(app) =>
+        val accessKeys = storage.Storage.getMetaDataAccessKeys
+        val inserted = accessKeys.insert(storage.AccessKey(
+          key = ca.accessKey.accessKey,
+          appid = app.id,
+          events = ca.accessKey.events))
+        inserted match {
+          case Some(k) =>
+            info(s"Created new access key: ${k}")
+            0
+          case None =>
+            error(s"Unable to create new access key.")
+            1
+        }
+      case None =>
+        error(s"App ${ca.app.name} does not exist. Aborting.")
+        1
+    }
+  }
+
+  /** Logs a table of access keys, for all apps or for one app.
+    *
+    * With an empty app name every access key is listed; otherwise only the
+    * keys of the named app. Rows are sorted by app ID, and a key without
+    * events is shown as allowing "(all)".
+    *
+    * @param ca console arguments carrying the optional app name
+    * @return 0 on success; 1 if the named app does not exist
+    */
+  def list(ca: ConsoleArgs): Int = {
+    // None means the named app was not found; this replaces the non-local
+    // `return` that used to sit inside the `getOrElse` block.
+    val keysOpt =
+      if (ca.app.name == "") {
+        Some(storage.Storage.getMetaDataAccessKeys.getAll)
+      } else {
+        val apps = storage.Storage.getMetaDataApps
+        apps.getByName(ca.app.name) map { app =>
+          storage.Storage.getMetaDataAccessKeys.getByAppid(app.id)
+        }
+      }
+    keysOpt match {
+      case None =>
+        error(s"App ${ca.app.name} does not exist. Aborting.")
+        1
+      case Some(keys) =>
+        val title = "Access Key(s)"
+        info(f"$title%64s | App ID | Allowed Event(s)")
+        keys.sortBy(k => k.appid) foreach { k =>
+          val events =
+            if (k.events.nonEmpty) k.events.sorted.mkString(",") else "(all)"
+          info(f"${k.key}%64s | ${k.appid}%6d | $events%s")
+        }
+        info(s"Finished listing ${keys.size} access key(s).")
+        0
+    }
+  }
+
+  /** Deletes the access key given in the console arguments.
+    *
+    * @param ca console arguments carrying the key to delete
+    * @return 0 on success; 1 if the deletion threw an exception (which is
+    *         logged)
+    */
+  def delete(ca: ConsoleArgs): Int = {
+    val key = ca.accessKey.accessKey
+    try {
+      storage.Storage.getMetaDataAccessKeys.delete(key)
+      info(s"Deleted access key ${key}.")
+      0
+    } catch {
+      case e: Exception =>
+        error(s"Error deleting access key ${key}.", e)
+        1
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala
new file mode 100644
index 0000000..cc2f36d
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala
@@ -0,0 +1,537 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.tools.console
+
+import org.apache.predictionio.data.storage
+
+import grizzled.slf4j.Logging
+
+/** Arguments of the `app` console commands, as read by the methods of the
+  * `App` object in this file.
+  *
+  * @param id optional app ID requested for a new app; when absent, `create`
+  *           passes 0 as the ID to the meta-data store
+  * @param name name of the app the command operates on
+  * @param channel channel name used by `channelNew` and `channelDelete`
+  * @param dataDeleteChannel name of the channel whose data `dataDeleteOne`
+  *                          deletes; when absent, it targets the app's default
+  *                          channel
+  * @param all when true, `dataDelete` deletes the data of the default channel
+  *            and of all other channels of the app
+  * @param force when true, the "Enter 'YES' to proceed" confirmation prompt is
+  *              skipped
+  * @param description optional description stored with a newly created app
+  */
+case class AppArgs(
+  id: Option[Int] = None,
+  name: String = "",
+  channel: String = "",
+  dataDeleteChannel: Option[String] = None,
+  all: Boolean = false,
+  force: Boolean = false,
+  description: Option[String] = None)
+
+object App extends Logging {
+  /** Creates a new app: registers its meta-data, initializes its Event Store
+    * and creates an access key that allows all events.
+    *
+    * The event store client is closed on every exit path. When the Event Store
+    * cannot be initialized, the meta-data entry is rolled back and the command
+    * fails.
+    *
+    * @param ca parsed console arguments; reads `ca.app.name`, `ca.app.id`,
+    *           `ca.app.description` and `ca.accessKey.accessKey`
+    * @return 0 when the app was created; 1 when the name or ID is already
+    *         taken, or when the meta-data insert, the Event Store
+    *         initialization or the access key creation fails
+    */
+  def create(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps()
+    // Get the client first so the command exits right away if the client
+    // cannot be obtained, before any meta-data is written.
+    val events = storage.Storage.getLEvents()
+    try {
+      if (apps.getByName(ca.app.name).isDefined) {
+        error(s"App ${ca.app.name} already exists. Aborting.")
+        1
+      } else {
+        // Pair the requested ID with the app that already owns it, if any.
+        val idOwner = ca.app.id.flatMap(id => apps.get(id).map(owner => (id, owner)))
+        idOwner match {
+          case Some((id, owner)) =>
+            error(
+              s"App ID ${id} already exists and maps to the app '${owner.name}'. " +
+              "Aborting.")
+            1
+          case None =>
+            val appid = apps.insert(storage.App(
+              id = ca.app.id.getOrElse(0),
+              name = ca.app.name,
+              description = ca.app.description))
+            appid map { id =>
+              if (events.init(id)) {
+                info(s"Initialized Event Store for this app ID: ${id}.")
+                val accessKeys = storage.Storage.getMetaDataAccessKeys
+                val accessKey = accessKeys.insert(storage.AccessKey(
+                  key = ca.accessKey.accessKey,
+                  appid = id,
+                  events = Seq()))
+                accessKey map { k =>
+                  info("Created new app:")
+                  info(s"      Name: ${ca.app.name}")
+                  info(s"        ID: ${id}")
+                  info(s"Access Key: ${k}")
+                  0
+                } getOrElse {
+                  error(s"Unable to create new access key.")
+                  1
+                }
+              } else {
+                error(s"Unable to initialize Event Store for this app ID: ${id}.")
+                // Revert the meta-data change made by the insert above.
+                try {
+                  apps.delete(id)
+                } catch {
+                  case e: Exception =>
+                    error(s"Failed to revert back the App meta-data change.", e)
+                    error(s"The app ${ca.app.name} CANNOT be used!")
+                    error(s"Please run 'pio app delete ${ca.app.name}' " +
+                      "to delete this app!")
+                }
+                // The app was not created, so the command fails even when the
+                // rollback itself succeeded.
+                1
+              }
+            } getOrElse {
+              error(s"Unable to create new app.")
+              1
+            }
+        }
+      }
+    } finally {
+      events.close()
+    }
+  }
+
+  /** Prints one row per access key of every app, with apps ordered by name.
+    *
+    * @param ca parsed console arguments (not read by this command)
+    * @return always 0
+    */
+  def list(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps.getAll().sortBy(_.name)
+    val accessKeys = storage.Storage.getMetaDataAccessKeys
+    val nameHeader = "Name"
+    val keyHeader = "Access Key"
+    info(f"$nameHeader%20s |   ID | $keyHeader%64s | Allowed Event(s)")
+    for {
+      app <- apps
+      k <- accessKeys.getByAppid(app.id)
+    } {
+      // An empty event list means the key is not restricted to specific events.
+      val allowed = if (k.events.isEmpty) "(all)" else k.events.sorted.mkString(",")
+      info(f"${app.name}%20s | ${app.id}%4d | ${k.key}%64s | $allowed%s")
+    }
+    info(s"Finished listing ${apps.size} app(s).")
+    0
+  }
+
+  /** Prints the name, ID, description, access keys and channels of the app
+    * named `ca.app.name`.
+    *
+    * @param ca parsed console arguments
+    * @return 0 when the app exists, 1 otherwise
+    */
+  def show(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps
+    val accessKeys = storage.Storage.getMetaDataAccessKeys
+    val channels = storage.Storage.getMetaDataChannels
+    apps.getByName(ca.app.name) match {
+      case Some(app) =>
+        info(s"    App Name: ${app.name}")
+        info(s"      App ID: ${app.id}")
+        info(s" Description: ${app.description.getOrElse("")}")
+
+        // Only the first key row carries the "Access Key:" label.
+        accessKeys.getByAppid(app.id).zipWithIndex foreach { case (k, idx) =>
+          val allowed = if (k.events.isEmpty) "(all)" else k.events.sorted.mkString(",")
+          if (idx == 0) {
+            info(f"  Access Key: ${k.key}%s | ${allowed}%s")
+          } else {
+            info(f"              ${k.key}%s | ${allowed}%s")
+          }
+        }
+
+        val chans = channels.getByAppid(app.id)
+        // The header row is printed only when there is at least one channel.
+        if (chans.nonEmpty) {
+          val nameHeader = "Channel Name"
+          val idHeader = "Channel ID"
+          info(f"    Channels: ${nameHeader}%16s | ${idHeader}%10s ")
+        }
+        chans foreach { ch =>
+          info(f"              ${ch.name}%16s | ${ch.id}%10s")
+        }
+        0
+      case None =>
+        error(s"App ${ca.app.name} does not exist. Aborting.")
+        1
+    }
+  }
+
+  /** Deletes the app named `ca.app.name` together with its channels, event
+    * data and access keys, after a confirmation prompt (skipped when
+    * `ca.app.force` is set).
+    *
+    * The steps run in this order and the command aborts at the first failure:
+    * channel Event Stores and channel meta-data, the app's own Event Store, the
+    * access keys, and finally the app meta-data. The event store client is
+    * closed on every exit path.
+    *
+    * @param ca parsed console arguments
+    * @return 0 when the app was deleted or the user declined; 1 when the app
+    *         does not exist or any deletion step fails
+    */
+  def delete(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps
+    val accesskeys = storage.Storage.getMetaDataAccessKeys
+    val channels = storage.Storage.getMetaDataChannels
+    val events = storage.Storage.getLEvents()
+    // The early `return`s below still run the `finally` clause, so the client
+    // is closed no matter how the method exits.
+    try {
+      apps.getByName(ca.app.name) match {
+        case Some(app) =>
+          info(s"The following app (including all channels) will be deleted. Are you sure?")
+          info(s"    App Name: ${app.name}")
+          info(s"      App ID: ${app.id}")
+          info(s" Description: ${app.description.getOrElse("")}")
+          val chans = channels.getByAppid(app.id)
+          // The header row is printed only when there is at least one channel.
+          if (chans.nonEmpty) {
+            val titleName = "Channel Name"
+            val titleID = "Channel ID"
+            info(f"    Channels: ${titleName}%16s | ${titleID}%10s ")
+          }
+          chans.foreach { ch =>
+            info(f"              ${ch.name}%16s | ${ch.id}%10s")
+          }
+
+          val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+          choice match {
+            case "YES" =>
+              // Delete channels. A failed Event Store removal aborts at once; a
+              // failed meta-data deletion is recorded and aborts after the loop.
+              val delChannelStatus: Seq[Int] = chans.map { ch =>
+                if (events.remove(app.id, Some(ch.id))) {
+                  info(s"Removed Event Store of the channel ID: ${ch.id}")
+                  try {
+                    channels.delete(ch.id)
+                    info(s"Deleted channel ${ch.name}")
+                    0
+                  } catch {
+                    case e: Exception =>
+                      error(s"Error deleting channel ${ch.name}.", e)
+                      1
+                  }
+                } else {
+                  error(s"Error removing Event Store of the channel ID: ${ch.id}.")
+                  return 1
+                }
+              }
+
+              if (delChannelStatus.exists(_ != 0)) {
+                error("Error occurred while deleting channels. Aborting.")
+                return 1
+              }
+
+              // `remove` signals failure through its Boolean result as well as
+              // through exceptions; either one must stop the deletion before
+              // the access keys and the app meta-data are touched.
+              val appEventsRemoved =
+                try {
+                  events.remove(app.id)
+                } catch {
+                  case e: Exception =>
+                    error(s"Error removing Event Store for this app. Aborting.", e)
+                    return 1
+                }
+              if (!appEventsRemoved) {
+                error(s"Error removing Event Store for this app. Aborting.")
+                return 1
+              }
+              info(s"Removed Event Store for this app ID: ${app.id}")
+
+              accesskeys.getByAppid(app.id) foreach { key =>
+                try {
+                  accesskeys.delete(key.key)
+                  info(s"Removed access key ${key.key}")
+                } catch {
+                  case e: Exception =>
+                    error(s"Error removing access key ${key.key}. Aborting.", e)
+                    return 1
+                }
+              }
+
+              try {
+                apps.delete(app.id)
+                info(s"Deleted app ${app.name}.")
+              } catch {
+                case e: Exception =>
+                  error(s"Error deleting app ${app.name}. Aborting.", e)
+                  return 1
+              }
+
+              info("Done.")
+              0
+            case _ =>
+              info("Aborted.")
+              0
+          }
+        case None =>
+          error(s"App ${ca.app.name} does not exist. Aborting.")
+          1
+      }
+    } finally {
+      events.close()
+    }
+  }
+
+  /** Deletes event data of an app: the data of all its channels when
+    * `ca.app.all` is set, otherwise the data of a single channel.
+    *
+    * @param ca parsed console arguments
+    * @return the exit status of `dataDeleteAll` or `dataDeleteOne`
+    */
+  def dataDelete(ca: ConsoleArgs): Int =
+    if (ca.app.all) dataDeleteAll(ca) else dataDeleteOne(ca)
+
+  /** Deletes the event data of one channel of the app named `ca.app.name` by
+    * removing and re-initializing its Event Store, after a confirmation prompt
+    * (skipped when `ca.app.force` is set).
+    *
+    * The channel is the one named by `ca.app.dataDeleteChannel`; when that is
+    * absent, the app's default channel is targeted.
+    *
+    * @param ca parsed console arguments
+    * @return 0 when the data was deleted or the user declined; 1 when the app
+    *         or the channel does not exist; otherwise the number of failed
+    *         steps (removal, re-initialization)
+    */
+  def dataDeleteOne(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps
+    val channels = storage.Storage.getMetaDataChannels
+    apps.getByName(ca.app.name) match {
+      case None =>
+        error(s"App ${ca.app.name} does not exist. Aborting.")
+        1
+      case Some(app) =>
+        val requestedChannel = ca.app.dataDeleteChannel
+        // Channels are looked up only when a channel name was requested.
+        val channelId = requestedChannel flatMap { ch =>
+          channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap.get(ch)
+        }
+
+        // Asks for confirmation, then removes and re-creates the Event Store.
+        def confirmAndDelete(): Int = {
+          val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+          choice match {
+            case "YES" =>
+              val events = storage.Storage.getLEvents()
+              // Close the client even when a storage call throws.
+              try {
+                // remove table
+                val r1 = if (events.remove(app.id, channelId)) {
+                  channelId match {
+                    case Some(cid) =>
+                      info(s"Removed Event Store for this channel ID: ${cid}")
+                    case None =>
+                      info(s"Removed Event Store for this app ID: ${app.id}")
+                  }
+                  0
+                } else {
+                  channelId match {
+                    case Some(_) =>
+                      error(s"Error removing Event Store for this channel.")
+                    case None =>
+                      error(s"Error removing Event Store for this app.")
+                  }
+                  1
+                }
+                // re-create table
+                val r2 = if (events.init(app.id, channelId)) {
+                  channelId match {
+                    case Some(cid) =>
+                      info(s"Initialized Event Store for this channel ID: ${cid}.")
+                    case None =>
+                      info(s"Initialized Event Store for this app ID: ${app.id}.")
+                  }
+                  0
+                } else {
+                  channelId match {
+                    case Some(cid) =>
+                      error(s"Unable to initialize Event Store for this channel ID: ${cid}.")
+                    case None =>
+                      error(s"Unable to initialize Event Store for this appId: ${app.id}.")
+                  }
+                  1
+                }
+                info("Done.")
+                r1 + r2
+              } finally {
+                events.close()
+              }
+            case _ =>
+              info("Aborted.")
+              0
+          }
+        }
+
+        (requestedChannel, channelId) match {
+          case (Some(ch), None) =>
+            error(s"Unable to delete data for channel.")
+            error(s"Channel ${ch} doesn't exist.")
+            1
+          case (Some(ch), Some(cid)) =>
+            info(s"Data of the following channel will be deleted. Are you sure?")
+            info(s"Channel Name: ${ch}")
+            info(s"  Channel ID: ${cid}")
+            info(s"    App Name: ${app.name}")
+            info(s"      App ID: ${app.id}")
+            // Print the description text itself rather than the Option wrapper.
+            info(s" Description: ${app.description.getOrElse("")}")
+            confirmAndDelete()
+          case _ =>
+            info(s"Data of the following app (default channel only) will be deleted. Are you sure?")
+            info(s"    App Name: ${app.name}")
+            info(s"      App ID: ${app.id}")
+            // Print the description text itself rather than the Option wrapper.
+            info(s" Description: ${app.description.getOrElse("")}")
+            confirmAndDelete()
+        }
+    }
+  }
+
+  /** Deletes all event data of the app named `ca.app.name`, for the default
+    * channel and every other channel, by removing and re-initializing each
+    * Event Store after a confirmation prompt (skipped when `ca.app.force` is
+    * set).
+    *
+    * The default channel is processed only when every other channel was
+    * processed without error. The event store client is closed on every exit
+    * path.
+    *
+    * @param ca parsed console arguments
+    * @return 0 when all data was deleted or the user declined; non-zero when
+    *         the app does not exist or any removal or re-initialization fails
+    */
+  def dataDeleteAll(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps
+    val channels = storage.Storage.getMetaDataChannels
+    val events = storage.Storage.getLEvents()
+    // Close the client even when a storage call throws.
+    try {
+      apps.getByName(ca.app.name) match {
+        case Some(app) =>
+          info(s"All data of the app (including default and all channels) will be deleted." +
+            " Are you sure?")
+          info(s"    App Name: ${app.name}")
+          info(s"      App ID: ${app.id}")
+          // Print the description text itself rather than the Option wrapper.
+          info(s" Description: ${app.description.getOrElse("")}")
+          val chans = channels.getByAppid(app.id)
+          // The header row is printed only when there is at least one channel.
+          if (chans.nonEmpty) {
+            val titleName = "Channel Name"
+            val titleID = "Channel ID"
+            info(f"    Channels: ${titleName}%16s | ${titleID}%10s ")
+          }
+          chans.foreach { ch =>
+            info(f"              ${ch.name}%16s | ${ch.id}%10s")
+          }
+
+          val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+          choice match {
+            case "YES" =>
+              // Remove and re-create the Event Store of every channel; each
+              // entry is the number of failed steps for that channel.
+              val delChannelStatus: Seq[Int] = chans.map { ch =>
+                val r1 = if (events.remove(app.id, Some(ch.id))) {
+                  info(s"Removed Event Store of the channel ID: ${ch.id}")
+                  0
+                } else {
+                  error(s"Error removing Event Store of the channel ID: ${ch.id}.")
+                  1
+                }
+                // re-create table
+                val r2 = if (events.init(app.id, Some(ch.id))) {
+                  info(s"Initialized Event Store of the channel ID: ${ch.id}")
+                  0
+                } else {
+                  error(s"Unable to initialize Event Store of the channel ID: ${ch.id}.")
+                  1
+                }
+                r1 + r2
+              }
+
+              if (delChannelStatus.forall(_ == 0)) {
+                val r1 = if (events.remove(app.id)) {
+                  info(s"Removed Event Store for this app ID: ${app.id}")
+                  0
+                } else {
+                  error(s"Error removing Event Store for this app.")
+                  1
+                }
+
+                val r2 = if (events.init(app.id)) {
+                  info(s"Initialized Event Store for this app ID: ${app.id}.")
+                  0
+                } else {
+                  error(s"Unable to initialize Event Store for this appId: ${app.id}.")
+                  1
+                }
+                info("Done.")
+                r1 + r2
+              } else {
+                1
+              }
+            case _ =>
+              info("Aborted.")
+              0
+          }
+        case None =>
+          error(s"App ${ca.app.name} does not exist. Aborting.")
+          1
+      }
+    } finally {
+      events.close()
+    }
+  }
+
+  /** Creates the channel `ca.app.channel` for the app named `ca.app.name`:
+    * registers the channel meta-data, then initializes its Event Store.
+    *
+    * When the Event Store cannot be initialized, the meta-data entry is rolled
+    * back and the command fails. The event store client is closed on every
+    * exit path.
+    *
+    * @param ca parsed console arguments
+    * @return 0 when the channel was created; 1 when the app does not exist,
+    *         the channel already exists, the name is invalid, or the meta-data
+    *         insert or the Event Store initialization fails
+    */
+  def channelNew(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps
+    val channels = storage.Storage.getMetaDataChannels
+    val events = storage.Storage.getLEvents()
+    val newChannel = ca.app.channel
+    // Close the client even when a storage call throws.
+    try {
+      apps.getByName(ca.app.name) match {
+        case None =>
+          error(s"App ${ca.app.name} does not exist. Aborting.")
+          1
+        case Some(app) =>
+          val channelExists = channels.getByAppid(app.id).exists(_.name == newChannel)
+          if (channelExists) {
+            error(s"Unable to create new channel.")
+            error(s"Channel ${newChannel} already exists.")
+            1
+          } else if (!storage.Channel.isValidName(newChannel)) {
+            error(s"Unable to create new channel.")
+            error(s"The channel name ${newChannel} is invalid.")
+            error(s"${storage.Channel.nameConstraint}")
+            1
+          } else {
+            val channelId = channels.insert(storage.Channel(
+              id = 0, // new id will be assigned
+              appid = app.id,
+              name = newChannel
+            ))
+            channelId match {
+              case Some(chanId) =>
+                info(s"Updated Channel meta-data.")
+                // initialize storage
+                if (events.init(app.id, Some(chanId))) {
+                  info(s"Initialized Event Store for the channel: ${newChannel}.")
+                  info(s"Created new channel:")
+                  info(s"    Channel Name: ${newChannel}")
+                  info(s"      Channel ID: ${chanId}")
+                  info(s"          App ID: ${app.id}")
+                  0
+                } else {
+                  error(s"Unable to create new channel.")
+                  error(s"Failed to initialize Event Store.")
+                  // Revert the meta-data change made by the insert above.
+                  try {
+                    channels.delete(chanId)
+                  } catch {
+                    case e: Exception =>
+                      error(s"Failed to revert back the Channel meta-data change.", e)
+                      error(s"The channel ${newChannel} CANNOT be used!")
+                      error(s"Please run 'pio app channel-delete ${app.name} ${newChannel}' " +
+                        "to delete this channel!")
+                  }
+                  // The channel was not created, so the command fails even when
+                  // the rollback itself succeeded.
+                  1
+                }
+              case None =>
+                error(s"Unable to create new channel.")
+                error(s"Failed to update Channel meta-data.")
+                1
+            }
+          }
+      }
+    } finally {
+      events.close()
+    }
+  }
+
+  /** Deletes the channel `ca.app.channel` of the app named `ca.app.name`,
+    * after a confirmation prompt (skipped when `ca.app.force` is set).
+    *
+    * The channel's Event Store is removed before its meta-data, so that a
+    * failed removal leaves the meta-data in place. The event store client is
+    * closed on every exit path.
+    *
+    * @param ca parsed console arguments
+    * @return 0 when the channel was deleted or the user declined; 1 when the
+    *         app or the channel does not exist, or a deletion step fails
+    */
+  def channelDelete(ca: ConsoleArgs): Int = {
+    val apps = storage.Storage.getMetaDataApps
+    val channels = storage.Storage.getMetaDataChannels
+    val events = storage.Storage.getLEvents()
+    val deleteChannel = ca.app.channel
+    // Close the client even when a storage call throws.
+    try {
+      apps.getByName(ca.app.name) match {
+        case None =>
+          error(s"App ${ca.app.name} does not exist. Aborting.")
+          1
+        case Some(app) =>
+          val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap
+          channelMap.get(deleteChannel) match {
+            case None =>
+              error(s"Unable to delete channel.")
+              error(s"Channel ${deleteChannel} doesn't exist.")
+              1
+            case Some(channelId) =>
+              info(s"The following channel will be deleted. Are you sure?")
+              info(s"    Channel Name: ${deleteChannel}")
+              info(s"      Channel ID: ${channelId}")
+              info(s"        App Name: ${app.name}")
+              info(s"          App ID: ${app.id}")
+              val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+              choice match {
+                case "YES" =>
+                  // NOTE: remove storage first, before the meta-data, in case
+                  // removing the storage fails.
+                  if (events.remove(app.id, Some(channelId))) {
+                    info(s"Removed Event Store for this channel: ${deleteChannel}")
+                    try {
+                      channels.delete(channelId)
+                      info(s"Deleted channel: ${deleteChannel}.")
+                      0
+                    } catch {
+                      case e: Exception =>
+                        error(s"Unable to delete channel.", e)
+                        error(s"Failed to update Channel meta-data.")
+                        error(s"The channel ${deleteChannel} CANNOT be used!")
+                        error(s"Please run 'pio app channel-delete ${app.name} ${deleteChannel}' " +
+                          "to delete this channel again!")
+                        1
+                    }
+                  } else {
+                    error(s"Unable to delete channel.")
+                    error(s"Error removing Event Store for this channel.")
+                    1
+                  }
+                case _ =>
+                  info("Aborted.")
+                  0
+              }
+          }
+      }
+    } finally {
+      events.close()
+    }
+  }
+
+}


Mime
View raw message