Repository: incubator-predictionio
Updated Branches:
refs/heads/develop 9d0854242 -> 7615b4708
[PIO-47] [PIO-51] Stateless build and transitions
PIO-51 serves as a transitional convenience due to the deprecation and
removal of engine manifests. This is an important step towards enabling
deployment on PaaS platforms and building templates with other IDEs.
[PIO-51] Enable `pio build/train/deploy` outside of the engine directory
Provide the engine directory path with --engine-dir or -ed. It defaults
to the current working directory.
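For example (paths are illustrative only), a template checked out at
/path/to/engine can be built, trained, and deployed from any working
directory:
    pio build --engine-dir /path/to/engine
    pio train -ed /path/to/engine
    pio deploy -ed /path/to/engine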
[PIO-47] Eliminate engine manifest for stateless build
Closes #328
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/7615b470
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/7615b470
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/7615b470
Branch: refs/heads/develop
Commit: 7615b4708dae6ae30657d51194a33416a5509d1f
Parents: 9d08542
Author: Chan Lee <chanlee514@gmail.com>
Authored: Sun Jan 29 11:53:28 2017 -0800
Committer: Donald Szeto <donald@apache.org>
Committed: Sun Jan 29 11:53:28 2017 -0800
----------------------------------------------------------------------
.../predictionio/workflow/CreateServer.scala | 83 +++----
.../predictionio/workflow/index.scala.html | 9 -
.../data/storage/EngineManifests.scala | 120 ----------
.../predictionio/data/storage/Storage.scala | 4 -
.../elasticsearch/ESEngineManifests.scala | 84 -------
.../data/storage/jdbc/JDBCEngineManifests.scala | 114 ----------
tests/pio_tests/scenarios/quickstart_test.py | 35 ++-
tests/pio_tests/utils.py | 21 +-
tools/build.sbt | 6 +-
.../org/apache/predictionio/tools/Common.scala | 15 +-
.../predictionio/tools/RegisterEngine.scala | 87 --------
.../apache/predictionio/tools/RunServer.scala | 17 +-
.../apache/predictionio/tools/RunWorkflow.scala | 28 +--
.../predictionio/tools/commands/Engine.scala | 218 +++++--------------
.../predictionio/tools/console/Console.scala | 77 ++++---
.../apache/predictionio/tools/console/Pio.scala | 12 +-
.../predictionio/tools/console/main.scala.txt | 6 +-
17 files changed, 224 insertions(+), 712 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
index 2f947c7..ddc8d90 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
@@ -18,9 +18,7 @@
package org.apache.predictionio.workflow
-import java.io.PrintWriter
-import java.io.Serializable
-import java.io.StringWriter
+import java.io.{PrintWriter, Serializable, StringWriter}
import java.util.concurrent.TimeUnit
import akka.actor._
@@ -30,24 +28,15 @@ import akka.pattern.ask
import akka.util.Timeout
import com.github.nscala_time.time.Imports.DateTime
import com.twitter.bijection.Injection
-import com.twitter.chill.KryoBase
-import com.twitter.chill.KryoInjection
-import com.twitter.chill.ScalaKryoInstantiator
+import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
import com.typesafe.config.ConfigFactory
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
import grizzled.slf4j.Logging
import org.apache.predictionio.authentication.KeyAuthentication
import org.apache.predictionio.configuration.SSLConfiguration
-import org.apache.predictionio.controller.Engine
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.controller.Utils
-import org.apache.predictionio.controller.WithPrId
-import org.apache.predictionio.core.BaseAlgorithm
-import org.apache.predictionio.core.BaseServing
-import org.apache.predictionio.core.Doer
-import org.apache.predictionio.data.storage.EngineInstance
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.controller.{Engine, Params, Utils, WithPrId}
+import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
+import org.apache.predictionio.data.storage.{EngineInstance, Storage}
import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
import org.json4s._
import org.json4s.native.JsonMethods._
@@ -58,16 +47,13 @@ import spray.http.MediaTypes._
import spray.http._
import spray.httpx.Json4sSupport
import spray.routing._
-import spray.routing.authentication.{UserPass, BasicAuth}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.future
import scala.language.existentials
-import scala.util.Failure
-import scala.util.Random
-import scala.util.Success
+import scala.util.{Failure, Random, Success}
import scalaj.http.HttpOptions
class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator {
@@ -115,7 +101,6 @@ case class ReloadServer()
object CreateServer extends Logging {
val actorSystem = ActorSystem("pio-server")
val engineInstances = Storage.getMetaDataEngineInstances
- val engineManifests = Storage.getMetaDataEngineManifests
val modeldata = Storage.getModelDataModels
def main(args: Array[String]): Unit = {
@@ -183,21 +168,16 @@ object CreateServer extends Logging {
val engineId = sc.engineId.getOrElse(engineInstance.engineId)
val engineVersion = sc.engineVersion.getOrElse(
engineInstance.engineVersion)
- engineManifests.get(engineId, engineVersion) map { manifest =>
- val engineFactoryName = engineInstance.engineFactory
- val master = actorSystem.actorOf(Props(
- classOf[MasterActor],
- sc,
- engineInstance,
- engineFactoryName,
- manifest),
- "master")
- implicit val timeout = Timeout(5.seconds)
- master ? StartServer()
- actorSystem.awaitTermination
- } getOrElse {
- error(s"Invalid engine ID or version. Aborting server.")
- }
+ val engineFactoryName = engineInstance.engineFactory
+ val master = actorSystem.actorOf(Props(
+ classOf[MasterActor],
+ sc,
+ engineInstance,
+ engineFactoryName),
+ "master")
+ implicit val timeout = Timeout(5.seconds)
+ master ? StartServer()
+ actorSystem.awaitTermination
} getOrElse {
error(s"Invalid engine instance ID. Aborting server.")
}
@@ -208,10 +188,10 @@ object CreateServer extends Logging {
sc: ServerConfig,
engineInstance: EngineInstance,
engine: Engine[TD, EIN, PD, Q, P, A],
- engineLanguage: EngineLanguage.Value,
- manifest: EngineManifest): ActorRef = {
+ engineLanguage: EngineLanguage.Value): ActorRef = {
- val engineParams = engine.engineInstanceToEngineParams(engineInstance, sc.jsonExtractor)
+ val engineParams = engine.engineInstanceToEngineParams(
+ engineInstance, sc.jsonExtractor)
val kryo = KryoInstantiator.newKryoInjection
@@ -255,7 +235,6 @@ object CreateServer extends Logging {
engineInstance,
engine,
engineLanguage,
- manifest,
engineParams.dataSourceParams._2,
engineParams.preparatorParams._2,
algorithms,
@@ -269,8 +248,7 @@ object CreateServer extends Logging {
class MasterActor (
sc: ServerConfig,
engineInstance: EngineInstance,
- engineFactoryName: String,
- manifest: EngineManifest) extends Actor with SSLConfiguration with KeyAuthentication {
+ engineFactoryName: String) extends Actor with SSLConfiguration with KeyAuthentication {
val log = Logging(context.system, this)
implicit val system = context.system
var sprayHttpListener: Option[ActorRef] = None
@@ -312,8 +290,7 @@ class MasterActor (
val actor = createServerActor(
sc,
engineInstance,
- engineFactoryName,
- manifest)
+ engineFactoryName)
currentServerActor = Some(actor)
undeploy(sc.ip, sc.port)
self ! BindServer()
@@ -341,11 +318,11 @@ class MasterActor (
log.info("Reload server command received.")
val latestEngineInstance =
CreateServer.engineInstances.getLatestCompleted(
- manifest.id,
- manifest.version,
+ engineInstance.engineId,
+ engineInstance.engineVersion,
engineInstance.engineVariant)
latestEngineInstance map { lr =>
- val actor = createServerActor(sc, lr, engineFactoryName, manifest)
+ val actor = createServerActor(sc, lr, engineFactoryName)
sprayHttpListener.map { l =>
l ! Http.Unbind(5.seconds)
val settings = ServerSettings(system)
@@ -361,8 +338,8 @@ class MasterActor (
}
} getOrElse {
log.warning(
- s"No latest completed engine instance for ${manifest.id} " +
- s"${manifest.version}. Abort reloading.")
+ s"No latest completed engine instance for ${engineInstance.engineId} " +
+ s"${engineInstance.engineVersion}. Abort reloading.")
}
case x: Http.Bound =>
val serverUrl = s"${protocol}${sc.ip}:${sc.port}"
@@ -384,8 +361,7 @@ class MasterActor (
def createServerActor(
sc: ServerConfig,
engineInstance: EngineInstance,
- engineFactoryName: String,
- manifest: EngineManifest): ActorRef = {
+ engineFactoryName: String): ActorRef = {
val (engineLanguage, engineFactory) =
WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
val engine = engineFactory()
@@ -402,8 +378,7 @@ class MasterActor (
engineInstance,
// engine,
deployableEngine,
- engineLanguage,
- manifest)
+ engineLanguage)
}
}
@@ -412,7 +387,6 @@ class ServerActor[Q, P](
val engineInstance: EngineInstance,
val engine: Engine[_, _, _, Q, P, _],
val engineLanguage: EngineLanguage.Value,
- val manifest: EngineManifest,
val dataSourceParams: Params,
val preparatorParams: Params,
val algorithms: Seq[BaseAlgorithm[_, _, Q, P]],
@@ -474,7 +448,6 @@ class ServerActor[Q, P](
complete {
html.index(
args,
- manifest,
engineInstance,
algorithms.map(_.toString),
algorithmsParams.map(_.toString),
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
index aacf4fb..62fe5a7 100644
--- a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
+++ b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
@@ -1,10 +1,8 @@
@import org.apache.predictionio.data.storage.EngineInstance
-@import org.apache.predictionio.data.storage.EngineManifest
@import org.apache.predictionio.workflow.ServerConfig
@import org.joda.time.DateTime
@import org.joda.time.format.DateTimeFormat
@(args: ServerConfig,
- manifest: EngineManifest,
engineInstance: EngineInstance,
algorithms: Seq[String],
algorithmsParams: Seq[String],
@@ -65,13 +63,6 @@
<tr><th>Average Serving Time</th><td>@{f"${avgServingSec}%.4f"} seconds</td></tr>
<tr><th>Last Serving Time</th><td>@{f"${lastServingSec}%.4f"} seconds</td></tr>
<tr><th>Engine Factory Class (Scala/Java)</th><td>@{engineInstance.engineFactory}</td></tr>
- <tr>
- <th rowspan="@(manifest.files.size)">Library Files</th>
- <td>@{manifest.files.head}</td>
- </tr>
- @for(f <- manifest.files.drop(1)) {
- <tr><td>@f</td></tr>
- }
</table>
<h2>Data Source</h2>
<table class="table table-bordered table-striped">
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
deleted file mode 100644
index 9f5ab18..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-import org.apache.predictionio.annotation.DeveloperApi
-import org.json4s._
-
-/** :: DeveloperApi ::
- * Provides a way to discover engines by ID and version in a distributed
- * environment
- *
- * @param id Unique identifier of an engine.
- * @param version Engine version string.
- * @param name A short and descriptive name for the engine.
- * @param description A long description of the engine.
- * @param files Paths to engine files.
- * @param engineFactory Engine's factory class name.
- * @group Meta Data
- */
-@DeveloperApi
-case class EngineManifest(
- id: String,
- version: String,
- name: String,
- description: Option[String],
- files: Seq[String],
- engineFactory: String)
-
-/** :: DeveloperApi ::
- * Base trait of the [[EngineManifest]] data access object
- *
- * @group Meta Data
- */
-@DeveloperApi
-trait EngineManifests {
- /** Inserts an [[EngineManifest]] */
- def insert(engineManifest: EngineManifest): Unit
-
- /** Get an [[EngineManifest]] by its ID */
- def get(id: String, version: String): Option[EngineManifest]
-
- /** Get all [[EngineManifest]] */
- def getAll(): Seq[EngineManifest]
-
- /** Updates an [[EngineManifest]] */
- def update(engineInfo: EngineManifest, upsert: Boolean = false): Unit
-
- /** Delete an [[EngineManifest]] by its ID */
- def delete(id: String, version: String): Unit
-}
-
-/** :: DeveloperApi ::
- * JSON4S serializer for [[EngineManifest]]
- *
- * @group Meta Data
- */
-@DeveloperApi
-class EngineManifestSerializer
- extends CustomSerializer[EngineManifest](format => (
- {
- case JObject(fields) =>
- val seed = EngineManifest(
- id = "",
- version = "",
- name = "",
- description = None,
- files = Nil,
- engineFactory = "")
- fields.foldLeft(seed) { case (enginemanifest, field) =>
- field match {
- case JField("id", JString(id)) => enginemanifest.copy(id = id)
- case JField("version", JString(version)) =>
- enginemanifest.copy(version = version)
- case JField("name", JString(name)) => enginemanifest.copy(name = name)
- case JField("description", JString(description)) =>
- enginemanifest.copy(description = Some(description))
- case JField("files", JArray(s)) =>
- enginemanifest.copy(files = s.map(t =>
- t match {
- case JString(file) => file
- case _ => ""
- }
- ))
- case JField("engineFactory", JString(engineFactory)) =>
- enginemanifest.copy(engineFactory = engineFactory)
- case _ => enginemanifest
- }
- }
- },
- {
- case enginemanifest: EngineManifest =>
- JObject(
- JField("id", JString(enginemanifest.id)) ::
- JField("version", JString(enginemanifest.version)) ::
- JField("name", JString(enginemanifest.name)) ::
- JField("description",
- enginemanifest.description.map(
- x => JString(x)).getOrElse(JNothing)) ::
- JField("files",
- JArray(enginemanifest.files.map(x => JString(x)).toList)) ::
- JField("engineFactory", JString(enginemanifest.engineFactory)) ::
- Nil)
- }
-))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
index 72a42a7..3374f5f 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
@@ -338,7 +338,6 @@ object Storage extends Logging {
private[predictionio] def verifyAllDataObjects(): Unit = {
info("Verifying Meta Data Backend (Source: " +
s"${repositoriesToDataObjectMeta(MetaDataRepository).sourceName})...")
- getMetaDataEngineManifests()
getMetaDataEngineInstances()
getMetaDataEvaluationInstances()
getMetaDataApps()
@@ -360,9 +359,6 @@ object Storage extends Logging {
eventsDb.close()
}
- private[predictionio] def getMetaDataEngineManifests(): EngineManifests =
- getDataObjectFromRepo[EngineManifests](MetaDataRepository)
-
private[predictionio] def getMetaDataEngineInstances(): EngineInstances =
getDataObjectFromRepo[EngineInstances](MetaDataRepository)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
deleted file mode 100644
index 65b6691..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifestSerializer
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.data.storage.EngineManifests
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
- extends EngineManifests with Logging {
- implicit val formats = DefaultFormats + new EngineManifestSerializer
- private val estype = "engine_manifests"
- private def esid(id: String, version: String) = s"$id $version"
-
- def insert(engineManifest: EngineManifest): Unit = {
- val json = write(engineManifest)
- val response = client.prepareIndex(
- index,
- estype,
- esid(engineManifest.id, engineManifest.version)).
- setSource(json).execute().actionGet()
- }
-
- def get(id: String, version: String): Option[EngineManifest] = {
- try {
- val response = client.prepareGet(index, estype, esid(id, version)).
- execute().actionGet()
- if (response.isExists) {
- Some(read[EngineManifest](response.getSourceAsString))
- } else {
- None
- }
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- None
- }
- }
-
- def getAll(): Seq[EngineManifest] = {
- try {
- val builder = client.prepareSearch()
- ESUtils.getAll[EngineManifest](client, builder)
- } catch {
- case e: ElasticsearchException =>
- error(e.getMessage)
- Seq()
- }
- }
-
- def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit =
- insert(engineManifest)
-
- def delete(id: String, version: String): Unit = {
- try {
- client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
- } catch {
- case e: ElasticsearchException => error(e.getMessage)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
deleted file mode 100644
index b77da9d..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.jdbc
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.data.storage.EngineManifests
-import org.apache.predictionio.data.storage.StorageClientConfig
-import scalikejdbc._
-
-/** JDBC implementation of [[EngineManifests]] */
-class JDBCEngineManifests(client: String, config: StorageClientConfig, prefix: String)
- extends EngineManifests with Logging {
- /** Database table name for this data access object */
- val tableName = JDBCUtils.prefixTableName(prefix, "enginemanifests")
- DB autoCommit { implicit session =>
- sql"""
- create table if not exists $tableName (
- id varchar(100) not null primary key,
- version text not null,
- engineName text not null,
- description text,
- files text not null,
- engineFactory text not null)""".execute().apply()
- }
-
- def insert(m: EngineManifest): Unit = DB localTx { implicit session =>
- sql"""
- INSERT INTO $tableName VALUES(
- ${m.id},
- ${m.version},
- ${m.name},
- ${m.description},
- ${m.files.mkString(",")},
- ${m.engineFactory})""".update().apply()
- }
-
- def get(id: String, version: String): Option[EngineManifest] = DB localTx { implicit session =>
- sql"""
- SELECT
- id,
- version,
- engineName,
- description,
- files,
- engineFactory
- FROM $tableName WHERE id = $id AND version = $version""".
- map(resultToEngineManifest).single().apply()
- }
-
- def getAll(): Seq[EngineManifest] = DB localTx { implicit session =>
- sql"""
- SELECT
- id,
- version,
- engineName,
- description,
- files,
- engineFactory
- FROM $tableName""".map(resultToEngineManifest).list().apply()
- }
-
- def update(m: EngineManifest, upsert: Boolean = false): Unit = {
- var r = 0
- DB localTx { implicit session =>
- r = sql"""
- update $tableName set
- engineName = ${m.name},
- description = ${m.description},
- files = ${m.files.mkString(",")},
- engineFactory = ${m.engineFactory}
- where id = ${m.id} and version = ${m.version}""".update().apply()
- }
- if (r == 0) {
- if (upsert) {
- insert(m)
- } else {
- error("Cannot find a record to update, and upsert is not enabled.")
- }
- }
- }
-
- def delete(id: String, version: String): Unit = DB localTx { implicit session =>
- sql"DELETE FROM $tableName WHERE id = $id AND version = $version".
- update().apply()
- }
-
- /** Convert JDBC results to [[EngineManifest]] */
- def resultToEngineManifest(rs: WrappedResultSet): EngineManifest = {
- EngineManifest(
- id = rs.string("id"),
- version = rs.string("version"),
- name = rs.string("engineName"),
- description = rs.stringOpt("description"),
- files = rs.string("files").split(","),
- engineFactory = rs.string("engineFactory"))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tests/pio_tests/scenarios/quickstart_test.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/quickstart_test.py b/tests/pio_tests/scenarios/quickstart_test.py
index 04d80d3..1c5f422 100644
--- a/tests/pio_tests/scenarios/quickstart_test.py
+++ b/tests/pio_tests/scenarios/quickstart_test.py
@@ -73,6 +73,36 @@ class QuickStartTest(BaseTestCase):
self.app = AppEngine(self.test_context, app_context)
+ def engine_dir_test(self):
+ self.log.info("Stopping deployed engine")
+ self.app.stop()
+
+ self.log.info("Creating dummy directory")
+ engine_path = self.app.engine_path
+ dummy_path = "{}/dummy".format(engine_path)
+ srun("mkdir -p {}".format(dummy_path))
+
+ self.log.info("Testing pio commands in dummy directory with " +
+ "--engine-dir argument")
+ self.app.engine_path = dummy_path
+ self.log.info("Building an engine...")
+ self.app.build(engine_dir=engine_path)
+ self.log.info("Training...")
+ self.app.train(engine_dir=engine_path)
+ self.log.info("Deploying and waiting 15s for it to start...")
+ self.app.deploy(wait_time=15, engine_dir=engine_path)
+
+ self.log.info("Sending a single query and checking results")
+ user_query = { "user": 1, "num": 4 }
+ r = self.app.query(user_query)
+ self.assertEqual(200, r.status_code)
+ result = r.json()
+ self.assertEqual(4, len(result['itemScores']))
+
+ self.log.info("Deleting dummy directory")
+ srun("rm -rf {}".format(dummy_path))
+ self.app.engine_path = engine_path
+
def runTest(self):
self.log.info("Adding a new application")
self.app.new()
@@ -113,7 +143,7 @@ class QuickStartTest(BaseTestCase):
r = self.app.send_event(ev)
self.assertEqual(201, r.status_code)
- self.log.info("Checking the number of events stored on the server after the update")
+ self.log.info("Checking the number of events stored on eventserver")
r = self.app.get_events(params={'limit': -1})
self.assertEquals(200, r.status_code)
stored_events = r.json()
@@ -126,6 +156,9 @@ class QuickStartTest(BaseTestCase):
self.log.info("Deploying and waiting 15s for it to start...")
self.app.deploy(wait_time=15)
+ self.log.info("Testing pio commands outside of engine directory")
+ self.engine_dir_test()
+
self.log.info("Sending a single query and checking results")
user_query = { "user": 1, "num": 4 }
r = self.app.query(user_query)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tests/pio_tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/utils.py b/tests/pio_tests/utils.py
index e61f28e..e6c5b0b 100644
--- a/tests/pio_tests/utils.py
+++ b/tests/pio_tests/utils.py
@@ -248,18 +248,19 @@ class AppEngine:
def delete(self):
srun('pio app delete {0} --force'.format(self.app_context.name))
- def build(self, sbt_extra=None, clean=False, no_asm=True):
- srun('cd {0}; pio build {1} {2} {3}'.format(
+ def build(self, sbt_extra=None, clean=False, no_asm=True, engine_dir=None):
+ srun('cd {0}; pio build {1} {2} {3} {4}'.format(
self.engine_path,
'--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
'--clean' if clean else '',
- '--no-asm' if no_asm else ''))
+ '--no-asm' if no_asm else '',
+ '--engine-dir {}'.format(engine_dir) if engine_dir else ''))
def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
stop_after_prepare=False, engine_factory=None,
- engine_params_key=None, scratch_uri=None):
+ engine_params_key=None, scratch_uri=None, engine_dir=None):
- srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
+ srun('cd {0}; pio train {1} {2} {3} {4} {5} {6} {7} {8}'.format(
self.engine_path,
'--batch {}'.format(batch) if batch else '',
'--skip-sanity-check' if skip_sanity_check else '',
@@ -267,13 +268,14 @@ class AppEngine:
'--stop-after-prepare' if stop_after_prepare else '',
'--engine_factory {}'.format(engine_factory) if engine_factory else '',
'--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
- '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
+ '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '',
+ '--engine-dir {}'.format(engine_dir) if engine_dir else ''))
def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
- batch=None, scratch_uri=None):
+ batch=None, scratch_uri=None, engine_dir=None):
- command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
+ command = 'cd {0}; pio deploy {1} {2} {3} {4} {5} {6} {7} {8} {9} {10}'.format(
self.engine_path,
'--ip {}'.format(ip) if ip else '',
'--port {}'.format(port) if port else '',
@@ -283,7 +285,8 @@ class AppEngine:
'--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
'--event-server-port {}'.format(event_server_port) if event_server_port else '',
'--batch {}'.format(bach) if batch else '',
- '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
+ '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '',
+ '--engine-dir {}'.format(engine_dir) if engine_dir else '')
self.deployed_process = srun_bg(command)
time.sleep(wait_time)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/build.sbt
----------------------------------------------------------------------
diff --git a/tools/build.sbt b/tools/build.sbt
index fefdb45..57e7d96 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -23,7 +23,7 @@ libraryDependencies ++= Seq(
"com.github.scopt" %% "scopt" % "3.2.0",
"io.spray" %% "spray-can" % "1.3.3",
"io.spray" %% "spray-routing" % "1.3.3",
- "me.lessis" % "semverfi_2.10" % "0.1.3",
+ "me.lessis" % "semverfi_2.10" % "0.1.3",
"org.apache.hadoop" % "hadoop-common" % "2.6.2",
"org.apache.hadoop" % "hadoop-hdfs" % "2.6.2",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
@@ -33,8 +33,8 @@ libraryDependencies ++= Seq(
"org.json4s" %% "json4s-ext" % json4sVersion.value,
"org.scalaj" %% "scalaj-http" % "1.1.6",
"org.spark-project.akka" %% "akka-actor" % "2.3.4-spark",
- "io.spray" %% "spray-testkit" % "1.3.3" % "test",
- "org.specs2" %% "specs2" % "2.3.13" % "test",
+ "io.spray" %% "spray-testkit" % "1.3.3" % "test",
+ "org.specs2" %% "specs2" % "2.3.13" % "test",
"org.spark-project.akka" %% "akka-slf4j" % "2.3.4-spark")
dependencyOverrides += "org.slf4j" % "slf4j-log4j12" % "1.7.18"
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/scala/org/apache/predictionio/tools/Common.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Common.scala b/tools/src/main/scala/org/apache/predictionio/tools/Common.scala
index c379138..6c56615 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/Common.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Common.scala
@@ -73,7 +73,11 @@ object Common extends EitherLogging {
versionNoPatch.getOrElse(fullVersion)
}
- def jarFilesForScala: Array[File] = {
+ def getEngineDirPath(directory: Option[String]): String = {
+ new File(directory.getOrElse(".")).getCanonicalPath
+ }
+
+ def jarFilesForScala(directory: String): Array[File] = {
def recursiveListFiles(f: File): Array[File] = {
Option(f.listFiles) map { these =>
these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
@@ -87,10 +91,13 @@ object Common extends EitherLogging {
def jarFilesAt(path: File): Array[File] = recursiveListFiles(path) filter {
_.getName.toLowerCase.endsWith(".jar")
}
- val libFiles = jarFilesForScalaFilter(jarFilesAt(new File("lib")))
+
+ val engineDir = getEngineDirPath(Some(directory))
+ val libFiles = jarFilesForScalaFilter(
+ jarFilesAt(new File(engineDir, "lib")))
val scalaVersionNoPatch = Common.versionNoPatch(BuildInfo.scalaVersion)
- val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File("target" +
- File.separator + s"scala-${scalaVersionNoPatch}")))
+ val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File(engineDir,
+ "target" + File.separator + s"scala-${scalaVersionNoPatch}")))
// Use libFiles is target is empty.
if (targetFiles.size > 0) targetFiles else libFiles
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
deleted file mode 100644
index 334981c..0000000
--- a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.tools
-
-import java.io.File
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.data.storage.EngineManifestSerializer
-import org.apache.predictionio.data.storage.Storage
-import org.apache.predictionio.tools.ReturnTypes._
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.json4s._
-import org.json4s.native.Serialization.read
-
-import scala.io.Source
-
-object RegisterEngine extends EitherLogging {
- val engineManifests = Storage.getMetaDataEngineManifests
- implicit val formats = DefaultFormats + new EngineManifestSerializer
-
- def registerEngine(
- jsonManifest: File,
- engineFiles: Seq[File],
- copyLocal: Boolean = false): MaybeError = {
- val jsonString = try {
- Source.fromFile(jsonManifest).mkString
- } catch {
- case e: java.io.FileNotFoundException =>
- return logAndFail(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
- }
- val engineManifest = read[EngineManifest](jsonString)
-
- info(s"Registering engine ${engineManifest.id} ${engineManifest.version}")
- engineManifests.update(
- engineManifest.copy(files = engineFiles.map(_.toURI.toString)), true)
- Success
- }
-
- def unregisterEngine(jsonManifest: File): MaybeError = {
- val jsonString = try {
- Source.fromFile(jsonManifest).mkString
- } catch {
- case e: java.io.FileNotFoundException =>
- return logAndFail(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
- }
- val fileEngineManifest = read[EngineManifest](jsonString)
- val engineManifest = engineManifests.get(
- fileEngineManifest.id,
- fileEngineManifest.version)
-
- engineManifest map { em =>
- val conf = new Configuration
- val fs = FileSystem.get(conf)
-
- em.files foreach { f =>
- val path = new Path(f)
- info(s"Removing ${f}")
- fs.delete(path, false)
- }
-
- engineManifests.delete(em.id, em.version)
- logAndSucceed(s"Unregistered engine ${em.id} ${em.version}")
- } getOrElse {
- logAndFail(s"${fileEngineManifest.id} ${fileEngineManifest.version} is not " +
- "registered.")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
index 1431432..1c70974 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
@@ -18,17 +18,16 @@
package org.apache.predictionio.tools
-import java.io.File
-import java.net.URI
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.tools.Common._
import org.apache.predictionio.tools.ReturnTypes._
-import org.apache.predictionio.tools.console.ConsoleArgs
import org.apache.predictionio.workflow.WorkflowUtils
import org.apache.predictionio.workflow.JsonExtractorOption
import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import java.io.File
+import java.net.URI
+import grizzled.slf4j.Logging
+
import scala.sys.process._
case class DeployArgs(
@@ -58,18 +57,18 @@ object RunServer extends Logging {
engineInstanceId: String,
serverArgs: ServerArgs,
sparkArgs: SparkArgs,
- em: EngineManifest,
pioHome: String,
+ engineDirPath: String,
verbose: Boolean = false): Expected[(Process, () => Unit)] = {
- val jarFiles = em.files.map(new URI(_)) ++
+ val jarFiles = jarFilesForScala(engineDirPath).map(_.toURI) ++
Option(new File(pioHome, "plugins").listFiles())
.getOrElse(Array.empty[File]).map(_.toURI)
val args = Seq(
"--engineInstanceId",
engineInstanceId,
"--engine-variant",
- serverArgs.variantJson.toURI.toString,
+ engineDirPath + File.separator + serverArgs.variantJson.getName,
"--ip",
serverArgs.deploy.ip,
"--port",
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
index 8b8d769..97831a6 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-
package org.apache.predictionio.tools
+import org.apache.predictionio.tools.console.Console
+import org.apache.predictionio.tools.Runner
+import org.apache.predictionio.tools.Common._
+import org.apache.predictionio.tools.ReturnTypes._
+import org.apache.predictionio.workflow.{WorkflowUtils, JsonExtractorOption}
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+
import java.io.File
import java.net.URI
-
import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.tools.console.ConsoleArgs
-import org.apache.predictionio.tools.ReturnTypes._
-import org.apache.predictionio.workflow.WorkflowUtils
-import org.apache.predictionio.workflow.JsonExtractorOption
-import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
@@ -49,22 +48,23 @@ case class WorkflowArgs(
jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
object RunWorkflow extends Logging {
-
def runWorkflow(
wa: WorkflowArgs,
sa: SparkArgs,
- em: EngineManifest,
pioHome: String,
+ engineDirPath: String,
verbose: Boolean = false): Expected[(Process, () => Unit)] = {
- val jarFiles = em.files.map(new URI(_))
+ val jarFiles = jarFilesForScala(engineDirPath).map(_.toURI)
+ val variantJson = engineDirPath + File.separator + wa.variantJson.getName
+ val ei = Console.getEngineInfo(new File(variantJson))
val args = Seq(
"--engine-id",
- em.id,
+ ei.engineId,
"--engine-version",
- em.version,
+ ei.engineVersion,
"--engine-variant",
- wa.variantJson.toURI.toString,
+ variantJson,
"--verbosity",
wa.verbosity.toString) ++
wa.engineFactory.map(
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
index 6fd8977..4656457 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
@@ -20,14 +20,12 @@ package org.apache.predictionio.tools.commands
import org.apache.predictionio.core.BuildInfo
import org.apache.predictionio.controller.Utils
import org.apache.predictionio.data.storage
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.data.storage.EngineManifestSerializer
-import org.apache.predictionio.tools.RegisterEngine
import org.apache.predictionio.tools.EitherLogging
import org.apache.predictionio.tools.{RunWorkflow, RunServer}
import org.apache.predictionio.tools.{DeployArgs, WorkflowArgs, SparkArgs, ServerArgs}
-import org.apache.predictionio.tools.ReturnTypes._
+import org.apache.predictionio.tools.console.Console
import org.apache.predictionio.tools.Common._
+import org.apache.predictionio.tools.ReturnTypes._
import org.apache.predictionio.workflow.WorkflowUtils
import org.apache.commons.io.FileUtils
@@ -52,96 +50,12 @@ case class BuildArgs(
forceGeneratePIOSbt: Boolean = false)
case class EngineArgs(
- manifestJson: File = new File("manifest.json"),
engineId: Option[String] = None,
- engineVersion: Option[String] = None)
+ engineVersion: Option[String] = None,
+ engineDir: Option[String] = None)
object Engine extends EitherLogging {
- private val manifestAutogenTag = "pio-autogen-manifest"
-
- private def readManifestJson(json: File): Expected[EngineManifest] = {
- implicit val formats = Utils.json4sDefaultFormats +
- new EngineManifestSerializer
- try {
- Right(read[EngineManifest](Source.fromFile(json).mkString))
- } catch {
- case e: java.io.FileNotFoundException =>
- logAndFail(s"${json.getCanonicalPath} does not exist. Aborting.")
- case e: MappingException =>
- logAndFail(s"${json.getCanonicalPath} has invalid content: " +
- e.getMessage)
- }
- }
-
- private def withRegisteredManifest[T](ea: EngineArgs)(
- op: EngineManifest => Expected[T]): Expected[T] = {
- val res: Expected[Expected[T]] = for {
- ej <- readManifestJson(ea.manifestJson).right
- id <- Right(ea.engineId getOrElse ej.id).right
- version <- Right(ea.engineVersion getOrElse ej.version).right
- manifest <- storage.Storage.getMetaDataEngineManifests.get(id, version)
- .toRight {
- val errStr =
- s"""Engine ${id} ${version} cannot be found in the system.")
- |Possible reasons:
- |- the engine is not yet built by the 'build' command;
- |- the meta data store is offline."""
- error(errStr)
- errStr
- }.right
- } yield {
- op(manifest)
- }
- res.joinRight
- }
-
- private def generateManifestJson(json: File): MaybeError = {
- val cwd = sys.props("user.dir")
- implicit val formats = Utils.json4sDefaultFormats +
- new EngineManifestSerializer
- val rand = Random.alphanumeric.take(32).mkString
- val ha = java.security.MessageDigest.getInstance("SHA-1").
- digest(cwd.getBytes).map("%02x".format(_)).mkString
- val em = EngineManifest(
- id = rand,
- version = ha,
- name = new File(cwd).getName,
- description = Some(manifestAutogenTag),
- files = Seq(),
- engineFactory = "")
- try {
- FileUtils.writeStringToFile(json, write(em), "ISO-8859-1")
- Success
- } catch {
- case e: java.io.IOException =>
- logAndFail(s"Cannot generate ${json} automatically (${e.getMessage}). " +
- "Aborting.")
- }
- }
-
- private def regenerateManifestJson(json: File): MaybeError = {
- val cwd = sys.props("user.dir")
- val ha = java.security.MessageDigest.getInstance("SHA-1").
- digest(cwd.getBytes).map("%02x".format(_)).mkString
- if (json.exists) {
- readManifestJson(json).right.flatMap { em =>
- if (em.description == Some(manifestAutogenTag) && ha != em.version) {
- warn("This engine project directory contains an auto-generated " +
- "manifest that has been copied/moved from another location. ")
- warn("Regenerating the manifest to reflect the updated location. " +
- "This will dissociate with all previous engine instances.")
- generateManifestJson(json)
- } else {
- logAndSucceed(s"Using existing engine manifest JSON at " +
- "${json.getCanonicalPath}")
- }
- }
- } else {
- generateManifestJson(json)
- }
- }
-
private def detectSbt(sbt: Option[File], pioHome: String): String = {
sbt map {
_.getCanonicalPath
@@ -156,11 +70,16 @@ object Engine extends EitherLogging {
}
private def compile(
- buildArgs: BuildArgs, pioHome: String, verbose: Boolean): MaybeError = {
- // only add pioVersion to sbt if project/pio.sbt exists
- if (new File("project", "pio-build.sbt").exists || buildArgs.forceGeneratePIOSbt) {
+ buildArgs: BuildArgs,
+ pioHome: String,
+ engineDirPath: String,
+ verbose: Boolean): MaybeError = {
+
+ val f = new File(
+ Seq(engineDirPath, "project", "pio-build.sbt").mkString(File.separator))
+ if (f.exists || buildArgs.forceGeneratePIOSbt) {
FileUtils.writeLines(
- new File("pio.sbt"),
+ new File(engineDirPath, "pio.sbt"),
Seq(
"// Generated automatically by pio build.",
"// Changes in this file will be overridden.",
@@ -170,7 +89,7 @@ object Engine extends EitherLogging {
implicit val formats = Utils.json4sDefaultFormats
val sbt = detectSbt(buildArgs.sbt, pioHome)
- info(s"Using command '${sbt}' at the current working directory to build.")
+ info(s"Using command '${sbt}' at ${engineDirPath} to build.")
info("If the path above is incorrect, this process will fail.")
val asm =
if (buildArgs.sbtAssemblyPackageDependency) {
@@ -181,10 +100,10 @@ object Engine extends EitherLogging {
val clean = if (buildArgs.sbtClean) " clean" else ""
val buildCmd = s"${sbt} ${buildArgs.sbtExtra.getOrElse("")}${clean} " +
(if (buildArgs.uberJar) "assembly" else s"package${asm}")
- val core = new File(s"pio-assembly-${BuildInfo.version}.jar")
+ val core = new File(engineDirPath, s"pio-assembly-${BuildInfo.version}.jar")
if (buildArgs.uberJar) {
info(s"Uber JAR enabled. Putting ${core.getName} in lib.")
- val dst = new File("lib")
+ val dst = new File(engineDirPath, "lib")
dst.mkdir()
coreAssembly(pioHome) match {
case Right(coreFile) =>
@@ -195,7 +114,7 @@ object Engine extends EitherLogging {
case Left(errStr) => return Left(errStr)
}
} else {
- if (new File("engine.json").exists()) {
+ if (new File(engineDirPath, "engine.json").exists()) {
info(s"Uber JAR disabled. Making sure lib/${core.getName} is absent.")
new File("lib", core.getName).delete()
} else {
@@ -203,13 +122,14 @@ object Engine extends EitherLogging {
s"like an engine project directory. Please delete lib/${core.getName} manually.")
}
}
- info(s"Going to run: ${buildCmd}")
+ info(s"Going to run: ${buildCmd} in ${engineDirPath}")
try {
+ val p = Process(s"${buildCmd}", new File(engineDirPath))
val r =
if (verbose) {
- buildCmd.!(ProcessLogger(line => info(line), line => error(line)))
+ p.!(ProcessLogger(line => info(line), line => error(line)))
} else {
- buildCmd.!(ProcessLogger(
+ p.!(ProcessLogger(
line => outputSbtError(line),
line => outputSbtError(line)))
}
@@ -225,31 +145,26 @@ object Engine extends EitherLogging {
}
def build(
+ ea: EngineArgs,
buildArgs: BuildArgs,
pioHome: String,
- manifestJson: File,
verbose: Boolean): MaybeError = {
- regenerateManifestJson(manifestJson) match {
- case Left(err) => return Left(err)
- case _ => Unit
- }
+ val engineDirPath = getEngineDirPath(ea.engineDir)
+ Template.verifyTemplateMinVersion(
+ new File(engineDirPath, "template.json")) match {
- Template.verifyTemplateMinVersion(new File("template.json")) match {
case Left(err) => return Left(err)
case Right(_) =>
- compile(buildArgs, pioHome, verbose)
+ compile(buildArgs, pioHome, engineDirPath, verbose)
info("Looking for an engine...")
- val jarFiles = jarFilesForScala
+ val jarFiles = jarFilesForScala(engineDirPath)
if (jarFiles.isEmpty) {
return logAndFail("No engine found. Your build might have failed. Aborting.")
}
jarFiles foreach { f => info(s"Found ${f.getName}")}
- RegisterEngine.registerEngine(
- manifestJson,
- jarFiles,
- false)
}
+ logAndSucceed("Build finished successfully.")
}
/** Training an engine.
@@ -272,17 +187,10 @@ object Engine extends EitherLogging {
pioHome: String,
verbose: Boolean = false): Expected[(Process, () => Unit)] = {
- regenerateManifestJson(ea.manifestJson) match {
- case Left(err) => return Left(err)
- case _ => Unit
- }
-
- Template.verifyTemplateMinVersion(new File("template.json")).right.flatMap {
- _ =>
- withRegisteredManifest(ea) { em =>
- RunWorkflow.runWorkflow(wa, sa, em, pioHome, verbose)
- }
- }
+ val engineDirPath = getEngineDirPath(ea.engineDir)
+ Template.verifyTemplateMinVersion(
+ new File(engineDirPath, "template.json"))
+ RunWorkflow.runWorkflow(wa, sa, pioHome, engineDirPath, verbose)
}
/** Deploying an engine.
@@ -307,33 +215,30 @@ object Engine extends EitherLogging {
pioHome: String,
verbose: Boolean = false): Expected[(Process, () => Unit)] = {
- val verifyResult = Template.verifyTemplateMinVersion(new File("template.json"))
+ val engineDirPath = getEngineDirPath(ea.engineDir)
+ val verifyResult = Template.verifyTemplateMinVersion(
+ new File(engineDirPath, "template.json"))
if (verifyResult.isLeft) {
return Left(verifyResult.left.get)
}
- withRegisteredManifest(ea) { em =>
- val variantJson = parse(Source.fromFile(serverArgs.variantJson).mkString)
- val variantId = variantJson \ "id" match {
- case JString(s) => s
- case _ =>
- return logAndFail("Unable to read engine variant ID from " +
- s"${serverArgs.variantJson.getCanonicalPath}. Aborting.")
- }
- val engineInstances = storage.Storage.getMetaDataEngineInstances
- val engineInstance = engineInstanceId map { eid =>
- engineInstances.get(eid)
- } getOrElse {
- engineInstances.getLatestCompleted(em.id, em.version, variantId)
- }
- engineInstance map { r =>
- RunServer.runServer(r.id, serverArgs, sparkArgs, em, pioHome, verbose)
+ val ei = Console.getEngineInfo(
+ new File(engineDirPath, serverArgs.variantJson.getName))
+ val engineInstances = storage.Storage.getMetaDataEngineInstances
+ val engineInstance = engineInstanceId map { eid =>
+ engineInstances.get(eid)
+ } getOrElse {
+ engineInstances.getLatestCompleted(
+ ei.engineId, ei.engineVersion, ei.variantId)
+ }
+ engineInstance map { r =>
+ RunServer.runServer(
+ r.id, serverArgs, sparkArgs, pioHome, engineDirPath, verbose)
+ } getOrElse {
+ engineInstanceId map { eid =>
+ logAndFail(s"Invalid engine instance ID ${eid}. Aborting.")
} getOrElse {
- engineInstanceId map { eid =>
- logAndFail(s"Invalid engine instance ID ${eid}. Aborting.")
- } getOrElse {
- logAndFail(s"No valid engine instance found for engine ${em.id} " +
- s"${em.version}.\nTry running 'train' before 'deploy'. Aborting.")
- }
+ logAndFail(s"No valid engine instance found for engine ${ei.engineId} " +
+ s"${ei.engineVersion}.\nTry running 'train' before 'deploy'. Aborting.")
}
}
}
@@ -368,7 +273,6 @@ object Engine extends EitherLogging {
*
* @param mainClass A [[String]] with the class containing a main functionto run
* @param driverArguments Arguments to be passed to the main function
- * @param manifestJson An instance of [[File]] for running a single training.
* @param buildArgs An instance of [[BuildArgs]]
* @param sparkArgs an instance of [[SparkArgs]]
* @param pioHome [[String]] with a path to PIO installation
@@ -378,26 +282,23 @@ object Engine extends EitherLogging {
* of a running driver
*/
def run(
+ ea: EngineArgs,
mainClass: String,
driverArguments: Seq[String],
- manifestJson: File,
buildArgs: BuildArgs,
sparkArgs: SparkArgs,
pioHome: String,
verbose: Boolean): Expected[Process] = {
- generateManifestJson(manifestJson) match {
- case Left(err) => return Left(err)
- case _ => Unit
- }
+ val engineDirPath = getEngineDirPath(ea.engineDir)
- compile(buildArgs, pioHome, verbose)
+ compile(buildArgs, pioHome, engineDirPath, verbose)
val extraFiles = WorkflowUtils.thirdPartyConfFiles
-
- val jarFiles = jarFilesForScala
+ val jarFiles = jarFilesForScala(engineDirPath)
jarFiles foreach { f => info(s"Found JAR: ${f.getName}") }
val allJarFiles = jarFiles.map(_.getCanonicalPath)
+
val cmd = s"${getSparkHome(sparkArgs.sparkHome)}/bin/spark-submit --jars " +
s"${allJarFiles.mkString(",")} " +
(if (extraFiles.size > 0) {
@@ -416,9 +317,4 @@ object Engine extends EitherLogging {
"SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
map(kv => s"${kv._1}=${kv._2}").mkString(",")).run())
}
-
- def unregister(jsonManifest: File): MaybeError = {
- RegisterEngine.unregisterEngine(jsonManifest)
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
index 195740b..f4c891a 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -22,42 +22,30 @@ import java.io.File
import java.net.URI
import grizzled.slf4j.Logging
+import org.apache.commons.io.FileUtils
import org.apache.predictionio.controller.Utils
import org.apache.predictionio.core.BuildInfo
-import org.apache.predictionio.data.api.EventServer
-import org.apache.predictionio.data.api.EventServerConfig
+import org.apache.predictionio.data.api.{EventServer, EventServerConfig}
import org.apache.predictionio.data.storage
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.data.storage.EngineManifestSerializer
-import org.apache.predictionio.tools.RegisterEngine
-import org.apache.predictionio.tools.RunServer
-import org.apache.predictionio.tools.RunWorkflow
-import org.apache.predictionio.tools.Common
+import org.apache.predictionio.tools.{RunServer, RunWorkflow, Common}
import org.apache.predictionio.tools.commands.{
DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs,
BuildArgs, EngineArgs}
import org.apache.predictionio.tools.{
EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs}
import org.apache.predictionio.tools.EventServerArgs
-import org.apache.predictionio.tools.admin.AdminServer
-import org.apache.predictionio.tools.admin.AdminServerConfig
-import org.apache.predictionio.tools.dashboard.Dashboard
-import org.apache.predictionio.tools.dashboard.DashboardConfig
-import org.apache.predictionio.workflow.JsonExtractorOption
-import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
-import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.tools.admin.{AdminServer, AdminServerConfig}
+import org.apache.predictionio.tools.dashboard.{Dashboard, DashboardConfig}
import org.apache.predictionio.tools.commands
-import org.apache.commons.io.FileUtils
+import org.apache.predictionio.workflow.{JsonExtractorOption, WorkflowUtils}
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
import org.json4s._
import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
import semverfi._
import scala.collection.JavaConversions._
import scala.io.Source
import scala.sys.process._
-import scala.util.Random
import scalaj.http.Http
case class ConsoleArgs(
@@ -95,6 +83,11 @@ case class AccessKeyArgs(
accessKey: String = "",
events: Seq[String] = Seq())
+case class EngineInfo(
+ engineId: String,
+ engineVersion: String,
+ variantId: String)
+
object Console extends Logging {
def main(args: Array[String]): Unit = {
val parser = new scopt.OptionParser[ConsoleArgs]("pio") {
@@ -125,12 +118,13 @@ object Console extends Logging {
c.copy(engine = c.engine.copy(engineVersion = Some(x)))
} text("Specify an engine version. Usually used by distributed " +
"deployment.")
+ opt[String]("engine-dir") abbr("ed") action { (x, c) =>
+ c.copy(engine = c.engine.copy(engineDir = Some(x)))
+ } text("Specify absolute path for engine directory, default to " +
+ "current directory.")
opt[File]("variant") abbr("v") action { (x, c) =>
c.copy(workflow = c.workflow.copy(variantJson = x))
}
- opt[File]("manifest") abbr("m") action { (x, c) =>
- c.copy(engine = c.engine.copy(manifestJson = x))
- }
opt[File]("sbt") action { (x, c) =>
c.copy(build = c.build.copy(sbt = Some(x)))
} validate { x =>
@@ -166,7 +160,8 @@ object Console extends Logging {
)
note("")
cmd("build").
- text("Build an engine at the current directory.").
+ text("Build an engine at the specific directory, or current " +
+ "directory by default.").
action { (_, c) =>
c.copy(commands = c.commands :+ "build")
} children(
@@ -188,7 +183,8 @@ object Console extends Logging {
)
note("")
cmd("unregister").
- text("Unregister an engine at the current directory.").
+ text("Unregister an engine at the specific directory, or current " +
+ "directory by default.").
action { (_, c) =>
c.copy(commands = c.commands :+ "unregister")
}
@@ -640,9 +636,7 @@ object Console extends Logging {
Pio.version()
case Seq("build") =>
Pio.build(
- ca.build, ca.pioHome.get, ca.engine.manifestJson, ca.verbose)
- case Seq("unregister") =>
- Pio.unregister(ca.engine.manifestJson)
+ ca.engine, ca.build, ca.pioHome.get, ca.verbose)
case Seq("train") =>
Pio.train(
ca.engine, ca.workflow, ca.spark, ca.pioHome.get, ca.verbose)
@@ -673,9 +667,9 @@ object Console extends Logging {
Pio.adminserver(ca.adminServer)
case Seq("run") =>
Pio.run(
+ ca.engine,
ca.mainClass.get,
ca.driverPassThrough,
- ca.engine.manifestJson,
ca.build,
ca.spark,
ca.pioHome.get,
@@ -740,6 +734,33 @@ object Console extends Logging {
}
}
+ def getEngineInfo(jsonFile: File): EngineInfo = {
+ // Use engineFactory as engineId
+ val variantJson = parse(Source.fromFile(jsonFile).mkString)
+ val engineId = variantJson \ "engineFactory" match {
+ case JString(s) => s
+ case _ =>
+ error("unable to read engine factory from " +
+ s"${jsonFile.getCanonicalPath}. Aborting.")
+ sys.exit(1)
+ }
+
+ val variantId = variantJson \ "id" match {
+ case JString(s) => s
+ case _ =>
+ error("Unable to read engine variant ID from " +
+ s"${jsonFile.getCanonicalPath}. Aborting.")
+ sys.exit(1)
+ }
+
+ // Use hash of engine directory as engineVersion
+ val engineDir = sys.props("user.dir")
+ val engineVersion = java.security.MessageDigest.getInstance("SHA-1").
+ digest(engineDir.getBytes).map("%02x".format(_)).mkString
+
+ EngineInfo(engineId, engineVersion, variantId)
+ }
+
val mainHelp = txt.main().toString
val helpText = Map(
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
index 77075a7..19d7590 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
@@ -75,19 +75,17 @@ object Pio extends Logging {
}
def build(
+ ea: EngineArgs,
buildArgs: BuildArgs,
pioHome: String,
- manifestJson: File,
verbose: Boolean = false): Int = {
- doOnSuccess(Engine.build(buildArgs, pioHome, manifestJson, verbose)) {
+ doOnSuccess(Engine.build(ea, buildArgs, pioHome, verbose)) {
_ => info("Your engine is ready for training.")
0
}
}
- def unregister(manifestJson: File): Int = Engine.unregister(manifestJson)
-
def train(
ea: EngineArgs,
wa: WorkflowArgs,
@@ -132,16 +130,16 @@ object Pio extends Logging {
}
def run(
+ ea: EngineArgs,
mainClass: String,
driverArguments: Seq[String],
- manifestJson: File,
buildArgs: BuildArgs,
sparkArgs: SparkArgs,
pioHome: String,
verbose: Boolean = false): Int =
doOnSuccess(Engine.run(
- mainClass, driverArguments, manifestJson,
- buildArgs, sparkArgs, pioHome, verbose)) { proc =>
+ ea, mainClass, driverArguments, buildArgs,
+ sparkArgs, pioHome, verbose)) { proc =>
val r = proc.exitValue()
if (r != 0) {
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7615b470/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
index a97ecb3..5efa4bf 100644
--- a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
+++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
@@ -2,7 +2,7 @@ Usage: pio <command> [options] <args>...
Options common to all commands:
[--pio-home <value>] [--spark-home <value>] [--sbt <value>]
- [-ei <value>] [-ev <value>] [-v <value>] [-m <value>]
+ [-ei <value>] [-ev <value>] [-ed <value>] [-v <value>]
[-sk | --spark-kryo] [--verbose]
[<args>] [-- [<args passed to Spark>] [-- [<args passed to runner]]]
@@ -12,10 +12,10 @@ Options common to all commands:
Specify an engine ID. Usually used by distributed deployment.
-ev <value> | --engine-version <value>
Specify an engine version. Usually used by distributed deployment.
+ -ed <value> | --engine-dir <value>
+ Specify path of engine directory to run build/train/deploy commands.
-v <value> | --variant <value>
Path to an engine variant JSON file. Default: engine.json
- -m <value> | --manifest <value>
- Path to an engine manifest JSON file. Default: manifest.json
-sk | --spark-kryo
Shorthand for setting the spark.serializer property to
org.apache.spark.serializer.KryoSerializer.