http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala
deleted file mode 100644
index 3d2c888..0000000
--- a/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.tools.dashboard
-
-// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea
-
-import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins}
-import spray.http.HttpHeaders._
-import spray.http.HttpMethods._
-import spray.http.HttpEntity
-import spray.routing._
-import spray.http.StatusCodes
-import spray.http.ContentTypes
-
-// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS
-trait CORSSupport {
- this: HttpService =>
-
- private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins)
- private val optionsCorsHeaders = List(
- `Access-Control-Allow-Headers`("""Origin,
- |X-Requested-With,
- |Content-Type,
- |Accept,
- |Accept-Encoding,
- |Accept-Language,
- |Host,
- |Referer,
- |User-Agent""".stripMargin.replace("\n", " ")),
- `Access-Control-Max-Age`(1728000)
- )
-
- def cors[T]: Directive0 = mapRequestContext { ctx =>
- ctx.withRouteResponseHandling {
-      // OPTIONS request for a resource that responds to other methods
- case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) &&
- x.exists(_.isInstanceOf[MethodRejection])) => {
- val allowedMethods: List[HttpMethod] = x.collect {
- case rejection: MethodRejection => rejection.supported
- }
- ctx.complete {
- HttpResponse().withHeaders(
- `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) ::
- allowOriginHeader ::
- optionsCorsHeaders
- )
- }
- }
- }.withHttpResponseHeadersMapped { headers =>
- allowOriginHeader :: headers
- }
- }
-
- override def timeoutRoute: StandardRoute = complete {
- HttpResponse(
- StatusCodes.InternalServerError,
- HttpEntity(ContentTypes.`text/plain(UTF-8)`,
- "The server was not able to produce a timely response to your request."),
- List(allowOriginHeader)
- )
- }
-}
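
Review note: the removed trait was consumed by mixing it into a spray HttpService and wrapping individual routes with the cors directive, as Dashboard.scala below still shows. A minimal sketch, assuming a hypothetical MyService:

```scala
import spray.routing.HttpService

// Hypothetical consumer of the removed trait: wrapping a route in
// cors { ... } adds Access-Control-Allow-Origin to its responses and
// answers preflight OPTIONS requests with the allowed methods.
trait MyService extends HttpService with CORSSupport {
  val route = cors {
    path("data.json") {
      get {
        complete("""{"ok": true}""")
      }
    }
  }
}
```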
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala b/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala
deleted file mode 100644
index 154ba4e..0000000
--- a/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.tools.dashboard
-
-import com.typesafe.config.ConfigFactory
-import io.prediction.authentication.KeyAuthentication
-import io.prediction.configuration.SSLConfiguration
-import io.prediction.data.storage.Storage
-import spray.can.server.ServerSettings
-import spray.routing.directives.AuthMagnet
-import scala.concurrent.{Future, ExecutionContext}
-import akka.actor.{ActorContext, Actor, ActorSystem, Props}
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
-import com.github.nscala_time.time.Imports.DateTime
-import grizzled.slf4j.Logging
-import spray.can.Http
-import spray.http._
-import spray.http.MediaTypes._
-import spray.routing._
-import spray.routing.authentication.{Authentication, UserPass, BasicAuth}
-
-import scala.concurrent.duration._
-
-case class DashboardConfig(
- ip: String = "localhost",
- port: Int = 9000)
-
-object Dashboard extends Logging with SSLConfiguration {
- def main(args: Array[String]): Unit = {
- val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") {
- opt[String]("ip") action { (x, c) =>
- c.copy(ip = x)
- } text("IP to bind to (default: localhost).")
- opt[Int]("port") action { (x, c) =>
- c.copy(port = x)
- } text("Port to bind to (default: 9000).")
- }
-
- parser.parse(args, DashboardConfig()) map { dc =>
- createDashboard(dc)
- }
- }
-
- def createDashboard(dc: DashboardConfig): Unit = {
- implicit val system = ActorSystem("pio-dashboard")
- val service =
- system.actorOf(Props(classOf[DashboardActor], dc), "dashboard")
- implicit val timeout = Timeout(5.seconds)
- val settings = ServerSettings(system)
- IO(Http) ? Http.Bind(
- service,
- interface = dc.ip,
- port = dc.port,
- settings = Some(settings.copy(sslEncryption = true)))
- system.awaitTermination
- }
-}
-
-class DashboardActor(
- val dc: DashboardConfig)
- extends Actor with DashboardService {
- def actorRefFactory: ActorContext = context
- def receive: Actor.Receive = runRoute(dashboardRoute)
-}
-
-trait DashboardService extends HttpService with KeyAuthentication with CORSSupport {
-
- implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
- val dc: DashboardConfig
- val evaluationInstances = Storage.getMetaDataEvaluationInstances
- val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
- val serverStartTime = DateTime.now
- val dashboardRoute =
- path("") {
- authenticate(withAccessKeyFromFile) { request =>
- get {
- respondWithMediaType(`text/html`) {
- complete {
- val completedInstances = evaluationInstances.getCompleted
- html.index(
- dc,
- serverStartTime,
- pioEnvVars,
- completedInstances).toString
- }
- }
- }
- }
- } ~
- pathPrefix("engine_instances" / Segment) { instanceId =>
- path("evaluator_results.txt") {
- get {
- respondWithMediaType(`text/plain`) {
- evaluationInstances.get(instanceId).map { i =>
- complete(i.evaluatorResults)
- } getOrElse {
- complete(StatusCodes.NotFound)
- }
- }
- }
- } ~
- path("evaluator_results.html") {
- get {
- respondWithMediaType(`text/html`) {
- evaluationInstances.get(instanceId).map { i =>
- complete(i.evaluatorResultsHTML)
- } getOrElse {
- complete(StatusCodes.NotFound)
- }
- }
- }
- } ~
- path("evaluator_results.json") {
- get {
- respondWithMediaType(`application/json`) {
- evaluationInstances.get(instanceId).map { i =>
- complete(i.evaluatorResultsJSON)
- } getOrElse {
- complete(StatusCodes.NotFound)
- }
- }
- }
- } ~
- cors {
- path("local_evaluator_results.json") {
- get {
- respondWithMediaType(`application/json`) {
- evaluationInstances.get(instanceId).map { i =>
- complete(i.evaluatorResultsJSON)
- } getOrElse {
- complete(StatusCodes.NotFound)
- }
- }
- }
- }
- }
- } ~
- pathPrefix("assets") {
- getFromResourceDirectory("assets")
- }
-}
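
Review note: a hedged sketch of launching the removed dashboard directly (argument values are examples; the server binds with sslEncryption = true, so it serves HTTPS):

```scala
// Parse flags and start the dashboard (example values).
Dashboard.main(Array("--ip", "0.0.0.0", "--port", "9000"))

// Equivalent, bypassing scopt:
Dashboard.createDashboard(DashboardConfig(ip = "0.0.0.0", port = 9000))
```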
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala b/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala
deleted file mode 100644
index 743d57a..0000000
--- a/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.tools.export
-
-import io.prediction.controller.Utils
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.Storage
-import io.prediction.tools.Runner
-import io.prediction.workflow.WorkflowContext
-import io.prediction.workflow.WorkflowUtils
-
-import grizzled.slf4j.Logging
-import org.apache.spark.sql.SQLContext
-import org.json4s.native.Serialization._
-
-case class EventsToFileArgs(
- env: String = "",
- logFile: String = "",
- appId: Int = 0,
- channel: Option[String] = None,
- outputPath: String = "",
- format: String = "parquet",
- verbose: Boolean = false,
- debug: Boolean = false)
-
-object EventsToFile extends Logging {
- def main(args: Array[String]): Unit = {
- val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") {
- opt[String]("env") action { (x, c) =>
- c.copy(env = x)
- }
- opt[String]("log-file") action { (x, c) =>
- c.copy(logFile = x)
- }
- opt[Int]("appid") action { (x, c) =>
- c.copy(appId = x)
- }
- opt[String]("channel") action { (x, c) =>
- c.copy(channel = Some(x))
- }
- opt[String]("format") action { (x, c) =>
- c.copy(format = x)
- }
- opt[String]("output") action { (x, c) =>
- c.copy(outputPath = x)
- }
- opt[Unit]("verbose") action { (x, c) =>
- c.copy(verbose = true)
- }
- opt[Unit]("debug") action { (x, c) =>
- c.copy(debug = true)
- }
- }
- parser.parse(args, EventsToFileArgs()) map { args =>
- // get channelId
- val channels = Storage.getMetaDataChannels
- val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
-
- val channelId: Option[Int] = args.channel.map { ch =>
- if (!channelMap.contains(ch)) {
- error(s"Channel ${ch} doesn't exist in this app.")
- sys.exit(1)
- }
-
- channelMap(ch)
- }
-
- val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
-
- WorkflowUtils.modifyLogging(verbose = args.verbose)
- @transient lazy implicit val formats = Utils.json4sDefaultFormats +
- new EventJson4sSupport.APISerializer
- val sc = WorkflowContext(
- mode = "Export",
- batch = "App ID " + args.appId + channelStr,
- executorEnv = Runner.envStringToMap(args.env))
- val sqlContext = new SQLContext(sc)
- val events = Storage.getPEvents()
- val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
- val jsonStringRdd = eventsRdd.map(write(_))
- if (args.format == "json") {
- jsonStringRdd.saveAsTextFile(args.outputPath)
- } else {
- val jsonRdd = sqlContext.jsonRDD(jsonStringRdd)
- jsonRdd.saveAsParquetFile(args.outputPath)
- }
- info(s"Events are exported to ${args.outputPath}/.")
- info("Done.")
- }
- }
-}
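
Review note: a hedged invocation sketch for the removed exporter (paths and IDs are example values; any format other than "json" falls through to the Parquet branch):

```scala
// Export all events of app 1 as newline-delimited JSON (example values).
EventsToFile.main(Array(
  "--appid", "1",
  "--format", "json",
  "--output", "/tmp/events-export"))
```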
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala b/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala
deleted file mode 100644
index 9a19a33..0000000
--- a/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.tools.imprt
-
-import io.prediction.controller.Utils
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.Storage
-import io.prediction.tools.Runner
-import io.prediction.workflow.WorkflowContext
-import io.prediction.workflow.WorkflowUtils
-
-import grizzled.slf4j.Logging
-import org.json4s.native.Serialization._
-
-import scala.util.{Failure, Try}
-
-case class FileToEventsArgs(
- env: String = "",
- logFile: String = "",
- appId: Int = 0,
- channel: Option[String] = None,
- inputPath: String = "",
- verbose: Boolean = false,
- debug: Boolean = false)
-
-object FileToEvents extends Logging {
- def main(args: Array[String]): Unit = {
- val parser = new scopt.OptionParser[FileToEventsArgs]("FileToEvents") {
- opt[String]("env") action { (x, c) =>
- c.copy(env = x)
- }
- opt[String]("log-file") action { (x, c) =>
- c.copy(logFile = x)
- }
- opt[Int]("appid") action { (x, c) =>
- c.copy(appId = x)
- }
- opt[String]("channel") action { (x, c) =>
- c.copy(channel = Some(x))
- }
- opt[String]("input") action { (x, c) =>
- c.copy(inputPath = x)
- }
- opt[Unit]("verbose") action { (x, c) =>
- c.copy(verbose = true)
- }
- opt[Unit]("debug") action { (x, c) =>
- c.copy(debug = true)
- }
- }
- parser.parse(args, FileToEventsArgs()) map { args =>
- // get channelId
- val channels = Storage.getMetaDataChannels
- val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
-
- val channelId: Option[Int] = args.channel.map { ch =>
- if (!channelMap.contains(ch)) {
- error(s"Channel ${ch} doesn't exist in this app.")
- sys.exit(1)
- }
-
- channelMap(ch)
- }
-
- val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
-
- WorkflowUtils.modifyLogging(verbose = args.verbose)
- @transient lazy implicit val formats = Utils.json4sDefaultFormats +
- new EventJson4sSupport.APISerializer
- val sc = WorkflowContext(
- mode = "Import",
- batch = "App ID " + args.appId + channelStr,
- executorEnv = Runner.envStringToMap(args.env))
- val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
- Try(read[Event](json)).recoverWith {
- case e: Throwable =>
- error(s"\nmalformed json => $json")
- Failure(e)
- }.get
- }
- val events = Storage.getPEvents()
- events.write(events = rdd,
- appId = args.appId,
- channelId = channelId)(sc)
- info("Events are imported.")
- info("Done.")
- }
- }
-}
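
Review note: the removed importer mirrors the exporter; it expects one JSON-serialized Event per line and fails fast on the first malformed record. A hedged invocation sketch (example values):

```scala
// Import newline-delimited JSON events into app 1's default channel
// (example values); add --channel <name> to target a named channel.
FileToEvents.main(Array(
  "--appid", "1",
  "--input", "/tmp/events-export"))
```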
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
new file mode 100644
index 0000000..1640d55
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
@@ -0,0 +1,84 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifestSerializer
+import org.apache.predictionio.data.storage.Storage
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.native.Serialization.read
+
+import scala.io.Source
+
+object RegisterEngine extends Logging {
+ val engineManifests = Storage.getMetaDataEngineManifests
+ implicit val formats = DefaultFormats + new EngineManifestSerializer
+
+ def registerEngine(
+ jsonManifest: File,
+ engineFiles: Seq[File],
+ copyLocal: Boolean = false): Unit = {
+ val jsonString = try {
+ Source.fromFile(jsonManifest).mkString
+ } catch {
+ case e: java.io.FileNotFoundException =>
+ error(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
+ sys.exit(1)
+ }
+ val engineManifest = read[EngineManifest](jsonString)
+
+ info(s"Registering engine ${engineManifest.id} ${engineManifest.version}")
+ engineManifests.update(
+ engineManifest.copy(files = engineFiles.map(_.toURI.toString)), true)
+ }
+
+ def unregisterEngine(jsonManifest: File): Unit = {
+ val jsonString = try {
+ Source.fromFile(jsonManifest).mkString
+ } catch {
+ case e: java.io.FileNotFoundException =>
+ error(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
+ sys.exit(1)
+ }
+ val fileEngineManifest = read[EngineManifest](jsonString)
+ val engineManifest = engineManifests.get(
+ fileEngineManifest.id,
+ fileEngineManifest.version)
+
+ engineManifest map { em =>
+ val conf = new Configuration
+ val fs = FileSystem.get(conf)
+
+ em.files foreach { f =>
+ val path = new Path(f)
+ info(s"Removing ${f}")
+ fs.delete(path, false)
+ }
+
+ engineManifests.delete(em.id, em.version)
+ info(s"Unregistered engine ${em.id} ${em.version}")
+ } getOrElse {
+ error(s"${fileEngineManifest.id} ${fileEngineManifest.version} is not " +
+ "registered.")
+ }
+ }
+}
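
Review note: a hedged sketch of calling the relocated object (file names are example values):

```scala
import java.io.File

// Register an engine described by manifest.json along with its build
// artifacts; the files are recorded as URIs in the engine manifest.
RegisterEngine.registerEngine(
  new File("manifest.json"),
  Seq(new File("target/myengine-assembly-0.1.0.jar")))

// Later, delete the recorded files and the registration itself:
RegisterEngine.unregisterEngine(new File("manifest.json"))
```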
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
new file mode 100644
index 0000000..5dae46b
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
@@ -0,0 +1,178 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+import java.net.URI
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.tools.console.ConsoleArgs
+import org.apache.predictionio.workflow.WorkflowUtils
+
+import scala.sys.process._
+
+object RunServer extends Logging {
+ def runServer(
+ ca: ConsoleArgs,
+ core: File,
+ em: EngineManifest,
+ engineInstanceId: String): Int = {
+ val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
+ s"${kv._1}=${kv._2}"
+ ).mkString(",")
+
+ val sparkHome = ca.common.sparkHome.getOrElse(
+ sys.env.getOrElse("SPARK_HOME", "."))
+
+ val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+ val driverClassPathIndex =
+ ca.common.sparkPassThrough.indexOf("--driver-class-path")
+ val driverClassPathPrefix =
+ if (driverClassPathIndex != -1) {
+ Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
+ } else {
+ Seq()
+ }
+ val extraClasspaths =
+ driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
+
+ val deployModeIndex =
+ ca.common.sparkPassThrough.indexOf("--deploy-mode")
+ val deployMode = if (deployModeIndex != -1) {
+ ca.common.sparkPassThrough(deployModeIndex + 1)
+ } else {
+ "client"
+ }
+
+ val mainJar =
+ if (ca.build.uberJar) {
+ if (deployMode == "cluster") {
+ em.files.filter(_.startsWith("hdfs")).head
+ } else {
+ em.files.filterNot(_.startsWith("hdfs")).head
+ }
+ } else {
+ if (deployMode == "cluster") {
+ em.files.filter(_.contains("pio-assembly")).head
+ } else {
+ core.getCanonicalPath
+ }
+ }
+
+ val jarFiles = (em.files ++ Option(new File(ca.common.pioHome.get, "plugins")
+ .listFiles()).getOrElse(Array.empty[File]).map(_.getAbsolutePath)).mkString(",")
+
+ val sparkSubmit =
+ Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
+ ca.common.sparkPassThrough ++
+ Seq(
+ "--class",
+ "org.apache.predictionio.workflow.CreateServer",
+ "--name",
+ s"PredictionIO Engine Instance: ${engineInstanceId}") ++
+ (if (!ca.build.uberJar) {
+ Seq("--jars", jarFiles)
+ } else Seq()) ++
+ (if (extraFiles.size > 0) {
+ Seq("--files", extraFiles.mkString(","))
+ } else {
+ Seq()
+ }) ++
+ (if (extraClasspaths.size > 0) {
+ Seq("--driver-class-path", extraClasspaths.mkString(":"))
+ } else {
+ Seq()
+ }) ++
+ (if (ca.common.sparkKryo) {
+ Seq(
+ "--conf",
+ "spark.serializer=org.apache.spark.serializer.KryoSerializer")
+ } else {
+ Seq()
+ }) ++
+ Seq(
+ mainJar,
+ "--engineInstanceId",
+ engineInstanceId,
+ "--ip",
+ ca.deploy.ip,
+ "--port",
+ ca.deploy.port.toString,
+ "--event-server-ip",
+ ca.eventServer.ip,
+ "--event-server-port",
+ ca.eventServer.port.toString) ++
+ (if (ca.accessKey.accessKey != "") {
+ Seq("--accesskey", ca.accessKey.accessKey)
+ } else {
+ Seq()
+ }) ++
+ (if (ca.eventServer.enabled) Seq("--feedback") else Seq()) ++
+ (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
+ (if (ca.common.verbose) Seq("--verbose") else Seq()) ++
+ ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Seq()) ++
+ ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Seq()) ++
+ Seq("--json-extractor", ca.common.jsonExtractor.toString)
+
+ info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+
+ val proc =
+ Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
+ Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+ def run(): Unit = {
+ proc.destroy()
+ }
+ }))
+ proc.exitValue()
+ }
+
+ def newRunServer(
+ ca: ConsoleArgs,
+ em: EngineManifest,
+ engineInstanceId: String): Int = {
+ val jarFiles = em.files.map(new URI(_)) ++
+ Option(new File(ca.common.pioHome.get, "plugins").listFiles())
+ .getOrElse(Array.empty[File]).map(_.toURI)
+ val args = Seq(
+ "--engineInstanceId",
+ engineInstanceId,
+ "--engine-variant",
+ ca.common.variantJson.toURI.toString,
+ "--ip",
+ ca.deploy.ip,
+ "--port",
+ ca.deploy.port.toString,
+ "--event-server-ip",
+ ca.eventServer.ip,
+ "--event-server-port",
+ ca.eventServer.port.toString) ++
+ (if (ca.accessKey.accessKey != "") {
+ Seq("--accesskey", ca.accessKey.accessKey)
+ } else {
+ Nil
+ }) ++
+ (if (ca.eventServer.enabled) Seq("--feedback") else Nil) ++
+ (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Nil) ++
+ (if (ca.common.verbose) Seq("--verbose") else Nil) ++
+ ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Nil) ++
+ ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Nil) ++
+ Seq("--json-extractor", ca.common.jsonExtractor.toString)
+
+ Runner.runOnSpark("org.apache.predictionio.workflow.CreateServer", args, ca, jarFiles)
+ }
+}
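
Review note: both methods read values positionally out of the raw Spark pass-through arguments. A worked sketch of the --deploy-mode lookup above (the argument vector is an example):

```scala
// Example pass-through vector collected from the pio command line.
val passThrough = Seq("--master", "yarn", "--deploy-mode", "cluster")

val deployModeIndex = passThrough.indexOf("--deploy-mode")
val deployMode =
  if (deployModeIndex != -1) passThrough(deployModeIndex + 1)
  else "client" // default when the flag is absent
// deployMode == "cluster"
```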
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
new file mode 100644
index 0000000..4b42f40
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -0,0 +1,212 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+import java.net.URI
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.tools.console.ConsoleArgs
+import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+import scala.sys.process._
+
+object RunWorkflow extends Logging {
+ def runWorkflow(
+ ca: ConsoleArgs,
+ core: File,
+ em: EngineManifest,
+ variantJson: File): Int = {
+ // Collect and serialize PIO_* environmental variables
+ val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
+ s"${kv._1}=${kv._2}"
+ ).mkString(",")
+
+ val sparkHome = ca.common.sparkHome.getOrElse(
+ sys.env.getOrElse("SPARK_HOME", "."))
+
+ val hadoopConf = new Configuration
+ val hdfs = FileSystem.get(hadoopConf)
+
+ val driverClassPathIndex =
+ ca.common.sparkPassThrough.indexOf("--driver-class-path")
+ val driverClassPathPrefix =
+ if (driverClassPathIndex != -1) {
+ Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
+ } else {
+ Seq()
+ }
+ val extraClasspaths =
+ driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
+
+ val deployModeIndex =
+ ca.common.sparkPassThrough.indexOf("--deploy-mode")
+ val deployMode = if (deployModeIndex != -1) {
+ ca.common.sparkPassThrough(deployModeIndex + 1)
+ } else {
+ "client"
+ }
+
+ val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+ val mainJar =
+ if (ca.build.uberJar) {
+ if (deployMode == "cluster") {
+ em.files.filter(_.startsWith("hdfs")).head
+ } else {
+ em.files.filterNot(_.startsWith("hdfs")).head
+ }
+ } else {
+ if (deployMode == "cluster") {
+ em.files.filter(_.contains("pio-assembly")).head
+ } else {
+ core.getCanonicalPath
+ }
+ }
+
+ val workMode =
+ ca.common.evaluation.map(_ => "Evaluation").getOrElse("Training")
+
+ val engineLocation = Seq(
+ sys.env("PIO_FS_ENGINESDIR"),
+ em.id,
+ em.version)
+
+ if (deployMode == "cluster") {
+ val dstPath = new Path(engineLocation.mkString(Path.SEPARATOR))
+ info("Cluster deploy mode detected. Trying to copy " +
+ s"${variantJson.getCanonicalPath} to " +
+ s"${hdfs.makeQualified(dstPath).toString}.")
+ hdfs.copyFromLocalFile(new Path(variantJson.toURI), dstPath)
+ }
+
+ val sparkSubmit =
+ Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
+ ca.common.sparkPassThrough ++
+ Seq(
+ "--class",
+ "org.apache.predictionio.workflow.CreateWorkflow",
+ "--name",
+ s"PredictionIO $workMode: ${em.id} ${em.version} (${ca.common.batch})") ++
+ (if (!ca.build.uberJar) {
+ Seq("--jars", em.files.mkString(","))
+ } else Seq()) ++
+ (if (extraFiles.size > 0) {
+ Seq("--files", extraFiles.mkString(","))
+ } else {
+ Seq()
+ }) ++
+ (if (extraClasspaths.size > 0) {
+ Seq("--driver-class-path", extraClasspaths.mkString(":"))
+ } else {
+ Seq()
+ }) ++
+ (if (ca.common.sparkKryo) {
+ Seq(
+ "--conf",
+ "spark.serializer=org.apache.spark.serializer.KryoSerializer")
+ } else {
+ Seq()
+ }) ++
+ Seq(
+ mainJar,
+ "--env",
+ pioEnvVars,
+ "--engine-id",
+ em.id,
+ "--engine-version",
+ em.version,
+ "--engine-variant",
+ if (deployMode == "cluster") {
+ hdfs.makeQualified(new Path(
+ (engineLocation :+ variantJson.getName).mkString(Path.SEPARATOR))).
+ toString
+ } else {
+ variantJson.getCanonicalPath
+ },
+ "--verbosity",
+ ca.common.verbosity.toString) ++
+ ca.common.engineFactory.map(
+ x => Seq("--engine-factory", x)).getOrElse(Seq()) ++
+ ca.common.engineParamsKey.map(
+ x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++
+ (if (deployMode == "cluster") Seq("--deploy-mode", "cluster") else Seq()) ++
+ (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
+ (if (ca.common.verbose) Seq("--verbose") else Seq()) ++
+ (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++
+ (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++
+ (if (ca.common.stopAfterPrepare) {
+ Seq("--stop-after-prepare")
+ } else {
+ Seq()
+ }) ++
+ ca.common.evaluation.map(x => Seq("--evaluation-class", x)).
+ getOrElse(Seq()) ++
+ // If engineParamsGenerator is specified, it overrides the evaluation.
+ ca.common.engineParamsGenerator.orElse(ca.common.evaluation)
+ .map(x => Seq("--engine-params-generator-class", x))
+ .getOrElse(Seq()) ++
+ (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
+ Seq("--json-extractor", ca.common.jsonExtractor.toString)
+
+ info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+ Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).!
+ }
+
+ def newRunWorkflow(ca: ConsoleArgs, em: EngineManifest): Int = {
+ val jarFiles = em.files.map(new URI(_))
+ val args = Seq(
+ "--engine-id",
+ em.id,
+ "--engine-version",
+ em.version,
+ "--engine-variant",
+ ca.common.variantJson.toURI.toString,
+ "--verbosity",
+ ca.common.verbosity.toString) ++
+ ca.common.engineFactory.map(
+ x => Seq("--engine-factory", x)).getOrElse(Seq()) ++
+ ca.common.engineParamsKey.map(
+ x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++
+ (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
+ (if (ca.common.verbose) Seq("--verbose") else Seq()) ++
+ (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++
+ (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++
+ (if (ca.common.stopAfterPrepare) {
+ Seq("--stop-after-prepare")
+ } else {
+ Seq()
+ }) ++
+ ca.common.evaluation.map(x => Seq("--evaluation-class", x)).
+ getOrElse(Seq()) ++
+ // If engineParamsGenerator is specified, it overrides the evaluation.
+ ca.common.engineParamsGenerator.orElse(ca.common.evaluation)
+ .map(x => Seq("--engine-params-generator-class", x))
+ .getOrElse(Seq()) ++
+ (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
+ Seq("--json-extractor", ca.common.jsonExtractor.toString)
+
+ Runner.runOnSpark(
+ "org.apache.predictionio.workflow.CreateWorkflow",
+ args,
+ ca,
+ jarFiles)
+ }
+}
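
Review note: in cluster deploy mode the engine variant JSON is staged to and read back from HDFS. A worked sketch of the path assembled above (the PIO_FS_ENGINESDIR value is an example):

```scala
// Assuming PIO_FS_ENGINESDIR=/user/pio/engines (example value),
// engine ID "myengine", and version "1.0":
val engineLocation = Seq("/user/pio/engines", "myengine", "1.0")
val variantPath = (engineLocation :+ "engine.json").mkString("/")
// variantPath == "/user/pio/engines/myengine/1.0/engine.json"
```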
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
new file mode 100644
index 0000000..3a8fed5
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
@@ -0,0 +1,211 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools
+
+import java.io.File
+import java.net.URI
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.tools.console.ConsoleArgs
+import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+import scala.sys.process._
+
+object Runner extends Logging {
+ def envStringToMap(env: String): Map[String, String] =
+ env.split(',').flatMap(p =>
+ p.split('=') match {
+ case Array(k, v) => List(k -> v)
+ case _ => Nil
+ }
+ ).toMap
+
+ def argumentValue(arguments: Seq[String], argumentName: String): Option[String] = {
+ val argumentIndex = arguments.indexOf(argumentName)
+ try {
+ arguments(argumentIndex) // just to make it error out if index is -1
+ Some(arguments(argumentIndex + 1))
+ } catch {
+ case e: IndexOutOfBoundsException => None
+ }
+ }
+
+ def handleScratchFile(
+ fileSystem: Option[FileSystem],
+ uri: Option[URI],
+ localFile: File): String = {
+ val localFilePath = localFile.getCanonicalPath
+ (fileSystem, uri) match {
+ case (Some(fs), Some(u)) =>
+ val dest = fs.makeQualified(Path.mergePaths(
+ new Path(u),
+ new Path(localFilePath)))
+ info(s"Copying $localFile to ${dest.toString}")
+ fs.copyFromLocalFile(new Path(localFilePath), dest)
+ dest.toUri.toString
+ case _ => localFile.toURI.toString
+ }
+ }
+
+ def cleanup(fs: Option[FileSystem], uri: Option[URI]): Unit = {
+ (fs, uri) match {
+ case (Some(f), Some(u)) =>
+ f.close()
+      case _ => ()
+ }
+ }
+
+ def detectFilePaths(
+ fileSystem: Option[FileSystem],
+ uri: Option[URI],
+ args: Seq[String]): Seq[String] = {
+ args map { arg =>
+ val f = try {
+ new File(new URI(arg))
+ } catch {
+ case e: Throwable => new File(arg)
+ }
+ if (f.exists()) {
+ handleScratchFile(fileSystem, uri, f)
+ } else {
+ arg
+ }
+ }
+ }
+
+ def runOnSpark(
+ className: String,
+ classArgs: Seq[String],
+ ca: ConsoleArgs,
+ extraJars: Seq[URI]): Int = {
+ // Return error for unsupported cases
+ val deployMode =
+ argumentValue(ca.common.sparkPassThrough, "--deploy-mode").getOrElse("client")
+ val master =
+ argumentValue(ca.common.sparkPassThrough, "--master").getOrElse("local")
+
+ (ca.common.scratchUri, deployMode, master) match {
+ case (Some(u), "client", m) if m != "yarn-cluster" =>
+ error("--scratch-uri cannot be set when deploy mode is client")
+ return 1
+ case (_, "cluster", m) if m.startsWith("spark://") =>
+ error("Using cluster deploy mode with Spark standalone cluster is not supported")
+ return 1
+      case _ => ()
+ }
+
+ // Initialize HDFS API for scratch URI
+ val fs = ca.common.scratchUri map { uri =>
+ FileSystem.get(uri, new Configuration())
+ }
+
+ // Collect and serialize PIO_* environmental variables
+ val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
+ s"${kv._1}=${kv._2}"
+ ).mkString(",")
+
+ // Location of Spark
+ val sparkHome = ca.common.sparkHome.getOrElse(
+ sys.env.getOrElse("SPARK_HOME", "."))
+
+ // Local path to PredictionIO assembly JAR
+ val mainJar = handleScratchFile(
+ fs,
+ ca.common.scratchUri,
+ console.Console.coreAssembly(ca.common.pioHome.get))
+
+ // Extra JARs that are needed by the driver
+ val driverClassPathPrefix =
+ argumentValue(ca.common.sparkPassThrough, "--driver-class-path") map { v =>
+ Seq(v)
+ } getOrElse {
+ Nil
+ }
+
+ val extraClasspaths =
+ driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
+
+ // Extra files that are needed to be passed to --files
+ val extraFiles = WorkflowUtils.thirdPartyConfFiles map { f =>
+ handleScratchFile(fs, ca.common.scratchUri, new File(f))
+ }
+
+ val deployedJars = extraJars map { j =>
+ handleScratchFile(fs, ca.common.scratchUri, new File(j))
+ }
+
+ val sparkSubmitCommand =
+ Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator))
+
+ val sparkSubmitJars = if (extraJars.nonEmpty) {
+ Seq("--jars", deployedJars.map(_.toString).mkString(","))
+ } else {
+ Nil
+ }
+
+ val sparkSubmitFiles = if (extraFiles.nonEmpty) {
+ Seq("--files", extraFiles.mkString(","))
+ } else {
+ Nil
+ }
+
+ val sparkSubmitExtraClasspaths = if (extraClasspaths.nonEmpty) {
+ Seq("--driver-class-path", extraClasspaths.mkString(":"))
+ } else {
+ Nil
+ }
+
+ val sparkSubmitKryo = if (ca.common.sparkKryo) {
+ Seq(
+ "--conf",
+ "spark.serializer=org.apache.spark.serializer.KryoSerializer")
+ } else {
+ Nil
+ }
+
+ val verbose = if (ca.common.verbose) Seq("--verbose") else Nil
+
+ val sparkSubmit = Seq(
+ sparkSubmitCommand,
+ ca.common.sparkPassThrough,
+ Seq("--class", className),
+ sparkSubmitJars,
+ sparkSubmitFiles,
+ sparkSubmitExtraClasspaths,
+ sparkSubmitKryo,
+ Seq(mainJar),
+ detectFilePaths(fs, ca.common.scratchUri, classArgs),
+ Seq("--env", pioEnvVars),
+ verbose).flatten.filter(_ != "")
+ info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+ val proc = Process(
+ sparkSubmit,
+ None,
+ "CLASSPATH" -> "",
+ "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
+ Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+ def run(): Unit = {
+ cleanup(fs, ca.common.scratchUri)
+ proc.destroy()
+ }
+ }))
+ cleanup(fs, ca.common.scratchUri)
+ proc.exitValue()
+ }
+}
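
Review note: a worked example of envStringToMap above; entries that do not split into exactly one key and one value are silently dropped:

```scala
Runner.envStringToMap("PIO_HOME=/opt/pio,malformed,PIO_PORT=8000")
// => Map("PIO_HOME" -> "/opt/pio", "PIO_PORT" -> "8000")
```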
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
new file mode 100644
index 0000000..b70cb7e
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
@@ -0,0 +1,156 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.admin
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.event.Logging
+import akka.io.IO
+import akka.util.Timeout
+import org.apache.predictionio.data.api.StartServer
+import org.apache.predictionio.data.storage.Storage
+import org.json4s.{Formats, DefaultFormats}
+
+import java.util.concurrent.TimeUnit
+
+import spray.can.Http
+import spray.http.{MediaTypes, StatusCodes}
+import spray.httpx.Json4sSupport
+import spray.routing._
+
+import scala.concurrent.ExecutionContext
+
+class AdminServiceActor(val commandClient: CommandClient)
+ extends HttpServiceActor {
+
+ object Json4sProtocol extends Json4sSupport {
+ implicit def json4sFormats: Formats = DefaultFormats
+ }
+
+ import Json4sProtocol._
+
+ val log = Logging(context.system, this)
+
+ // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
+ // Futures
+ implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
+ implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
+
+  // customize rejection handling for friendlier error responses
+ val rejectionHandler = RejectionHandler {
+ case MalformedRequestContentRejection(msg, _) :: _ =>
+ complete(StatusCodes.BadRequest, Map("message" -> msg))
+ case MissingQueryParamRejection(msg) :: _ =>
+ complete(StatusCodes.NotFound,
+ Map("message" -> s"missing required query parameter ${msg}."))
+ case AuthenticationFailedRejection(cause, challengeHeaders) :: _ =>
+ complete(StatusCodes.Unauthorized, challengeHeaders,
+ Map("message" -> s"Invalid accessKey."))
+ }
+
+ val jsonPath = """(.+)\.json$""".r
+
+ val route: Route =
+ pathSingleSlash {
+ get {
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete(Map("status" -> "alive"))
+ }
+ }
+ } ~
+ path("cmd" / "app" / Segment / "data") {
+ appName => {
+ delete {
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete(commandClient.futureAppDataDelete(appName))
+ }
+ }
+ }
+ } ~
+ path("cmd" / "app" / Segment) {
+ appName => {
+ delete {
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete(commandClient.futureAppDelete(appName))
+ }
+ }
+ }
+ } ~
+ path("cmd" / "app") {
+ get {
+ respondWithMediaType(MediaTypes.`application/json`) {
+ complete(commandClient.futureAppList())
+ }
+ } ~
+ post {
+ entity(as[AppRequest]) {
+ appArgs => respondWithMediaType(MediaTypes.`application/json`) {
+ complete(commandClient.futureAppNew(appArgs))
+ }
+ }
+ }
+ }
+ def receive: Actor.Receive = runRoute(route)
+}
+
+class AdminServerActor(val commandClient: CommandClient) extends Actor {
+ val log = Logging(context.system, this)
+ val child = context.actorOf(
+ Props(classOf[AdminServiceActor], commandClient),
+ "AdminServiceActor")
+
+ implicit val system = context.system
+
+ def receive: PartialFunction[Any, Unit] = {
+    case StartServer(host, portNum) =>
+      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
+ case m: Http.Bound => log.info("Bound received. AdminServer is ready.")
+ case m: Http.CommandFailed => log.error("Command failed.")
+ case _ => log.error("Unknown message.")
+ }
+}
+
+case class AdminServerConfig(
+ ip: String = "localhost",
+ port: Int = 7071
+)
+
+object AdminServer {
+ def createAdminServer(config: AdminServerConfig): Unit = {
+ implicit val system = ActorSystem("AdminServerSystem")
+
+ val commandClient = new CommandClient(
+ appClient = Storage.getMetaDataApps,
+ accessKeyClient = Storage.getMetaDataAccessKeys,
+ eventClient = Storage.getLEvents()
+ )
+
+ val serverActor = system.actorOf(
+ Props(classOf[AdminServerActor], commandClient),
+ "AdminServerActor")
+ serverActor ! StartServer(config.ip, config.port)
+ system.awaitTermination
+ }
+}
+
+object AdminRun {
+  def main(args: Array[String]): Unit = {
+ AdminServer.createAdminServer(AdminServerConfig(
+ ip = "localhost",
+ port = 7071))
+ }
+}
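
Review note: besides the AdminRun entry point, the server can be started programmatically. A minimal sketch (address and port are example values):

```scala
// Bind the admin API to a non-default address and port
// (blocks in awaitTermination until the actor system shuts down).
AdminServer.createAdminServer(AdminServerConfig(ip = "0.0.0.0", port = 7072))
```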
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
new file mode 100644
index 0000000..143023e
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
@@ -0,0 +1,160 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.admin
+
+import org.apache.predictionio.data.storage._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+abstract class BaseResponse()
+
+case class GeneralResponse(
+ status: Int = 0,
+ message: String = ""
+) extends BaseResponse()
+
+case class AppRequest(
+ id: Int = 0,
+ name: String = "",
+ description: String = ""
+)
+
+case class TrainRequest(
+ enginePath: String = ""
+)
+case class AppResponse(
+ id: Int = 0,
+ name: String = "",
+ keys: Seq[AccessKey]
+) extends BaseResponse()
+
+case class AppNewResponse(
+ status: Int = 0,
+ message: String = "",
+ id: Int = 0,
+ name: String = "",
+ key: String
+) extends BaseResponse()
+
+case class AppListResponse(
+ status: Int = 0,
+ message: String = "",
+ apps: Seq[AppResponse]
+) extends BaseResponse()
+
+class CommandClient(
+ val appClient: Apps,
+ val accessKeyClient: AccessKeys,
+ val eventClient: LEvents
+) {
+
+ def futureAppNew(req: AppRequest)(implicit ec: ExecutionContext): Future[BaseResponse] = Future {
+ val response = appClient.getByName(req.name) map { app =>
+ GeneralResponse(0, s"App ${req.name} already exists. Aborting.")
+ } getOrElse {
+ appClient.get(req.id) map {
+ app2 =>
+ GeneralResponse(0,
+ s"App ID ${app2.id} already exists and maps to the app '${app2.name}'. " +
+ "Aborting.")
+ } getOrElse {
+ val appid = appClient.insert(App(
+ id = Option(req.id).getOrElse(0),
+ name = req.name,
+ description = Option(req.description)))
+ appid map { id =>
+ val dbInit = eventClient.init(id)
+ val r = if (dbInit) {
+            val accessKey = accessKeyClient.insert(AccessKey(
+              key = "",
+              appid = id,
+              events = Seq()))
+            accessKey map { k =>
+              new AppNewResponse(1, "App created successfully.", id, req.name, k)
+ } getOrElse {
+ GeneralResponse(0, s"Unable to create new access key.")
+ }
+ } else {
+ GeneralResponse(0, s"Unable to initialize Event Store for this app ID: ${id}.")
+ }
+ r
+ } getOrElse {
+ GeneralResponse(0, s"Unable to create new app.")
+ }
+ }
+ }
+ response
+ }
+
+ def futureAppList()(implicit ec: ExecutionContext): Future[AppListResponse] = Future {
+ val apps = appClient.getAll().sortBy(_.name)
+ val appsRes = apps.map {
+ app => {
+ new AppResponse(app.id, app.name, accessKeyClient.getByAppid(app.id))
+ }
+ }
+    new AppListResponse(1, "Successfully retrieved app list.", appsRes)
+ }
+
+ def futureAppDataDelete(appName: String)
+ (implicit ec: ExecutionContext): Future[GeneralResponse] = Future {
+ val response = appClient.getByName(appName) map { app =>
+ val data = if (eventClient.remove(app.id)) {
+ GeneralResponse(1, s"Removed Event Store for this app ID: ${app.id}")
+ } else {
+ GeneralResponse(0, s"Error removing Event Store for this app.")
+ }
+
+ val dbInit = eventClient.init(app.id)
+ val data2 = if (dbInit) {
+ GeneralResponse(1, s"Initialized Event Store for this app ID: ${app.id}.")
+ } else {
+ GeneralResponse(0, s"Unable to initialize Event Store for this appId:" +
+ s" ${app.id}.")
+ }
+ GeneralResponse(data.status * data2.status, data.message + data2.message)
+ } getOrElse {
+ GeneralResponse(0, s"App ${appName} does not exist.")
+ }
+ response
+ }
+
+ def futureAppDelete(appName: String)
+ (implicit ec: ExecutionContext): Future[GeneralResponse] = Future {
+
+ val response = appClient.getByName(appName) map { app =>
+ val data = if (eventClient.remove(app.id)) {
+ Storage.getMetaDataApps.delete(app.id)
+      GeneralResponse(1, "App successfully deleted.")
+    } else {
+      GeneralResponse(0, s"Error removing Event Store for app ${app.name}.")
+ }
+ data
+ } getOrElse {
+ GeneralResponse(0, s"App ${appName} does not exist.")
+ }
+ response
+ }
+
+ def futureTrain(req: TrainRequest)
+ (implicit ec: ExecutionContext): Future[GeneralResponse] = Future {
+ null
+ }
+}
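
Review note: every futureXxx method runs its storage calls inside a Future on a caller-supplied ExecutionContext. A hedged usage sketch (the wiring mirrors AdminServer above):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.predictionio.data.storage.Storage

// Example wiring, as in AdminServer.createAdminServer.
val client = new CommandClient(
  appClient = Storage.getMetaDataApps,
  accessKeyClient = Storage.getMetaDataAccessKeys,
  eventClient = Storage.getLEvents())

client.futureAppList() foreach { res =>
  res.apps foreach (app => println(s"${app.id} ${app.name}"))
}
```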
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
new file mode 100644
index 0000000..475a3de
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
@@ -0,0 +1,161 @@
+## Admin API (under development)
+
+### Start Admin HTTP Server without bin/pio (for development)
+
+NOTE: Elasticsearch and HBase must be running first.
+
+```
+$ sbt/sbt "tools/compile"
+$ set -a
+$ source conf/pio-env.sh
+$ set +a
+$ sbt/sbt "tools/run-main org.apache.predictionio.tools.admin.AdminRun"
+```
+
+### Unit test (Very minimal)
+
+```
+$ set -a
+$ source conf/pio-env.sh
+$ set +a
+$ sbt/sbt "tools/test-only org.apache.predictionio.tools.admin.AdminAPISpec"
+```
+
+### Start with pio command adminserver
+
+```
+$ pio adminserver
+```
+
+The Admin Server URL defaults to `http://localhost:7071`.
+
+The host and port can be specified with the `--ip` and `--port` parameters:
+
+```
+$ pio adminserver --ip 127.0.0.1 --port 7080
+```
+
+### Current Supported Commands
+
+#### Check status
+
+```
+$ curl -i http://localhost:7071/
+
+{"status":"alive"}
+```
+
+#### Get list of apps
+
+```
+$ curl -i -X GET http://localhost:7071/cmd/app
+
+{"status":1,"message":"Successfully retrieved app list.","apps":[{"id":12,"name":"scratch","keys":[{"key":"gtPgVMIr3uthus1QJWFBcIjNf6d1SNuhaOWQAgdLbOBP1eRWMNIJWl6SkHgI1OoN","appid":12,"events":[]}]},{"id":17,"name":"test-ecommercerec","keys":[{"key":"zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4","appid":17,"events":[]}]}]}
+```
+
+#### Create a new app
+
+```
+$ curl -i -X POST http://localhost:7071/cmd/app \
+-H "Content-Type: application/json" \
+-d '{ "name" : "my_new_app" }'
+
+{"status":1,"message":"App created successfully.","id":19,"name":"my_new_app","keys":[{"key":"","appid":19,"events":[]}]}
+```
+
+#### Delete an app's data
+
+```
+$ curl -i -X DELETE http://localhost:7071/cmd/app/my_new_app/data
+```
+
+#### Delete app
+
+```
+$ curl -i -X DELETE http://localhost:7071/cmd/app/my_new_app
+
+{"status":1,"message":"App successfully deleted"}
+```
+
+
+## API Doc (To be updated)
+
+### app list
+GET http://localhost:7071/cmd/app
+
+OK Response:
+{
+  "status": <STATUS>,
+  "message": "<MESSAGE>",
+  "apps": [
+    { "name": "<APP_NAME>",
+      "id": <APP_ID>,
+      "accessKey": "<ACCESS_KEY>" },
+    { "name": "<APP_NAME>",
+      "id": <APP_ID>,
+      "accessKey": "<ACCESS_KEY>" }, ... ]
+}
+
+Error Response:
+{ "status": <STATUS>, "message": "<MESSAGE>" }
+
+### app new
+POST http://localhost:7071/cmd/app
+Request Body:
+{ "name": "<APP_NAME>", // required
+  "id": <APP_ID>, // optional
+  "description": "<DESCRIPTION>" } // optional
+
+OK Response:
+{ "status": <STATUS>,
+  "message": "<MESSAGE>",
+  "app": {
+    "name": "<APP_NAME>",
+    "id": <APP_ID>,
+    "accessKey": "<ACCESS_KEY>" }
+}
+
+Error Response:
+{ "status": <STATUS>, "message": "<MESSAGE>" }
+
+### app delete
+DELETE http://localhost:7071/cmd/app/{appName}
+
+OK Response:
+{ "status": <STATUS>, "message": "<MESSAGE>" }
+
+Error Response:
+{ "status": <STATUS>, "message": "<MESSAGE>" }
+
+### app data-delete
+DELETE http://localhost:7071/cmd/app/{appName}/data
+
+OK Response:
+{ "status": <STATUS>, "message": "<MESSAGE>" }
+
+Error Response:
+{ "status": <STATUS>, "message": "<MESSAGE>" }
+
+
+### train TBD
+
+#### Training request:
+POST http://localhost:7071/cmd/train
+Request body: TBD
+
+OK Response: TBD
+
+Error Response: TBD
+
+#### Get training status:
+GET http://localhost:7071/cmd/train/{engineInstanceId}
+
+OK Response: TBD
+INIT
+TRAINING
+DONE
+ERROR
+
+Error Response: TBD
+
+### deploy TBD
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
new file mode 100644
index 0000000..a6ab83c
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
@@ -0,0 +1,83 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.console
+
+import org.apache.predictionio.data.storage
+
+import grizzled.slf4j.Logging
+
+case class AccessKeyArgs(
+ accessKey: String = "",
+ events: Seq[String] = Seq())
+
+object AccessKey extends Logging {
+ def create(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps
+ apps.getByName(ca.app.name) map { app =>
+ val accessKeys = storage.Storage.getMetaDataAccessKeys
+ val accessKey = accessKeys.insert(storage.AccessKey(
+ key = ca.accessKey.accessKey,
+ appid = app.id,
+ events = ca.accessKey.events))
+ accessKey map { k =>
+ info(s"Created new access key: ${k}")
+ 0
+ } getOrElse {
+ error(s"Unable to create new access key.")
+ 1
+ }
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ 1
+ }
+ }
+
+ def list(ca: ConsoleArgs): Int = {
+ val keys =
+ if (ca.app.name == "") {
+ storage.Storage.getMetaDataAccessKeys.getAll
+ } else {
+ val apps = storage.Storage.getMetaDataApps
+ apps.getByName(ca.app.name) map { app =>
+ storage.Storage.getMetaDataAccessKeys.getByAppid(app.id)
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ return 1
+ }
+ }
+ val title = "Access Key(s)"
+ info(f"$title%64s | App ID | Allowed Event(s)")
+ keys.sortBy(k => k.appid) foreach { k =>
+ val events =
+ if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)"
+ info(f"${k.key}%64s | ${k.appid}%6d | $events%s")
+ }
+ info(s"Finished listing ${keys.size} access key(s).")
+ 0
+ }
+
+ def delete(ca: ConsoleArgs): Int = {
+ try {
+ storage.Storage.getMetaDataAccessKeys.delete(ca.accessKey.accessKey)
+ info(s"Deleted access key ${ca.accessKey.accessKey}.")
+ 0
+ } catch {
+ case e: Exception =>
+ error(s"Error deleting access key ${ca.accessKey.accessKey}.", e)
+ 1
+ }
+ }
+}
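
Review note: each console command returns an Int status that the caller propagates as the process exit code. A hedged sketch (ca would come from the console's scopt parser, not shown here):

```scala
// Hypothetical driver: exit with the command's status code.
val status = AccessKey.list(ca)
sys.exit(status)
```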
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala
new file mode 100644
index 0000000..cc2f36d
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala
@@ -0,0 +1,537 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.console
+
+import org.apache.predictionio.data.storage
+
+import grizzled.slf4j.Logging
+
+case class AppArgs(
+ id: Option[Int] = None,
+ name: String = "",
+ channel: String = "",
+ dataDeleteChannel: Option[String] = None,
+ all: Boolean = false,
+ force: Boolean = false,
+ description: Option[String] = None)
+
+object App extends Logging {
+ def create(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps()
+    // get the client at the beginning so we fail fast if the client cannot be accessed
+ val events = storage.Storage.getLEvents()
+ apps.getByName(ca.app.name) map { app =>
+ error(s"App ${ca.app.name} already exists. Aborting.")
+ 1
+ } getOrElse {
+ ca.app.id.map { id =>
+ apps.get(id) map { app =>
+ error(
+ s"App ID ${id} already exists and maps to the app '${app.name}'. " +
+ "Aborting.")
+ return 1
+ }
+ }
+ val appid = apps.insert(storage.App(
+ id = ca.app.id.getOrElse(0),
+ name = ca.app.name,
+ description = ca.app.description))
+ appid map { id =>
+ val dbInit = events.init(id)
+ val r = if (dbInit) {
+ info(s"Initialized Event Store for this app ID: ${id}.")
+ val accessKeys = storage.Storage.getMetaDataAccessKeys
+ val accessKey = accessKeys.insert(storage.AccessKey(
+ key = ca.accessKey.accessKey,
+ appid = id,
+ events = Seq()))
+ accessKey map { k =>
+ info("Created new app:")
+ info(s" Name: ${ca.app.name}")
+ info(s" ID: ${id}")
+ info(s"Access Key: ${k}")
+ 0
+ } getOrElse {
+ error(s"Unable to create new access key.")
+ 1
+ }
+ } else {
+ error(s"Unable to initialize Event Store for this app ID: ${id}.")
+ // revert the meta-data change
+ try {
+ apps.delete(id)
+ 0
+ } catch {
+ case e: Exception =>
+ error(s"Failed to revert back the App meta-data change.", e)
+ error(s"The app ${ca.app.name} CANNOT be used!")
+ error(s"Please run 'pio app delete ${ca.app.name}' " +
+ "to delete this app!")
+ 1
+ }
+ }
+ events.close()
+ r
+ } getOrElse {
+ error(s"Unable to create new app.")
+ 1
+ }
+ }
+ }
+
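+ // Lists all apps sorted by name, one row per access key, showing the
+ // events each key may write ("(all)" when unrestricted).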
+ def list(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps.getAll().sortBy(_.name)
+ val accessKeys = storage.Storage.getMetaDataAccessKeys
+ val title = "Name"
+ val ak = "Access Key"
+ info(f"$title%20s | ID | $ak%64s | Allowed Event(s)")
+ apps foreach { app =>
+ val keys = accessKeys.getByAppid(app.id)
+ keys foreach { k =>
+ val events =
+ if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)"
+ info(f"${app.name}%20s | ${app.id}%4d | ${k.key}%64s | $events%s")
+ }
+ }
+ info(s"Finished listing ${apps.size} app(s).")
+ 0
+ }
+
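+ // Shows a single app's meta-data together with its access keys and
+ // channels.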
+ def show(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps
+ val accessKeys = storage.Storage.getMetaDataAccessKeys
+ val channels = storage.Storage.getMetaDataChannels
+ apps.getByName(ca.app.name) map { app =>
+ info(s" App Name: ${app.name}")
+ info(s" App ID: ${app.id}")
+ info(s" Description: ${app.description.getOrElse("")}")
+ val keys = accessKeys.getByAppid(app.id)
+
+ var firstKey = true
+ keys foreach { k =>
+ val events =
+ if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)"
+ if (firstKey) {
+ info(f" Access Key: ${k.key}%s | ${events}%s")
+ firstKey = false
+ } else {
+ info(f" ${k.key}%s | ${events}%s")
+ }
+ }
+
+ val chans = channels.getByAppid(app.id)
+ var firstChan = true
+ val titleName = "Channel Name"
+ val titleID = "Channel ID"
+ chans.foreach { ch =>
+ if (firstChan) {
+ info(f" Channels: ${titleName}%16s | ${titleID}%10s ")
+ firstChan = false
+ }
+ info(f" ${ch.name}%16s | ${ch.id}%10s")
+ }
+ 0
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ 1
+ }
+ }
+
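+ // Deletes an app after confirmation (skipped when the force option is
+ // set): each channel's Event Store and meta-data first, then the app's
+ // Event Store, its access keys, and finally the app meta-data itself.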
+ def delete(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps
+ val accesskeys = storage.Storage.getMetaDataAccessKeys
+ val channels = storage.Storage.getMetaDataChannels
+ val events = storage.Storage.getLEvents()
+ val status = apps.getByName(ca.app.name) map { app =>
+ info(s"The following app (including all channels) will be deleted. Are you sure?")
+ info(s" App Name: ${app.name}")
+ info(s" App ID: ${app.id}")
+ info(s" Description: ${app.description.getOrElse("")}")
+ val chans = channels.getByAppid(app.id)
+ var firstChan = true
+ val titleName = "Channel Name"
+ val titleID = "Channel ID"
+ chans.foreach { ch =>
+ if (firstChan) {
+ info(f" Channels: ${titleName}%16s | ${titleID}%10s ")
+ firstChan = false
+ }
+ info(f" ${ch.name}%16s | ${ch.id}%10s")
+ }
+
+ val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+ choice match {
+ case "YES" => {
+ // delete channels
+ val delChannelStatus: Seq[Int] = chans.map { ch =>
+ if (events.remove(app.id, Some(ch.id))) {
+ info(s"Removed Event Store of the channel ID: ${ch.id}")
+ try {
+ channels.delete(ch.id)
+ info(s"Deleted channel ${ch.name}")
+ 0
+ } catch {
+ case e: Exception =>
+ error(s"Error deleting channel ${ch.name}.", e)
+ 1
+ }
+ } else {
+ error(s"Error removing Event Store of the channel ID: ${ch.id}.")
+ return 1
+ }
+ }
+
+ if (delChannelStatus.exists(_ != 0)) {
+ error("Error occurred while deleting channels. Aborting.")
+ return 1
+ }
+
+ try {
+ events.remove(app.id)
+ info(s"Removed Event Store for this app ID: ${app.id}")
+ } catch {
+ case e: Exception =>
+ error(s"Error removing Event Store for this app. Aborting.", e)
+ return 1
+ }
+
+ accesskeys.getByAppid(app.id) foreach { key =>
+ try {
+ accesskeys.delete(key.key)
+ info(s"Removed access key ${key.key}")
+ } catch {
+ case e: Exception =>
+ error(s"Error removing access key ${key.key}. Aborting.", e)
+ return 1
+ }
+ }
+
+ try {
+ apps.delete(app.id)
+ info(s"Deleted app ${app.name}.")
+ } catch {
+ case e: Exception =>
+ error(s"Error deleting app ${app.name}. Aborting.", e)
+ return 1
+ }
+
+ info("Done.")
+ 0
+ }
+ case _ =>
+ info("Aborted.")
+ 0
+ }
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ 1
+ }
+ events.close()
+ status
+ }
+
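+ // Dispatches data deletion: every channel when the 'all' option is set,
+ // otherwise a single app or channel.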
+ def dataDelete(ca: ConsoleArgs): Int = {
+ if (ca.app.all) {
+ dataDeleteAll(ca)
+ } else {
+ dataDeleteOne(ca)
+ }
+ }
+
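+ // Wipes the data of one app (default channel) or, when a channel is
+ // given, that channel only: the Event Store table is dropped and then
+ // re-created empty so the app or channel remains usable.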
+ def dataDeleteOne(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps
+ val channels = storage.Storage.getMetaDataChannels
+ apps.getByName(ca.app.name) map { app =>
+
+ val channelId = ca.app.dataDeleteChannel.map { ch =>
+ val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap
+ if (!channelMap.contains(ch)) {
+ error(s"Unable to delete data for channel.")
+ error(s"Channel ${ch} doesn't exist.")
+ return 1
+ }
+
+ channelMap(ch)
+ }
+
+ if (channelId.isDefined) {
+ info(s"Data of the following channel will be deleted. Are you sure?")
+ info(s"Channel Name: ${ca.app.dataDeleteChannel.get}")
+ info(s" Channel ID: ${channelId.get}")
+ info(s" App Name: ${app.name}")
+ info(s" App ID: ${app.id}")
+ info(s" Description: ${app.description}")
+ } else {
+ info(s"Data of the following app (default channel only) will be deleted. Are you sure?")
+ info(s" App Name: ${app.name}")
+ info(s" App ID: ${app.id}")
+ info(s" Description: ${app.description}")
+ }
+
+ val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+
+ choice match {
+ case "YES" => {
+ val events = storage.Storage.getLEvents()
+ // remove the underlying event table to wipe the data
+ val r1 = if (events.remove(app.id, channelId)) {
+ if (channelId.isDefined) {
+ info(s"Removed Event Store for this channel ID: ${channelId.get}")
+ } else {
+ info(s"Removed Event Store for this app ID: ${app.id}")
+ }
+ 0
+ } else {
+ if (channelId.isDefined) {
+ error(s"Error removing Event Store for this channel.")
+ } else {
+ error(s"Error removing Event Store for this app.")
+ }
+ 1
+ }
+ // re-create an empty table so the app or channel remains usable
+ val dbInit = events.init(app.id, channelId)
+ val r2 = if (dbInit) {
+ if (channelId.isDefined) {
+ info(s"Initialized Event Store for this channel ID: ${channelId.get}.")
+ } else {
+ info(s"Initialized Event Store for this app ID: ${app.id}.")
+ }
+ 0
+ } else {
+ if (channelId.isDefined) {
+ error(s"Unable to initialize Event Store for this channel ID:" +
+ s" ${channelId.get}.")
+ } else {
+ error(s"Unable to initialize Event Store for this appId:" +
+ s" ${app.id}.")
+ }
+ 1
+ }
+ events.close()
+ info("Done.")
+ r1 + r2
+ }
+ case _ =>
+ info("Aborted.")
+ 0
+ }
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ 1
+ }
+ }
+
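+ // Wipes the data of the default channel and of every named channel,
+ // dropping and re-creating each Event Store table in turn.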
+ def dataDeleteAll(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps
+ val channels = storage.Storage.getMetaDataChannels
+ val events = storage.Storage.getLEvents()
+ val status = apps.getByName(ca.app.name) map { app =>
+ info(s"All data of the app (including default and all channels) will be deleted." +
+ " Are you sure?")
+ info(s" App Name: ${app.name}")
+ info(s" App ID: ${app.id}")
+ info(s" Description: ${app.description}")
+ val chans = channels.getByAppid(app.id)
+ var firstChan = true
+ val titleName = "Channel Name"
+ val titleID = "Channel ID"
+ chans.foreach { ch =>
+ if (firstChan) {
+ info(f" Channels: ${titleName}%16s | ${titleID}%10s ")
+ firstChan = false
+ }
+ info(f" ${ch.name}%16s | ${ch.id}%10s")
+ }
+
+ val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+ choice match {
+ case "YES" => {
+ // delete channels
+ val delChannelStatus: Seq[Int] = chans.map { ch =>
+ val r1 = if (events.remove(app.id, Some(ch.id))) {
+ info(s"Removed Event Store of the channel ID: ${ch.id}")
+ 0
+ } else {
+ error(s"Error removing Event Store of the channel ID: ${ch.id}.")
+ 1
+ }
+ // re-create an empty table so the channel remains usable
+ val dbInit = events.init(app.id, Some(ch.id))
+ val r2 = if (dbInit) {
+ info(s"Initialized Event Store of the channel ID: ${ch.id}")
+ 0
+ } else {
+ error(s"Unable to initialize Event Store of the channel ID: ${ch.id}.")
+ 1
+ }
+ r1 + r2
+ }
+
+ if (delChannelStatus.forall(_ == 0)) {
+ val r1 = if (events.remove(app.id)) {
+ info(s"Removed Event Store for this app ID: ${app.id}")
+ 0
+ } else {
+ error(s"Error removing Event Store for this app.")
+ 1
+ }
+
+ val dbInit = events.init(app.id)
+ val r2 = if (dbInit) {
+ info(s"Initialized Event Store for this app ID: ${app.id}.")
+ 0
+ } else {
+ error(s"Unable to initialize Event Store for this appId: ${app.id}.")
+ 1
+ }
+ info("Done.")
+ r1 + r2
+ } else 1
+ }
+ case _ =>
+ info("Aborted.")
+ 0
+ }
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ 1
+ }
+ events.close()
+ status
+ }
+
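+ // Creates a new channel under an app: validates the name, inserts the
+ // channel meta-data, and initializes its Event Store; the meta-data
+ // insert is rolled back if initialization fails.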
+ def channelNew(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps
+ val channels = storage.Storage.getMetaDataChannels
+ val events = storage.Storage.getLEvents()
+ val newChannel = ca.app.channel
+ val status = apps.getByName(ca.app.name) map { app =>
+ val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap
+ if (channelMap.contains(newChannel)) {
+ error(s"Unable to create new channel.")
+ error(s"Channel ${newChannel} already exists.")
+ 1
+ } else if (!storage.Channel.isValidName(newChannel)) {
+ error(s"Unable to create new channel.")
+ error(s"The channel name ${newChannel} is invalid.")
+ error(s"${storage.Channel.nameConstraint}")
+ 1
+ } else {
+
+ val channelId = channels.insert(storage.Channel(
+ id = 0, // new id will be assigned
+ appid = app.id,
+ name = newChannel
+ ))
+ channelId.map { chanId =>
+ info(s"Updated Channel meta-data.")
+ // initialize storage
+ val dbInit = events.init(app.id, Some(chanId))
+ if (dbInit) {
+ info(s"Initialized Event Store for the channel: ${newChannel}.")
+ info(s"Created new channel:")
+ info(s" Channel Name: ${newChannel}")
+ info(s" Channel ID: ${chanId}")
+ info(s" App ID: ${app.id}")
+ 0
+ } else {
+ error(s"Unable to create new channel.")
+ error(s"Failed to initalize Event Store.")
+ // revert the meta-data change
+ try {
+ channels.delete(chanId)
+ 0
+ } catch {
+ case e: Exception =>
+ error(s"Failed to revert back the Channel meta-data change.", e)
+ error(s"The channel ${newChannel} CANNOT be used!")
+ error(s"Please run 'pio app channel-delete ${app.name} ${newChannel}' " +
+ "to delete this channel!")
+ 1
+ }
+ }
+ }.getOrElse {
+ error(s"Unable to create new channel.")
+ error(s"Failed to update Channel meta-data.")
+ 1
+ }
+ }
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ 1
+ }
+ events.close()
+ status
+ }
+
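+ // Deletes a channel after confirmation. The Event Store is removed
+ // before the channel meta-data, so a storage failure leaves the
+ // meta-data intact and the deletion can be retried.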
+ def channelDelete(ca: ConsoleArgs): Int = {
+ val apps = storage.Storage.getMetaDataApps
+ val channels = storage.Storage.getMetaDataChannels
+ val events = storage.Storage.getLEvents()
+ val deleteChannel = ca.app.channel
+ val status = apps.getByName(ca.app.name) map { app =>
+ val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap
+ if (!channelMap.contains(deleteChannel)) {
+ error(s"Unable to delete channel.")
+ error(s"Channel ${deleteChannel} doesn't exist.")
+ 1
+ } else {
+ info(s"The following channel will be deleted. Are you sure?")
+ info(s" Channel Name: ${deleteChannel}")
+ info(s" Channel ID: ${channelMap(deleteChannel)}")
+ info(s" App Name: ${app.name}")
+ info(s" App ID: ${app.id}")
+ val choice = if (ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ")
+ choice match {
+ case "YES" => {
+ // NOTE: remove the storage before the meta-data, so a storage failure leaves the meta-data intact
+ val dbRemoved = events.remove(app.id, Some(channelMap(deleteChannel)))
+ if (dbRemoved) {
+ info(s"Removed Event Store for this channel: ${deleteChannel}")
+ try {
+ channels.delete(channelMap(deleteChannel))
+ info(s"Deleted channel: ${deleteChannel}.")
+ 0
+ } catch {
+ case e: Exception =>
+ error(s"Unable to delete channel.", e)
+ error(s"Failed to update Channel meta-data.")
+ error(s"The channel ${deleteChannel} CANNOT be used!")
+ error(s"Please run 'pio app channel-delete ${app.name} ${deleteChannel}' " +
+ "to delete this channel again!")
+ 1
+ }
+ } else {
+ error(s"Unable to delete channel.")
+ error(s"Error removing Event Store for this channel.")
+ 1
+ }
+ }
+ case _ =>
+ info("Aborted.")
+ 0
+ }
+ }
+ } getOrElse {
+ error(s"App ${ca.app.name} does not exist. Aborting.")
+ 1
+ }
+ events.close()
+ status
+ }
+
+}