predictionio-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From don...@apache.org
Subject [16/34] incubator-predictionio git commit: rename all except examples
Date Mon, 18 Jul 2016 20:17:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala b/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala
deleted file mode 100644
index 48f935a..0000000
--- a/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-import io.prediction.data.{Utils => DataUtils}
-import org.joda.time.DateTime
-import org.json4s._
-
-/** :: DeveloperApi ::
-  * JSON4S serializer for Joda-Time
-  *
-  * @group Common
-  */
-@DeveloperApi
-object DateTimeJson4sSupport {
-
-  @transient lazy implicit val formats = DefaultFormats
-
-  /** Serialize DateTime to JValue */
-  def serializeToJValue: PartialFunction[Any, JValue] = {
-    case d: DateTime => JString(DataUtils.dateTimeToString(d))
-  }
-
-  /** Deserialize JValue to DateTime */
-  def deserializeFromJValue: PartialFunction[JValue, DateTime] = {
-    case jv: JValue => DataUtils.stringToDateTime(jv.extract[String])
-  }
-
-  /** Custom JSON4S serializer for Joda-Time */
-  class Serializer extends CustomSerializer[DateTime](format => (
-    deserializeFromJValue, serializeToJValue))
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala b/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala
deleted file mode 100644
index fdbb6ba..0000000
--- a/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import com.github.nscala_time.time.Imports._
-import io.prediction.annotation.DeveloperApi
-import org.json4s._
-
-/** :: DeveloperApi ::
-  * Stores parameters, model, and other information for each engine instance
-  *
-  * @param id Engine instance ID.
-  * @param status Status of the engine instance.
-  * @param startTime Start time of the training/evaluation.
-  * @param endTime End time of the training/evaluation.
-  * @param engineId Engine ID of the instance.
-  * @param engineVersion Engine version of the instance.
-  * @param engineVariant Engine variant ID of the instance.
-  * @param engineFactory Engine factory class for the instance.
-  * @param batch A batch label of the engine instance.
-  * @param env The environment in which the instance was created.
-  * @param sparkConf Custom Spark configuration of the instance.
-  * @param dataSourceParams Data source parameters of the instance.
-  * @param preparatorParams Preparator parameters of the instance.
-  * @param algorithmsParams Algorithms parameters of the instance.
-  * @param servingParams Serving parameters of the instance.
-  * @group Meta Data
-  */
-@DeveloperApi
-case class EngineInstance(
-  id: String,
-  status: String,
-  startTime: DateTime,
-  endTime: DateTime,
-  engineId: String,
-  engineVersion: String,
-  engineVariant: String,
-  engineFactory: String,
-  batch: String,
-  env: Map[String, String],
-  sparkConf: Map[String, String],
-  dataSourceParams: String,
-  preparatorParams: String,
-  algorithmsParams: String,
-  servingParams: String)
-
-/** :: DeveloperApi ::
-  * Base trait of the [[EngineInstance]] data access object
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-trait EngineInstances {
-  /** Insert a new [[EngineInstance]] */
-  def insert(i: EngineInstance): String
-
-  /** Get an [[EngineInstance]] by ID */
-  def get(id: String): Option[EngineInstance]
-
-  /** Get all [[EngineInstance]]s */
-  def getAll(): Seq[EngineInstance]
-
-  /** Get the instance that started training most recently and has trained to
-    * completion
-    */
-  def getLatestCompleted(
-      engineId: String,
-      engineVersion: String,
-      engineVariant: String): Option[EngineInstance]
-
-  /** Get all instances that have trained to completion */
-  def getCompleted(
-    engineId: String,
-    engineVersion: String,
-    engineVariant: String): Seq[EngineInstance]
-
-  /** Update an [[EngineInstance]] */
-  def update(i: EngineInstance): Unit
-
-  /** Delete an [[EngineInstance]] */
-  def delete(id: String): Unit
-}
-
-/** :: DeveloperApi ::
-  * JSON4S serializer for [[EngineInstance]]
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-class EngineInstanceSerializer
-    extends CustomSerializer[EngineInstance](
-  format => ({
-    case JObject(fields) =>
-      implicit val formats = DefaultFormats
-      val seed = EngineInstance(
-          id = "",
-          status = "",
-          startTime = DateTime.now,
-          endTime = DateTime.now,
-          engineId = "",
-          engineVersion = "",
-          engineVariant = "",
-          engineFactory = "",
-          batch = "",
-          env = Map(),
-          sparkConf = Map(),
-          dataSourceParams = "",
-          preparatorParams = "",
-          algorithmsParams = "",
-          servingParams = "")
-      fields.foldLeft(seed) { case (i, field) =>
-        field match {
-          case JField("id", JString(id)) => i.copy(id = id)
-          case JField("status", JString(status)) => i.copy(status = status)
-          case JField("startTime", JString(startTime)) =>
-            i.copy(startTime = Utils.stringToDateTime(startTime))
-          case JField("endTime", JString(endTime)) =>
-            i.copy(endTime = Utils.stringToDateTime(endTime))
-          case JField("engineId", JString(engineId)) =>
-            i.copy(engineId = engineId)
-          case JField("engineVersion", JString(engineVersion)) =>
-            i.copy(engineVersion = engineVersion)
-          case JField("engineVariant", JString(engineVariant)) =>
-            i.copy(engineVariant = engineVariant)
-          case JField("engineFactory", JString(engineFactory)) =>
-            i.copy(engineFactory = engineFactory)
-          case JField("batch", JString(batch)) => i.copy(batch = batch)
-          case JField("env", env) =>
-            i.copy(env = Extraction.extract[Map[String, String]](env))
-          case JField("sparkConf", sparkConf) =>
-            i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf))
-          case JField("dataSourceParams", JString(dataSourceParams)) =>
-            i.copy(dataSourceParams = dataSourceParams)
-          case JField("preparatorParams", JString(preparatorParams)) =>
-            i.copy(preparatorParams = preparatorParams)
-          case JField("algorithmsParams", JString(algorithmsParams)) =>
-            i.copy(algorithmsParams = algorithmsParams)
-          case JField("servingParams", JString(servingParams)) =>
-            i.copy(servingParams = servingParams)
-          case _ => i
-        }
-      }
-  },
-  {
-    case i: EngineInstance =>
-      JObject(
-        JField("id", JString(i.id)) ::
-        JField("status", JString(i.status)) ::
-        JField("startTime", JString(i.startTime.toString)) ::
-        JField("endTime", JString(i.endTime.toString)) ::
-        JField("engineId", JString(i.engineId)) ::
-        JField("engineVersion", JString(i.engineVersion)) ::
-        JField("engineVariant", JString(i.engineVariant)) ::
-        JField("engineFactory", JString(i.engineFactory)) ::
-        JField("batch", JString(i.batch)) ::
-        JField("env", Extraction.decompose(i.env)(DefaultFormats)) ::
-        JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) ::
-        JField("dataSourceParams", JString(i.dataSourceParams)) ::
-        JField("preparatorParams", JString(i.preparatorParams)) ::
-        JField("algorithmsParams", JString(i.algorithmsParams)) ::
-        JField("servingParams", JString(i.servingParams)) ::
-        Nil)
-  }
-))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala b/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala
deleted file mode 100644
index d69ceae..0000000
--- a/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-import org.json4s._
-
-/** :: DeveloperApi ::
-  * Provides a way to discover engines by ID and version in a distributed
-  * environment
-  *
-  * @param id Unique identifier of an engine.
-  * @param version Engine version string.
-  * @param name A short and descriptive name for the engine.
-  * @param description A long description of the engine.
-  * @param files Paths to engine files.
-  * @param engineFactory Engine's factory class name.
-  * @group Meta Data
-  */
-@DeveloperApi
-case class EngineManifest(
-  id: String,
-  version: String,
-  name: String,
-  description: Option[String],
-  files: Seq[String],
-  engineFactory: String)
-
-/** :: DeveloperApi ::
-  * Base trait of the [[EngineManifest]] data access object
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-trait EngineManifests {
-  /** Inserts an [[EngineManifest]] */
-  def insert(engineManifest: EngineManifest): Unit
-
-  /** Get an [[EngineManifest]] by its ID */
-  def get(id: String, version: String): Option[EngineManifest]
-
-  /** Get all [[EngineManifest]]s */
-  def getAll(): Seq[EngineManifest]
-
-  /** Updates an [[EngineManifest]] */
-  def update(engineInfo: EngineManifest, upsert: Boolean = false): Unit
-
-  /** Delete an [[EngineManifest]] by its ID */
-  def delete(id: String, version: String): Unit
-}
-
-/** :: DeveloperApi ::
-  * JSON4S serializer for [[EngineManifest]]
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-class EngineManifestSerializer
-    extends CustomSerializer[EngineManifest](format => (
-  {
-    case JObject(fields) =>
-      val seed = EngineManifest(
-        id = "",
-        version = "",
-        name = "",
-        description = None,
-        files = Nil,
-        engineFactory = "")
-      fields.foldLeft(seed) { case (enginemanifest, field) =>
-        field match {
-          case JField("id", JString(id)) => enginemanifest.copy(id = id)
-          case JField("version", JString(version)) =>
-            enginemanifest.copy(version = version)
-          case JField("name", JString(name)) => enginemanifest.copy(name = name)
-          case JField("description", JString(description)) =>
-            enginemanifest.copy(description = Some(description))
-          case JField("files", JArray(s)) =>
-            enginemanifest.copy(files = s.map(t =>
-              t match {
-                case JString(file) => file
-                case _ => ""
-              }
-            ))
-          case JField("engineFactory", JString(engineFactory)) =>
-            enginemanifest.copy(engineFactory = engineFactory)
-          case _ => enginemanifest
-        }
-      }
-  },
-  {
-    case enginemanifest: EngineManifest =>
-      JObject(
-        JField("id", JString(enginemanifest.id)) ::
-        JField("version", JString(enginemanifest.version)) ::
-        JField("name", JString(enginemanifest.name)) ::
-        JField("description",
-          enginemanifest.description.map(
-            x => JString(x)).getOrElse(JNothing)) ::
-        JField("files",
-          JArray(enginemanifest.files.map(x => JString(x)).toList)) ::
-        JField("engineFactory", JString(enginemanifest.engineFactory)) ::
-        Nil)
-  }
-))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EntityMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/EntityMap.scala b/data/src/main/scala/io/prediction/data/storage/EntityMap.scala
deleted file mode 100644
index d9cd4c8..0000000
--- a/data/src/main/scala/io/prediction/data/storage/EntityMap.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package io.prediction.data.storage
-
-import io.prediction.annotation.Experimental
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-/**
- * :: Experimental ::
- */
-@Experimental
-class EntityIdIxMap(val idToIx: BiMap[String, Long]) extends Serializable {
-
-  val ixToId: BiMap[Long, String] = idToIx.inverse
-
-  def apply(id: String): Long = idToIx(id)
-
-  def apply(ix: Long): String = ixToId(ix)
-
-  def contains(id: String): Boolean = idToIx.contains(id)
-
-  def contains(ix: Long): Boolean = ixToId.contains(ix)
-
-  def get(id: String): Option[Long] = idToIx.get(id)
-
-  def get(ix: Long): Option[String] = ixToId.get(ix)
-
-  def getOrElse(id: String, default: => Long): Long =
-    idToIx.getOrElse(id, default)
-
-  def getOrElse(ix: Long, default: => String): String =
-    ixToId.getOrElse(ix, default)
-
-  def toMap: Map[String, Long] = idToIx.toMap
-
-  def size: Long = idToIx.size
-
-  def take(n: Int): EntityIdIxMap = new EntityIdIxMap(idToIx.take(n))
-
-  override def toString: String = idToIx.toString
-}
-
-/** :: Experimental :: */
-@Experimental
-object EntityIdIxMap {
-  def apply(keys: RDD[String]): EntityIdIxMap = {
-    new EntityIdIxMap(BiMap.stringLong(keys))
-  }
-}
-
-/** :: Experimental :: */
-@Experimental
-class EntityMap[A](val idToData: Map[String, A],
-  override val idToIx: BiMap[String, Long]) extends EntityIdIxMap(idToIx) {
-
-  def this(idToData: Map[String, A]) = this(
-    idToData,
-    BiMap.stringLong(idToData.keySet)
-  )
-
-  def data(id: String): A = idToData(id)
-
-  def data(ix: Long): A = idToData(ixToId(ix))
-
-  def getData(id: String): Option[A] = idToData.get(id)
-
-  def getData(ix: Long): Option[A] = idToData.get(ixToId(ix))
-
-  def getOrElseData(id: String, default: => A): A =
-    getData(id).getOrElse(default)
-
-  def getOrElseData(ix: Long, default: => A): A =
-    getData(ix).getOrElse(default)
-
-  override def take(n: Int): EntityMap[A] = {
-    val newIdToIx = idToIx.take(n)
-    new EntityMap[A](idToData.filterKeys(newIdToIx.contains(_)), newIdToIx)
-  }
-
-  override def toString: String = {
-    s"idToData: ${idToData.toString} " + s"idToix: ${idToIx.toString}"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala b/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala
deleted file mode 100644
index 0a7d502..0000000
--- a/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import com.github.nscala_time.time.Imports._
-import io.prediction.annotation.DeveloperApi
-import org.json4s._
-
-/** :: DeveloperApi ::
-  * Stores meta information for each evaluation instance.
-  *
-  * @param id Instance ID.
-  * @param status Status of this instance.
-  * @param startTime Start time of this instance.
-  * @param endTime End time of this instance.
-  * @param evaluationClass Evaluation class name of this instance.
-  * @param engineParamsGeneratorClass Engine parameters generator class name of this instance.
-  * @param batch Batch label of this instance.
-  * @param env The environment in which this instance was created.
-  * @param evaluatorResults Results of the evaluator.
-  * @param evaluatorResultsHTML HTML results of the evaluator.
-  * @param evaluatorResultsJSON JSON results of the evaluator.
-  * @group Meta Data
-  */
-@DeveloperApi
-case class EvaluationInstance(
-  id: String = "",
-  status: String = "",
-  startTime: DateTime = DateTime.now,
-  endTime: DateTime = DateTime.now,
-  evaluationClass: String = "",
-  engineParamsGeneratorClass: String = "",
-  batch: String = "",
-  env: Map[String, String] = Map(),
-  sparkConf: Map[String, String] = Map(),
-  evaluatorResults: String = "",
-  evaluatorResultsHTML: String = "",
-  evaluatorResultsJSON: String = "")
-
-/** :: DeveloperApi ::
-  * Base trait of the [[EvaluationInstance]] data access object
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-trait EvaluationInstances {
-  /** Insert a new [[EvaluationInstance]] */
-  def insert(i: EvaluationInstance): String
-
-  /** Get an [[EvaluationInstance]] by ID */
-  def get(id: String): Option[EvaluationInstance]
-
-  /** Get all [[EvaluationInstance]]s */
-  def getAll: Seq[EvaluationInstance]
-
-  /** Get instances that are produced by evaluation and have run to completion,
-    * reverse sorted by the start time
-    */
-  def getCompleted: Seq[EvaluationInstance]
-
-  /** Update an [[EvaluationInstance]] */
-  def update(i: EvaluationInstance): Unit
-
-  /** Delete an [[EvaluationInstance]] */
-  def delete(id: String): Unit
-}
-
-/** :: DeveloperApi ::
-  * JSON4S serializer for [[EvaluationInstance]]
-  *
-  * @group Meta Data
-  */
-class EvaluationInstanceSerializer extends CustomSerializer[EvaluationInstance](
-  format => ({
-    case JObject(fields) =>
-      implicit val formats = DefaultFormats
-      fields.foldLeft(EvaluationInstance()) { case (i, field) =>
-        field match {
-          case JField("id", JString(id)) => i.copy(id = id)
-          case JField("status", JString(status)) => i.copy(status = status)
-          case JField("startTime", JString(startTime)) =>
-            i.copy(startTime = Utils.stringToDateTime(startTime))
-          case JField("endTime", JString(endTime)) =>
-            i.copy(endTime = Utils.stringToDateTime(endTime))
-          case JField("evaluationClass", JString(evaluationClass)) =>
-            i.copy(evaluationClass = evaluationClass)
-          case JField("engineParamsGeneratorClass", JString(engineParamsGeneratorClass)) =>
-            i.copy(engineParamsGeneratorClass = engineParamsGeneratorClass)
-          case JField("batch", JString(batch)) => i.copy(batch = batch)
-          case JField("env", env) =>
-            i.copy(env = Extraction.extract[Map[String, String]](env))
-          case JField("sparkConf", sparkConf) =>
-            i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf))
-          case JField("evaluatorResults", JString(evaluatorResults)) =>
-            i.copy(evaluatorResults = evaluatorResults)
-          case JField("evaluatorResultsHTML", JString(evaluatorResultsHTML)) =>
-            i.copy(evaluatorResultsHTML = evaluatorResultsHTML)
-          case JField("evaluatorResultsJSON", JString(evaluatorResultsJSON)) =>
-            i.copy(evaluatorResultsJSON = evaluatorResultsJSON)
-          case _ => i
-        }
-      }
-  }, {
-    case i: EvaluationInstance =>
-      JObject(
-        JField("id", JString(i.id)) ::
-          JField("status", JString(i.status)) ::
-          JField("startTime", JString(i.startTime.toString)) ::
-          JField("endTime", JString(i.endTime.toString)) ::
-          JField("evaluationClass", JString(i.evaluationClass)) ::
-          JField("engineParamsGeneratorClass", JString(i.engineParamsGeneratorClass)) ::
-          JField("batch", JString(i.batch)) ::
-          JField("env", Extraction.decompose(i.env)(DefaultFormats)) ::
-          JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) ::
-          JField("evaluatorResults", JString(i.evaluatorResults)) ::
-          JField("evaluatorResultsHTML", JString(i.evaluatorResultsHTML)) ::
-          JField("evaluatorResultsJSON", JString(i.evaluatorResultsJSON)) ::
-          Nil
-      )
-  }
-  )
-)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Event.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Event.scala b/data/src/main/scala/io/prediction/data/storage/Event.scala
deleted file mode 100644
index abc16b9..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Event.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
-
-/** Each event in the Event Store can be represented by fields in this case
-  * class.
-  *
-  * @param eventId Unique ID of this event.
-  * @param event Name of this event.
-  * @param entityType Type of the entity associated with this event.
-  * @param entityId ID of the entity associated with this event.
-  * @param targetEntityType Type of the target entity associated with this
-  *                         event.
-  * @param targetEntityId ID of the target entity associated with this event.
-  * @param properties Properties associated with this event.
-  * @param eventTime Time of the happening of this event.
-  * @param tags Tags of this event.
-  * @param prId PredictedResultId of this event.
-  * @param creationTime Time of creation in the system of this event.
-  * @group Event Data
-  */
-case class Event(
-  val eventId: Option[String] = None,
-  val event: String,
-  val entityType: String,
-  val entityId: String,
-  val targetEntityType: Option[String] = None,
-  val targetEntityId: Option[String] = None,
-  val properties: DataMap = DataMap(), // default empty
-  val eventTime: DateTime = DateTime.now,
-  val tags: Seq[String] = Nil,
-  val prId: Option[String] = None,
-  val creationTime: DateTime = DateTime.now
-) {
-  override def toString(): String = {
-    s"Event(id=$eventId,event=$event,eType=$entityType,eId=$entityId," +
-    s"tType=$targetEntityType,tId=$targetEntityId,p=$properties,t=$eventTime," +
-    s"tags=$tags,pKey=$prId,ct=$creationTime)"
-  }
-}
-
-/** :: DeveloperApi ::
-  * Utilities for validating [[Event]]s
-  *
-  * @group Event Data
-  */
-@DeveloperApi
-object EventValidation {
-  /** Default time zone is set to UTC */
-  val defaultTimeZone = DateTimeZone.UTC
-
-  /** Checks whether an event name contains a reserved prefix
-    *
-    * @param name Event name
-    * @return true if event name starts with \$ or pio_, false otherwise
-    */
-  def isReservedPrefix(name: String): Boolean = name.startsWith("$") ||
-    name.startsWith("pio_")
-
-  /** PredictionIO reserves some single entity event names. They are currently
-    * \$set, \$unset, and \$delete.
-    */
-  val specialEvents = Set("$set", "$unset", "$delete")
-
-  /** Checks whether an event name is a special PredictionIO event name
-    *
-    * @param name Event name
-    * @return true if the name is a special event, false otherwise
-    */
-  def isSpecialEvents(name: String): Boolean = specialEvents.contains(name)
-
-  /** Validate an [[Event]], throwing exceptions when the candidate violates any
-    * of the following:
-    *
-    *  - event name must not be empty
-    *  - entityType must not be empty
-    *  - entityId must not be empty
-    *  - targetEntityType must not be Some of empty
-    *  - targetEntityId must not be Some of empty
-    *  - targetEntityType and targetEntityId must be both Some or None
-    *  - properties must not be empty when event is \$unset
-    *  - event name must be a special event if it has a reserved prefix
-    *  - targetEntityType and targetEntityId must be None if the event name has
-    *    a reserved prefix
-    *  - entityType must be a built-in entity type if entityType has a
-    *    reserved prefix
-    *  - targetEntityType must be a built-in entity type if targetEntityType is
-    *    Some and has a reserved prefix
-    *
-    * @param e Event to be validated
-    */
-  def validate(e: Event): Unit = {
-
-    require(!e.event.isEmpty, "event must not be empty.")
-    require(!e.entityType.isEmpty, "entityType must not be empty string.")
-    require(!e.entityId.isEmpty, "entityId must not be empty string.")
-    require(e.targetEntityType.map(!_.isEmpty).getOrElse(true),
-      "targetEntityType must not be empty string")
-    require(e.targetEntityId.map(!_.isEmpty).getOrElse(true),
-      "targetEntityId must not be empty string.")
-    require(!((e.targetEntityType != None) && (e.targetEntityId == None)),
-      "targetEntityType and targetEntityId must be specified together.")
-    require(!((e.targetEntityType == None) && (e.targetEntityId != None)),
-      "targetEntityType and targetEntityId must be specified together.")
-    require(!((e.event == "$unset") && e.properties.isEmpty),
-      "properties cannot be empty for $unset event")
-    require(!isReservedPrefix(e.event) || isSpecialEvents(e.event),
-      s"${e.event} is not a supported reserved event name.")
-    require(!isSpecialEvents(e.event) ||
-      ((e.targetEntityType == None) && (e.targetEntityId == None)),
-      s"Reserved event ${e.event} cannot have targetEntity")
-    require(!isReservedPrefix(e.entityType) ||
-      isBuiltinEntityTypes(e.entityType),
-      s"The entityType ${e.entityType} is not allowed. " +
-        s"'pio_' is a reserved name prefix.")
-    require(e.targetEntityType.map{ t =>
-      (!isReservedPrefix(t) || isBuiltinEntityTypes(t))}.getOrElse(true),
-      s"The targetEntityType ${e.targetEntityType.get} is not allowed. " +
-        s"'pio_' is a reserved name prefix.")
-    validateProperties(e)
-  }
-
-  /** Defines built-in entity types. The current built-in type is pio_pr. */
-  val builtinEntityTypes: Set[String] = Set("pio_pr")
-
-  /** Defines built-in properties. This is currently empty. */
-  val builtinProperties: Set[String] = Set()
-
-  /** Checks whether an entity type is a built-in entity type */
-  def isBuiltinEntityTypes(name: String): Boolean = builtinEntityTypes.contains(name)
-
-  /** Validate event properties, throwing exceptions when the candidate violates
-    * any of the following:
-    *
-    *  - property name must not contain a reserved prefix
-    *
-    * @param e Event to be validated
-    */
-  def validateProperties(e: Event): Unit = {
-    e.properties.keySet.foreach { k =>
-      require(!isReservedPrefix(k) || builtinProperties.contains(k),
-        s"The property ${k} is not allowed. " +
-          s"'pio_' is a reserved name prefix.")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala b/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
deleted file mode 100644
index 22243c2..0000000
--- a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-import io.prediction.data.{Utils => DataUtils}
-import org.joda.time.DateTime
-import org.json4s._
-import scala.util.{Try, Success, Failure}
-
-/** :: DeveloperApi ::
-  * Support library for dealing with [[Event]] and JSON4S
-  *
-  * @group Event Data
-  */
-@DeveloperApi
-object EventJson4sSupport {
-  /** This is set to org.json4s.DefaultFormats. Do not use JSON4S to serialize
-    * or deserialize Joda-Time DateTime because it has some issues with timezone
-    * (as of version 3.2.10)
-    */
-  implicit val formats = DefaultFormats
-
-  /** :: DeveloperApi ::
-    * Convert JSON from Event Server to [[Event]]
-    *
-    * @return deserialization routine used by [[APISerializer]]
-    */
-  @DeveloperApi
-  def readJson: PartialFunction[JValue, Event] = {
-    case JObject(x) => {
-      val fields = new DataMap(x.toMap)
-      // use get() if required in json
-      // use getOpt() if not required in json
-      try {
-        val event = fields.get[String]("event")
-        val entityType = fields.get[String]("entityType")
-        val entityId = fields.get[String]("entityId")
-        val targetEntityType = fields.getOpt[String]("targetEntityType")
-        val targetEntityId = fields.getOpt[String]("targetEntityId")
-        val properties = fields.getOrElse[Map[String, JValue]](
-          "properties", Map())
-        // default currentTime expressed as UTC timezone
-        lazy val currentTime = DateTime.now(EventValidation.defaultTimeZone)
-        val eventTime = fields.getOpt[String]("eventTime")
-          .map{ s =>
-            try {
-              DataUtils.stringToDateTime(s)
-            } catch {
-              case _: Exception =>
-                throw new MappingException(s"Fail to extract eventTime ${s}")
-            }
-          }.getOrElse(currentTime)
-
-        // disable tags from API for now.
-        val tags = List()
-      // val tags = fields.getOpt[Seq[String]]("tags").getOrElse(List())
-
-        val prId = fields.getOpt[String]("prId")
-
-        // don't allow user set creationTime from API for now.
-        val creationTime = currentTime
-      // val creationTime = fields.getOpt[String]("creationTime")
-      //   .map{ s =>
-      //     try {
-      //       DataUtils.stringToDateTime(s)
-      //     } catch {
-      //       case _: Exception =>
-      //         throw new MappingException(s"Fail to extract creationTime ${s}")
-      //     }
-      //   }.getOrElse(currentTime)
-
-
-        val newEvent = Event(
-          event = event,
-          entityType = entityType,
-          entityId = entityId,
-          targetEntityType = targetEntityType,
-          targetEntityId = targetEntityId,
-          properties = DataMap(properties),
-          eventTime = eventTime,
-          prId = prId,
-          creationTime = creationTime
-        )
-        EventValidation.validate(newEvent)
-        newEvent
-      } catch {
-        case e: Exception => throw new MappingException(e.toString, e)
-      }
-    }
-  }
-
-  /** :: DeveloperApi ::
-    * Convert [[Event]] to JSON for use by the Event Server
-    *
-    * @return serialization routine used by [[APISerializer]]
-    */
-  @DeveloperApi
-  def writeJson: PartialFunction[Any, JValue] = {
-    case d: Event => {
-      JObject(
-        JField("eventId",
-          d.eventId.map( eid => JString(eid)).getOrElse(JNothing)) ::
-        JField("event", JString(d.event)) ::
-        JField("entityType", JString(d.entityType)) ::
-        JField("entityId", JString(d.entityId)) ::
-        JField("targetEntityType",
-          d.targetEntityType.map(JString(_)).getOrElse(JNothing)) ::
-        JField("targetEntityId",
-          d.targetEntityId.map(JString(_)).getOrElse(JNothing)) ::
-        JField("properties", d.properties.toJObject) ::
-        JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) ::
-        // disable tags from API for now
-        // JField("tags", JArray(d.tags.toList.map(JString(_)))) ::
-        // disable tags from API for now
-        JField("prId",
-          d.prId.map(JString(_)).getOrElse(JNothing)) ::
-        // don't show creationTime for now
-        JField("creationTime",
-          JString(DataUtils.dateTimeToString(d.creationTime))) ::
-        Nil)
-    }
-  }
-
-  /** :: DeveloperApi ::
-    * Convert JSON4S JValue to [[Event]]
-    *
-    * @return deserialization routine used by [[DBSerializer]]
-    */
-  @DeveloperApi
-  def deserializeFromJValue: PartialFunction[JValue, Event] = {
-    case jv: JValue => {
-      val event = (jv \ "event").extract[String]
-      val entityType = (jv \ "entityType").extract[String]
-      val entityId = (jv \ "entityId").extract[String]
-      val targetEntityType = (jv \ "targetEntityType").extract[Option[String]]
-      val targetEntityId = (jv \ "targetEntityId").extract[Option[String]]
-      val properties = (jv \ "properties").extract[JObject]
-      val eventTime = DataUtils.stringToDateTime(
-        (jv \ "eventTime").extract[String])
-      val tags = (jv \ "tags").extract[Seq[String]]
-      val prId = (jv \ "prId").extract[Option[String]]
-      val creationTime = DataUtils.stringToDateTime(
-        (jv \ "creationTime").extract[String])
-      Event(
-        event = event,
-        entityType = entityType,
-        entityId = entityId,
-        targetEntityType = targetEntityType,
-        targetEntityId = targetEntityId,
-        properties = DataMap(properties),
-        eventTime = eventTime,
-        tags = tags,
-        prId = prId,
-        creationTime = creationTime)
-    }
-  }
-
-  /** :: DeveloperApi ::
-    * Convert [[Event]] to JSON4S JValue
-    *
-    * @return serialization routine used by [[DBSerializer]]
-    */
-  @DeveloperApi
-  def serializeToJValue: PartialFunction[Any, JValue] = {
-    case d: Event => {
-      JObject(
-        JField("event", JString(d.event)) ::
-        JField("entityType", JString(d.entityType)) ::
-        JField("entityId", JString(d.entityId)) ::
-        JField("targetEntityType",
-          d.targetEntityType.map(JString(_)).getOrElse(JNothing)) ::
-        JField("targetEntityId",
-          d.targetEntityId.map(JString(_)).getOrElse(JNothing)) ::
-        JField("properties", d.properties.toJObject) ::
-        JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) ::
-        JField("tags", JArray(d.tags.toList.map(JString(_)))) ::
-        JField("prId",
-          d.prId.map(JString(_)).getOrElse(JNothing)) ::
-        JField("creationTime",
-          JString(DataUtils.dateTimeToString(d.creationTime))) ::
-        Nil)
-    }
-  }
-
-  /** :: DeveloperApi ::
-    * Custom JSON4S serializer for [[Event]] intended to be used by database
-    * access, or anywhere that demands serdes of [[Event]] to/from JSON4S JValue
-    */
-  @DeveloperApi
-  class DBSerializer extends CustomSerializer[Event](format => (
-    deserializeFromJValue, serializeToJValue))
-
-  /** :: DeveloperApi ::
-    * Custom JSON4S serializer for [[Event]] intended to be used by the Event
-    * Server, or anywhere that demands serdes of [[Event]] to/from JSON
-    */
-  @DeveloperApi
-  class APISerializer extends CustomSerializer[Event](format => (
-    readJson, writeJson))
-}
-
-
-@DeveloperApi
-object BatchEventsJson4sSupport {
-  implicit val formats = DefaultFormats
-
-  @DeveloperApi
-  def readJson: PartialFunction[JValue, Seq[Try[Event]]] = {
-    case JArray(events) => {
-      events.map { event =>
-        try {
-          Success(EventJson4sSupport.readJson(event))
-        } catch {
-          case e: Exception => Failure(e)
-        }
-      }
-    }
-  }
-
-  @DeveloperApi
-  class APISerializer extends CustomSerializer[Seq[Try[Event]]](format => (readJson, Map.empty))
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala b/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala
deleted file mode 100644
index f3c4b11..0000000
--- a/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-import org.joda.time.DateTime
-
-/** :: DeveloperApi ::
-  * Provides aggregation support of [[Event]]s to [[LEvents]]. Engine developers
-  * should use [[io.prediction.data.store.LEventStore]] instead of using this
-  * directly.
-  *
-  * @group Event Data
-  */
-@DeveloperApi
-object LEventAggregator {
-  /** :: DeveloperApi ::
-    * Aggregate all properties grouped by entity type given an iterator of
-    * [[Event]]s with the latest property values from all [[Event]]s, and their
-    * first and last updated time
-    *
-    * @param events An iterator of [[Event]]s whose properties will be aggregated
-    * @return A map of entity type to [[PropertyMap]]
-    */
-  @DeveloperApi
-  def aggregateProperties(events: Iterator[Event]): Map[String, PropertyMap] = {
-    events.toList
-      .groupBy(_.entityId)
-      .mapValues(_.sortBy(_.eventTime.getMillis)
-        .foldLeft[Prop](Prop())(propAggregator))
-      .filter{ case (k, v) => v.dm.isDefined }
-      .mapValues{ v =>
-        require(v.firstUpdated.isDefined,
-          "Unexpected Error: firstUpdated cannot be None.")
-        require(v.lastUpdated.isDefined,
-          "Unexpected Error: lastUpdated cannot be None.")
-
-        PropertyMap(
-          fields = v.dm.get.fields,
-          firstUpdated = v.firstUpdated.get,
-          lastUpdated = v.lastUpdated.get
-        )
-      }
-  }
-
-  /** :: DeveloperApi ::
-    * Aggregate all properties given an iterator of [[Event]]s with the latest
-    * property values from all [[Event]]s, and their first and last updated time
-    *
-    * @param events An iterator of [[Event]]s whose properties will be aggregated
-    * @return An optional [[PropertyMap]]
-    */
-  @DeveloperApi
-  def aggregatePropertiesSingle(events: Iterator[Event])
-  : Option[PropertyMap] = {
-    val prop = events.toList
-      .sortBy(_.eventTime.getMillis)
-      .foldLeft[Prop](Prop())(propAggregator)
-
-    prop.dm.map{ d =>
-      require(prop.firstUpdated.isDefined,
-        "Unexpected Error: firstUpdated cannot be None.")
-      require(prop.lastUpdated.isDefined,
-        "Unexpected Error: lastUpdated cannot be None.")
-
-      PropertyMap(
-        fields = d.fields,
-        firstUpdated = prop.firstUpdated.get,
-        lastUpdated = prop.lastUpdated.get
-      )
-    }
-  }
-
-  /** Event names that control aggregation: \$set, \$unset, and \$delete */
-  val eventNames = List("$set", "$unset", "$delete")
-
-  private
-  def dataMapAggregator: ((Option[DataMap], Event) => Option[DataMap]) = {
-    (p, e) => {
-      e.event match {
-        case "$set" => {
-          if (p == None) {
-            Some(e.properties)
-          } else {
-            p.map(_ ++ e.properties)
-          }
-        }
-        case "$unset" => {
-          if (p == None) {
-            None
-          } else {
-            p.map(_ -- e.properties.keySet)
-          }
-        }
-        case "$delete" => None
-        case _ => p // do nothing for others
-      }
-    }
-  }
-
-  private
-  def propAggregator: ((Prop, Event) => Prop) = {
-    (p, e) => {
-      e.event match {
-        case "$set" | "$unset" | "$delete" => {
-          Prop(
-            dm = dataMapAggregator(p.dm, e),
-            firstUpdated = p.firstUpdated.map { t =>
-              first(t, e.eventTime)
-            }.orElse(Some(e.eventTime)),
-            lastUpdated = p.lastUpdated.map { t =>
-              last(t, e.eventTime)
-            }.orElse(Some(e.eventTime))
-          )
-        }
-        case _ => p // do nothing for others
-      }
-    }
-  }
-
-  private
-  def first(a: DateTime, b: DateTime): DateTime = if (b.isBefore(a)) b else a
-
-  private
-  def last(a: DateTime, b: DateTime): DateTime = if (b.isAfter(a)) b else a
-
-  private case class Prop(
-    dm: Option[DataMap] = None,
-    firstUpdated: Option[DateTime] = None,
-    lastUpdated: Option[DateTime] = None
-  )
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/LEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/LEvents.scala b/data/src/main/scala/io/prediction/data/storage/LEvents.scala
deleted file mode 100644
index 411f3a4..0000000
--- a/data/src/main/scala/io/prediction/data/storage/LEvents.scala
+++ /dev/null
@@ -1,489 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-import io.prediction.annotation.Experimental
-
-import scala.concurrent.Future
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.concurrent.ExecutionContext
-import scala.concurrent.TimeoutException
-
-import org.joda.time.DateTime
-
-/** :: DeveloperApi ::
-  * Base trait of a data access object that directly returns [[Event]] without
-  * going through Spark's parallelization. Engine developers should use
-  * [[io.prediction.data.store.LEventStore]] instead of using this directly.
-  *
-  * @group Event Data
-  */
-@DeveloperApi
-trait LEvents {
-  /** Default timeout for asynchronous operations that is set to 1 minute */
-  val defaultTimeout = Duration(60, "seconds")
-
-  /** :: DeveloperApi ::
-    * Initialize Event Store for an app ID and optionally a channel ID.
-    * This routine is to be called when an app is first created.
-    *
-    * @param appId App ID
-    * @param channelId Optional channel ID
-    * @return true if initialization was successful; false otherwise.
-    */
-  @DeveloperApi
-  def init(appId: Int, channelId: Option[Int] = None): Boolean
-
-  /** :: DeveloperApi ::
-    * Remove Event Store for an app ID and optional channel ID.
-    *
-    * @param appId App ID
-    * @param channelId Optional channel ID
-    * @return true if removal was successful; false otherwise.
-    */
-  @DeveloperApi
-  def remove(appId: Int, channelId: Option[Int] = None): Boolean
-
-  /** :: DeveloperApi ::
-    * Close this Event Store interface object, e.g. close connection, release
-    * resources, etc.
-    */
-  @DeveloperApi
-  def close(): Unit
-
-  /** :: DeveloperApi ::
-    * Insert an [[Event]] in a non-blocking fashion.
-    *
-    * @param event An [[Event]] to be inserted
-    * @param appId App ID for the [[Event]] to be inserted to
-    */
-  @DeveloperApi
-  def futureInsert(event: Event, appId: Int)(implicit ec: ExecutionContext):
-    Future[String] = futureInsert(event, appId, None)
-
-  /** :: DeveloperApi ::
-    * Insert an [[Event]] in a non-blocking fashion.
-    *
-    * @param event An [[Event]] to be inserted
-    * @param appId App ID for the [[Event]] to be inserted to
-    * @param channelId Optional channel ID for the [[Event]] to be inserted to
-    */
-  @DeveloperApi
-  def futureInsert(
-    event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String]
-
-  /** :: DeveloperApi ::
-    * Get an [[Event]] in a non-blocking fashion.
-    *
-    * @param eventId ID of the [[Event]]
-    * @param appId ID of the app that contains the [[Event]]
-    */
-  @DeveloperApi
-  def futureGet(eventId: String, appId: Int)(implicit ec: ExecutionContext):
-    Future[Option[Event]] = futureGet(eventId, appId, None)
-
-  /** :: DeveloperApi ::
-    * Get an [[Event]] in a non-blocking fashion.
-    *
-    * @param eventId ID of the [[Event]]
-    * @param appId ID of the app that contains the [[Event]]
-    * @param channelId Optional channel ID that contains the [[Event]]
-    */
-  @DeveloperApi
-  def futureGet(
-      eventId: String,
-      appId: Int,
-      channelId: Option[Int]
-    )(implicit ec: ExecutionContext): Future[Option[Event]]
-
-  /** :: DeveloperApi ::
-    * Delete an [[Event]] in a non-blocking fashion.
-    *
-    * @param eventId ID of the [[Event]]
-    * @param appId ID of the app that contains the [[Event]]
-    */
-  @DeveloperApi
-  def futureDelete(eventId: String, appId: Int)(implicit ec: ExecutionContext):
-    Future[Boolean] = futureDelete(eventId, appId, None)
-
-  /** :: DeveloperApi ::
-    * Delete an [[Event]] in a non-blocking fashion.
-    *
-    * @param eventId ID of the [[Event]]
-    * @param appId ID of the app that contains the [[Event]]
-    * @param channelId Optional channel ID that contains the [[Event]]
-    */
-  @DeveloperApi
-  def futureDelete(
-      eventId: String,
-      appId: Int,
-      channelId: Option[Int]
-    )(implicit ec: ExecutionContext): Future[Boolean]
-
-  /** :: DeveloperApi ::
-    * Reads from database and returns a Future of Iterator of [[Event]]s.
-    *
-    * @param appId return events of this app ID
-    * @param channelId return events of this channel ID (default channel if it's None)
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param limit Limit number of events. Get all events if None or Some(-1)
-    * @param reversed Reverse the order.
-    *   - return oldest events first if None or Some(false) (default)
-    *   - return latest events first if Some(true)
-    * @param ec ExecutionContext
-    * @return Future[Iterator[Event]]
-    */
-  @DeveloperApi
-  def futureFind(
-      appId: Int,
-      channelId: Option[Int] = None,
-      startTime: Option[DateTime] = None,
-      untilTime: Option[DateTime] = None,
-      entityType: Option[String] = None,
-      entityId: Option[String] = None,
-      eventNames: Option[Seq[String]] = None,
-      targetEntityType: Option[Option[String]] = None,
-      targetEntityId: Option[Option[String]] = None,
-      limit: Option[Int] = None,
-      reversed: Option[Boolean] = None
-    )(implicit ec: ExecutionContext): Future[Iterator[Event]]
-
-  /** Aggregate properties of entities based on these special events:
-    * \$set, \$unset, \$delete events.
-    * and returns a Future of Map of entityId to properties.
-    *
-    * @param appId use events of this app ID
-    * @param channelId use events of this channel ID (default channel if it's None)
-    * @param entityType aggregate properties of the entities of this entityType
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param required only keep entities with these required properties defined
-    * @param ec ExecutionContext
-    * @return Future[Map[String, PropertyMap]]
-    */
-  private[prediction] def futureAggregateProperties(
-    appId: Int,
-    channelId: Option[Int] = None,
-    entityType: String,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    required: Option[Seq[String]] = None)(implicit ec: ExecutionContext):
-    Future[Map[String, PropertyMap]] = {
-      futureFind(
-        appId = appId,
-        channelId = channelId,
-        startTime = startTime,
-        untilTime = untilTime,
-        entityType = Some(entityType),
-        eventNames = Some(LEventAggregator.eventNames)
-      ).map{ eventIt =>
-        val dm = LEventAggregator.aggregateProperties(eventIt)
-        if (required.isDefined) {
-          dm.filter { case (k, v) =>
-            required.get.map(v.contains(_)).reduce(_ && _)
-          }
-        } else dm
-      }
-    }
-
-  /**
-    * :: Experimental ::
-    *
-    * Aggregate properties of the specified entity (entityType + entityId)
-    * based on these special events:
-    * \$set, \$unset, \$delete events.
-    * and returns a Future of Option[PropertyMap]
-    *
-    * @param appId use events of this app ID
-    * @param channelId use events of this channel ID (default channel if it's None)
-    * @param entityType the entityType
-    * @param entityId the entityId
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param ec ExecutionContext
-    * @return Future[Option[PropertyMap]]
-    */
-  @Experimental
-  private[prediction] def futureAggregatePropertiesOfEntity(
-    appId: Int,
-    channelId: Option[Int] = None,
-    entityType: String,
-    entityId: String,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None)(implicit ec: ExecutionContext):
-    Future[Option[PropertyMap]] = {
-      futureFind(
-        appId = appId,
-        channelId = channelId,
-        startTime = startTime,
-        untilTime = untilTime,
-        entityType = Some(entityType),
-        entityId = Some(entityId),
-        eventNames = Some(LEventAggregator.eventNames)
-      ).map{ eventIt =>
-        LEventAggregator.aggregatePropertiesSingle(eventIt)
-      }
-    }
-
-  // following is blocking
-  private[prediction] def insert(event: Event, appId: Int,
-    channelId: Option[Int] = None,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    String = {
-    Await.result(futureInsert(event, appId, channelId), timeout)
-  }
-
-  private[prediction] def get(eventId: String, appId: Int,
-    channelId: Option[Int] = None,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    Option[Event] = {
-    Await.result(futureGet(eventId, appId, channelId), timeout)
-  }
-
-  private[prediction] def delete(eventId: String, appId: Int,
-    channelId: Option[Int] = None,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    Boolean = {
-    Await.result(futureDelete(eventId, appId, channelId), timeout)
-  }
-
-  /** reads from database and returns events iterator.
-    *
-    * @param appId return events of this app ID
-    * @param channelId return events of this channel ID (default channel if it's None)
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param limit Limit number of events. Get all events if None or Some(-1)
-    * @param reversed Reverse the order (should be used with both
-    *   targetEntityType and targetEntityId specified)
-    *   - return oldest events first if None or Some(false) (default)
-    *   - return latest events first if Some(true)
-    * @param ec ExecutionContext
-    * @return Iterator[Event]
-    */
-  private[prediction] def find(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    limit: Option[Int] = None,
-    reversed: Option[Boolean] = None,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    Iterator[Event] = {
-      Await.result(futureFind(
-        appId = appId,
-        channelId = channelId,
-        startTime = startTime,
-        untilTime = untilTime,
-        entityType = entityType,
-        entityId = entityId,
-        eventNames = eventNames,
-        targetEntityType = targetEntityType,
-        targetEntityId = targetEntityId,
-        limit = limit,
-        reversed = reversed), timeout)
-  }
-
-  // NOTE: remove in next release
-  @deprecated("Use find() instead.", "0.9.2")
-  private[prediction] def findLegacy(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    limit: Option[Int] = None,
-    reversed: Option[Boolean] = None,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    Either[StorageError, Iterator[Event]] = {
-      try {
-        // return Either for legacy usage
-        Right(Await.result(futureFind(
-          appId = appId,
-          channelId = channelId,
-          startTime = startTime,
-          untilTime = untilTime,
-          entityType = entityType,
-          entityId = entityId,
-          eventNames = eventNames,
-          targetEntityType = targetEntityType,
-          targetEntityId = targetEntityId,
-          limit = limit,
-          reversed = reversed), timeout))
-      } catch {
-        case e: TimeoutException => Left(StorageError(s"${e}"))
-        case e: Exception => Left(StorageError(s"${e}"))
-      }
-  }
-
-  /** reads events of the specified entity.
-    *
-    * @param appId return events of this app ID
-    * @param channelId return events of this channel ID (default channel if it's None)
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param limit Limit number of events. Get all events if None or Some(-1)
-    * @param latest Return latest event first (default true)
-    * @param ec ExecutionContext
-    * @return Either[StorageError, Iterator[Event]]
-    */
-  // NOTE: remove this function in next release
-  @deprecated("Use LEventStore.findByEntity() instead.", "0.9.2")
-  def findSingleEntity(
-    appId: Int,
-    channelId: Option[Int] = None,
-    entityType: String,
-    entityId: String,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    limit: Option[Int] = None,
-    latest: Boolean = true,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    Either[StorageError, Iterator[Event]] = {
-
-    findLegacy(
-      appId = appId,
-      channelId = channelId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = Some(entityType),
-      entityId = Some(entityId),
-      eventNames = eventNames,
-      targetEntityType = targetEntityType,
-      targetEntityId = targetEntityId,
-      limit = limit,
-      reversed = Some(latest),
-      timeout = timeout)
-
-  }
-
-  /** Aggregate properties of entities based on these special events:
-    * \$set, \$unset, \$delete events.
-    * and returns a Map of entityId to properties.
-    *
-    * @param appId use events of this app ID
-    * @param channelId use events of this channel ID (default channel if it's None)
-    * @param entityType aggregate properties of the entities of this entityType
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param required only keep entities with these required properties defined
-    * @param ec ExecutionContext
-    * @return Map[String, PropertyMap]
-    */
-  private[prediction] def aggregateProperties(
-    appId: Int,
-    channelId: Option[Int] = None,
-    entityType: String,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    required: Option[Seq[String]] = None,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    Map[String, PropertyMap] = {
-    Await.result(futureAggregateProperties(
-      appId = appId,
-      channelId = channelId,
-      entityType = entityType,
-      startTime = startTime,
-      untilTime = untilTime,
-      required = required), timeout)
-  }
-
-  /**
-    * :: Experimental ::
-    *
-    * Aggregate properties of the specified entity (entityType + entityId)
-    * based on these special events:
-    * \$set, \$unset, \$delete events.
-    * and returns Option[PropertyMap]
-    *
-    * @param appId use events of this app ID
-    * @param channelId use events of this channel ID
-    * @param entityType the entityType
-    * @param entityId the entityId
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param ec ExecutionContext
-    * @return Future[Option[PropertyMap]]
-    */
-  @Experimental
-  private[prediction] def aggregatePropertiesOfEntity(
-    appId: Int,
-    channelId: Option[Int] = None,
-    entityType: String,
-    entityId: String,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
-    Option[PropertyMap] = {
-
-    Await.result(futureAggregatePropertiesOfEntity(
-      appId = appId,
-      channelId = channelId,
-      entityType = entityType,
-      entityId = entityId,
-      startTime = startTime,
-      untilTime = untilTime), timeout)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Models.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Models.scala b/data/src/main/scala/io/prediction/data/storage/Models.scala
deleted file mode 100644
index 53a76ff..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Models.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import com.google.common.io.BaseEncoding
-import io.prediction.annotation.DeveloperApi
-import org.json4s._
-
-/** :: DeveloperApi ::
-  * Stores model for each engine instance
-  *
-  * @param id ID of the model, which should be the same as engine instance ID
-  * @param models Trained models of all algorithms
-  * @group Model Data
-  */
-@DeveloperApi
-case class Model(
-  id: String,
-  models: Array[Byte])
-
-/** :: DeveloperApi ::
-  * Base trait for of the [[Model]] data access object
-  *
-  * @group Model Data
-  */
-@DeveloperApi
-trait Models {
-  /** Insert a new [[Model]] */
-  def insert(i: Model): Unit
-
-  /** Get a [[Model]] by ID */
-  def get(id: String): Option[Model]
-
-  /** Delete a [[Model]] */
-  def delete(id: String): Unit
-}
-
-/** :: DeveloperApi ::
-  * JSON4S serializer for [[Model]]
-  *
-  * @group Model Data
-  */
-@DeveloperApi
-class ModelSerializer extends CustomSerializer[Model](
-  format => ({
-    case JObject(fields) =>
-      implicit val formats = DefaultFormats
-      val seed = Model(
-          id = "",
-          models = Array[Byte]())
-      fields.foldLeft(seed) { case (i, field) =>
-        field match {
-          case JField("id", JString(id)) => i.copy(id = id)
-          case JField("models", JString(models)) =>
-            i.copy(models = BaseEncoding.base64.decode(models))
-          case _ => i
-        }
-      }
-  },
-  {
-    case i: Model =>
-      JObject(
-        JField("id", JString(i.id)) ::
-        JField("models", JString(BaseEncoding.base64.encode(i.models))) ::
-        Nil)
-  }
-))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala b/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala
deleted file mode 100644
index 2430df9..0000000
--- a/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.joda.time.DateTime
-
-import org.json4s.JValue
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-// each JValue data associated with the time it is set
-private[prediction] case class PropTime(val d: JValue, val t: Long)
-    extends Serializable
-
-private[prediction] case class SetProp (
-  val fields: Map[String, PropTime],
-  // last set time. Note: fields could be empty with valid set time
-  val t: Long) extends Serializable {
-
-  def ++ (that: SetProp): SetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, PropTime] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData.t > thatData.t) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    // keep the latest set time
-    val combinedT = if (this.t > that.t) this.t else that.t
-
-    SetProp(
-      fields = combinedFields,
-      t = combinedT
-    )
-  }
-}
-
-private[prediction] case class UnsetProp (fields: Map[String, Long])
-    extends Serializable {
-  def ++ (that: UnsetProp): UnsetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, Long] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData > thatData) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    UnsetProp(
-      fields = combinedFields
-    )
-  }
-}
-
-private[prediction] case class DeleteEntity (t: Long) extends Serializable {
-  def ++ (that: DeleteEntity): DeleteEntity = {
-    if (this.t > that.t) this else that
-  }
-}
-
-private[prediction] case class EventOp (
-  val setProp: Option[SetProp] = None,
-  val unsetProp: Option[UnsetProp] = None,
-  val deleteEntity: Option[DeleteEntity] = None,
-  val firstUpdated: Option[DateTime] = None,
-  val lastUpdated: Option[DateTime] = None
-) extends Serializable {
-
-  def ++ (that: EventOp): EventOp = {
-    val firstUp = (this.firstUpdated ++ that.firstUpdated).reduceOption{
-      (a, b) => if (b.getMillis < a.getMillis) b else a
-    }
-    val lastUp = (this.lastUpdated ++ that.lastUpdated).reduceOption {
-      (a, b) => if (b.getMillis > a.getMillis) b else a
-    }
-
-    EventOp(
-      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
-      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
-      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _),
-      firstUpdated = firstUp,
-      lastUpdated = lastUp
-    )
-  }
-
-  def toPropertyMap(): Option[PropertyMap] = {
-    setProp.flatMap { set =>
-
-      val unsetKeys: Set[String] = unsetProp.map( unset =>
-        unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
-      ).getOrElse(Set())
-
-      val combinedFields = deleteEntity.map { delete =>
-        if (delete.t >= set.t) {
-          None
-        } else {
-          val deleteKeys: Set[String] = set.fields
-            .filter { case (k, PropTime(kv, t)) =>
-              (delete.t >= t)
-            }.keySet
-          Some(set.fields -- unsetKeys -- deleteKeys)
-        }
-      }.getOrElse{
-        Some(set.fields -- unsetKeys)
-      }
-
-      // Note: mapValues() doesn't return concrete Map and causes
-      // NotSerializableException issue. Use map(identity) to work around this.
-      // see https://issues.scala-lang.org/browse/SI-7005
-      combinedFields.map{ f =>
-        require(firstUpdated.isDefined,
-          "Unexpected Error: firstUpdated cannot be None.")
-        require(lastUpdated.isDefined,
-          "Unexpected Error: lastUpdated cannot be None.")
-        PropertyMap(
-          fields = f.mapValues(_.d).map(identity),
-          firstUpdated = firstUpdated.get,
-          lastUpdated = lastUpdated.get
-        )
-      }
-    }
-  }
-
-}
-
-private[prediction] object EventOp {
-  // create EventOp from Event object
-  def apply(e: Event): EventOp = {
-    val t = e.eventTime.getMillis
-    e.event match {
-      case "$set" => {
-        val fields = e.properties.fields.mapValues(jv =>
-          PropTime(jv, t)
-        ).map(identity)
-
-        EventOp(
-          setProp = Some(SetProp(fields = fields, t = t)),
-          firstUpdated = Some(e.eventTime),
-          lastUpdated = Some(e.eventTime)
-        )
-      }
-      case "$unset" => {
-        val fields = e.properties.fields.mapValues(jv => t).map(identity)
-        EventOp(
-          unsetProp = Some(UnsetProp(fields = fields)),
-          firstUpdated = Some(e.eventTime),
-          lastUpdated = Some(e.eventTime)
-        )
-      }
-      case "$delete" => {
-        EventOp(
-          deleteEntity = Some(DeleteEntity(t)),
-          firstUpdated = Some(e.eventTime),
-          lastUpdated = Some(e.eventTime)
-        )
-      }
-      case _ => {
-        EventOp()
-      }
-    }
-  }
-}
-
-
-private[prediction] object PEventAggregator {
-
-  val eventNames = List("$set", "$unset", "$delete")
-
-  def aggregateProperties(eventsRDD: RDD[Event]): RDD[(String, PropertyMap)] = {
-    eventsRDD
-      .map( e => (e.entityId, EventOp(e) ))
-      .aggregateByKey[EventOp](EventOp())(
-        // within same partition
-        seqOp = { case (u, v) => u ++ v },
-        // across partition
-        combOp = { case (accu, u) => accu ++ u }
-      )
-      .mapValues(_.toPropertyMap)
-      .filter{ case (k, v) => v.isDefined }
-      .map{ case (k, v) => (k, v.get) }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/PEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/PEvents.scala b/data/src/main/scala/io/prediction/data/storage/PEvents.scala
deleted file mode 100644
index 96a11b8..0000000
--- a/data/src/main/scala/io/prediction/data/storage/PEvents.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import grizzled.slf4j.Logger
-import io.prediction.annotation.DeveloperApi
-import io.prediction.annotation.Experimental
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.joda.time.DateTime
-
-import scala.reflect.ClassTag
-
-/** :: DeveloperApi ::
-  * Base trait of a data access object that returns [[Event]] related RDD data
-  * structure. Engine developers should use
-  * [[io.prediction.data.store.PEventStore]] instead of using this directly.
-  *
-  * @group Event Data
-  */
-@DeveloperApi
-trait PEvents extends Serializable {
-  @transient protected lazy val logger = Logger[this.type]
-  @deprecated("Use PEventStore.find() instead.", "0.9.2")
-  def getByAppIdAndTimeAndEntity(appId: Int,
-    startTime: Option[DateTime],
-    untilTime: Option[DateTime],
-    entityType: Option[String],
-    entityId: Option[String])(sc: SparkContext): RDD[Event] = {
-      find(
-        appId = appId,
-        startTime = startTime,
-        untilTime = untilTime,
-        entityType = entityType,
-        entityId = entityId,
-        eventNames = None
-      )(sc)
-    }
-
-  /** :: DeveloperApi ::
-    * Read from database and return the events. The deprecation here is intended
-    * to engine developers only.
-    *
-    * @param appId return events of this app ID
-    * @param channelId return events of this channel ID (default channel if it's None)
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param sc Spark context
-    * @return RDD[Event]
-    */
-  @deprecated("Use PEventStore.find() instead.", "0.9.2")
-  @DeveloperApi
-  def find(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event]
-
-  /** Aggregate properties of entities based on these special events:
-    * \$set, \$unset, \$delete events. The deprecation here is intended to
-    * engine developers only.
-    *
-    * @param appId use events of this app ID
-    * @param channelId use events of this channel ID (default channel if it's None)
-    * @param entityType aggregate properties of the entities of this entityType
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param required only keep entities with these required properties defined
-    * @param sc Spark context
-    * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pair
-    */
-  @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2")
-  def aggregateProperties(
-    appId: Int,
-    channelId: Option[Int] = None,
-    entityType: String,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    required: Option[Seq[String]] = None)
-    (sc: SparkContext): RDD[(String, PropertyMap)] = {
-    val eventRDD = find(
-      appId = appId,
-      channelId = channelId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = Some(entityType),
-      eventNames = Some(PEventAggregator.eventNames))(sc)
-
-    val dmRDD = PEventAggregator.aggregateProperties(eventRDD)
-
-    required map { r =>
-      dmRDD.filter { case (k, v) =>
-        r.map(v.contains(_)).reduce(_ && _)
-      }
-    } getOrElse dmRDD
-  }
-
-  /** :: Experimental ::
-    * Extract EntityMap[A] from events for the entityType
-    * NOTE: it is local EntityMap[A]
-    */
-  @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2")
-  @Experimental
-  def extractEntityMap[A: ClassTag](
-    appId: Int,
-    entityType: String,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    required: Option[Seq[String]] = None)
-    (sc: SparkContext)(extract: DataMap => A): EntityMap[A] = {
-    val idToData: Map[String, A] = aggregateProperties(
-      appId = appId,
-      entityType = entityType,
-      startTime = startTime,
-      untilTime = untilTime,
-      required = required
-    )(sc).map{ case (id, dm) =>
-      try {
-        (id, extract(dm))
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get extract entity from DataMap $dm of " +
-            s"entityId $id.", e)
-          throw e
-        }
-      }
-    }.collectAsMap.toMap
-
-    new EntityMap(idToData)
-  }
-
-  /** :: DeveloperApi ::
-    * Write events to database
-    *
-    * @param events RDD of Event
-    * @param appId the app ID
-    * @param sc Spark Context
-    */
-  @DeveloperApi
-  def write(events: RDD[Event], appId: Int)(sc: SparkContext): Unit =
-    write(events, appId, None)(sc)
-
-  /** :: DeveloperApi ::
-    * Write events to database
-    *
-    * @param events RDD of Event
-    * @param appId the app ID
-    * @param channelId  channel ID (default channel if it's None)
-    * @param sc Spark Context
-    */
-  @DeveloperApi
-  def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala b/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala
deleted file mode 100644
index bc55fd3..0000000
--- a/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package io.prediction.data.storage
-
-import org.joda.time.DateTime
-
-import org.json4s.JValue
-import org.json4s.JObject
-import org.json4s.native.JsonMethods.parse
-
-/** A PropertyMap stores aggregated properties of the entity.
-  * Internally it is a Map
-  * whose keys are property names and values are corresponding JSON values
-  * respectively. Use the get() method to retrieve the value of mandatory
-  * property or use getOpt() to retrieve the value of the optional property.
-  *
-  * @param fields Map of property name to JValue
-  * @param firstUpdated first updated time of this PropertyMap
-  * @param lastUpdated last updated time of this PropertyMap
-  */
-class PropertyMap(
-  fields: Map[String, JValue],
-  val firstUpdated: DateTime,
-  val lastUpdated: DateTime
-) extends DataMap(fields) {
-
-  override
-  def toString: String = s"PropertyMap(${fields}, ${firstUpdated}, ${lastUpdated})"
-
-  override
-  def hashCode: Int =
-    41 * (
-      41 * (
-        41 + fields.hashCode
-      ) + firstUpdated.hashCode
-    ) + lastUpdated.hashCode
-
-  override
-  def equals(other: Any): Boolean = other match {
-    case that: PropertyMap => {
-      (that.canEqual(this)) &&
-      (super.equals(that)) &&
-      (this.firstUpdated.equals(that.firstUpdated)) &&
-      (this.lastUpdated.equals(that.lastUpdated))
-    }
-    case that: DataMap => { // for testing purpose
-      super.equals(that)
-    }
-    case _ => false
-  }
-
-  override
-  def canEqual(other: Any): Boolean = other.isInstanceOf[PropertyMap]
-}
-
-/** Companion object of the [[PropertyMap]] class. */
-object PropertyMap {
-
-  /** Create an PropertyMap from a Map of String to JValue,
-    * firstUpdated and lastUpdated time.
-    *
-    * @param fields a Map of String to JValue
-    * @param firstUpdated First updated time
-    * @param lastUpdated Last updated time
-    * @return a new PropertyMap
-    */
-  def apply(fields: Map[String, JValue],
-    firstUpdated: DateTime, lastUpdated: DateTime): PropertyMap =
-    new PropertyMap(fields, firstUpdated, lastUpdated)
-
-  /** Create an PropertyMap from a JSON String and firstUpdated and lastUpdated
-    * time.
-    * @param js JSON String. eg """{ "a": 1, "b": "foo" }"""
-    * @param firstUpdated First updated time
-    * @param lastUpdated Last updated time
-    * @return a new PropertyMap
-    */
-  def apply(js: String, firstUpdated: DateTime, lastUpdated: DateTime)
-  : PropertyMap = apply(
-      fields = parse(js).asInstanceOf[JObject].obj.toMap,
-      firstUpdated = firstUpdated,
-      lastUpdated = lastUpdated
-    )
-}


Mime
View raw message