predictionio-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From don...@apache.org
Subject [13/50] [abbrv] incubator-predictionio git commit: [PIO-49] Add support for Elasticsearch 5
Date Tue, 25 Apr 2017 22:11:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
deleted file mode 100644
index b453820..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.view
-
-import org.apache.predictionio.data.storage.hbase.HBPEvents
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventValidation
-import org.apache.predictionio.data.storage.DataMap
-import org.apache.predictionio.data.storage.Storage
-
-import org.joda.time.DateTime
-
-import org.json4s.JValue
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-
-// each JValue data associated with the time it is set
-private[predictionio] case class PropTime(val d: JValue, val t: Long) extends Serializable
-
-private[predictionio] case class SetProp (
-  val fields: Map[String, PropTime],
-  // last set time. Note: fields could be empty with valid set time
-  val t: Long) extends Serializable {
-
-  def ++ (that: SetProp): SetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, PropTime] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData.t > thatData.t) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    // keep the latest set time
-    val combinedT = if (this.t > that.t) this.t else that.t
-
-    SetProp(
-      fields = combinedFields,
-      t = combinedT
-    )
-  }
-}
-
-private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
-  def ++ (that: UnsetProp): UnsetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, Long] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData > thatData) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    UnsetProp(
-      fields = combinedFields
-    )
-  }
-}
-
-private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
-  def ++ (that: DeleteEntity): DeleteEntity = {
-    if (this.t > that.t) this else that
-  }
-}
-
-private[predictionio] case class EventOp (
-  val setProp: Option[SetProp] = None,
-  val unsetProp: Option[UnsetProp] = None,
-  val deleteEntity: Option[DeleteEntity] = None
-) extends Serializable {
-
-  def ++ (that: EventOp): EventOp = {
-    EventOp(
-      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
-      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
-      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
-    )
-  }
-
-  def toDataMap(): Option[DataMap] = {
-    setProp.flatMap { set =>
-
-      val unsetKeys: Set[String] = unsetProp.map( unset =>
-        unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
-      ).getOrElse(Set())
-
-      val combinedFields = deleteEntity.map { delete =>
-        if (delete.t >= set.t) {
-          None
-        } else {
-          val deleteKeys: Set[String] = set.fields
-            .filter { case (k, PropTime(kv, t)) =>
-              (delete.t >= t)
-            }.keySet
-          Some(set.fields -- unsetKeys -- deleteKeys)
-        }
-      }.getOrElse{
-        Some(set.fields -- unsetKeys)
-      }
-
-      // Note: mapValues() doesn't return concrete Map and causes
-      // NotSerializableException issue. Use map(identity) to work around this.
-      // see https://issues.scala-lang.org/browse/SI-7005
-      combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity)))
-    }
-  }
-
-}
-
-private[predictionio] object EventOp {
-  def apply(e: Event): EventOp = {
-    val t = e.eventTime.getMillis
-    e.event match {
-      case "$set" => {
-        val fields = e.properties.fields.mapValues(jv =>
-          PropTime(jv, t)
-        ).map(identity)
-
-        EventOp(
-          setProp = Some(SetProp(fields = fields, t = t))
-        )
-      }
-      case "$unset" => {
-        val fields = e.properties.fields.mapValues(jv => t).map(identity)
-        EventOp(
-          unsetProp = Some(UnsetProp(fields = fields))
-        )
-      }
-      case "$delete" => {
-        EventOp(
-          deleteEntity = Some(DeleteEntity(t))
-        )
-      }
-      case _ => {
-        EventOp()
-      }
-    }
-  }
-}
-
-@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
-class PBatchView(
-  val appId: Int,
-  val startTime: Option[DateTime],
-  val untilTime: Option[DateTime],
-  val sc: SparkContext) {
-
-  // NOTE: parallel Events DB interface
-  @transient lazy val eventsDb = Storage.getPEvents()
-
-  @transient lazy val _events: RDD[Event] =
-    eventsDb.getByAppIdAndTimeAndEntity(
-      appId = appId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = None,
-      entityId = None)(sc)
-
-  // TODO: change to use EventSeq?
-  @transient lazy val events: RDD[Event] = _events
-
-  def aggregateProperties(
-    entityType: String,
-    startTimeOpt: Option[DateTime] = None,
-    untilTimeOpt: Option[DateTime] = None
-  ): RDD[(String, DataMap)] = {
-
-    _events
-      .filter( e => ((e.entityType == entityType) &&
-        (EventValidation.isSpecialEvents(e.event))) )
-      .map( e => (e.entityId, EventOp(e) ))
-      .aggregateByKey[EventOp](EventOp())(
-        // within same partition
-        seqOp = { case (u, v) => u ++ v },
-        // across partition
-        combOp = { case (accu, u) => accu ++ u }
-      )
-      .mapValues(_.toDataMap)
-      .filter{ case (k, v) => v.isDefined }
-      .map{ case (k, v) => (k, v.get) }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/make-distribution.sh
----------------------------------------------------------------------
diff --git a/make-distribution.sh b/make-distribution.sh
index b6c8ed3..31954c0 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -27,13 +27,15 @@ VERSION=$(grep version ${FWDIR}/build.sbt | grep ThisBuild | grep -o '".*"' | se
 echo "Building binary distribution for PredictionIO $VERSION..."
 
 cd ${FWDIR}
-sbt/sbt common/publishLocal data/publishLocal core/publishLocal e2/publishLocal tools/assembly
+sbt/sbt common/publishLocal data/publishLocal core/publishLocal e2/publishLocal dataElasticsearch1/assembly dataElasticsearch/assembly dataHbase/assembly dataHdfs/assembly dataJdbc/assembly dataLocalfs/assembly tools/assembly
 
 cd ${FWDIR}
 rm -rf ${DISTDIR}
 mkdir -p ${DISTDIR}/bin
 mkdir -p ${DISTDIR}/conf
 mkdir -p ${DISTDIR}/lib
+mkdir -p ${DISTDIR}/lib/spark
+mkdir -p ${DISTDIR}/lib/extra
 mkdir -p ${DISTDIR}/project
 mkdir -p ${DISTDIR}/sbt
 
@@ -42,6 +44,8 @@ cp ${FWDIR}/conf/* ${DISTDIR}/conf
 cp ${FWDIR}/project/build.properties ${DISTDIR}/project
 cp ${FWDIR}/sbt/sbt ${DISTDIR}/sbt
 cp ${FWDIR}/assembly/*assembly*jar ${DISTDIR}/lib
+cp ${FWDIR}/assembly/spark/*jar ${DISTDIR}/lib/spark
+cp ${FWDIR}/assembly/extra/*jar ${DISTDIR}/lib/extra
 
 rm -f ${DISTDIR}/lib/*javadoc.jar
 rm -f ${DISTDIR}/lib/*sources.jar

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/.gitignore
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/.gitignore b/storage/elasticsearch/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/elasticsearch/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/build.sbt
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt
new file mode 100644
index 0000000..b22cbd8
--- /dev/null
+++ b/storage/elasticsearch/build.sbt
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "apache-predictionio-data-elasticsearch"
+
+elasticsearchVersion := "5.2.1"
+
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+  "org.apache.spark"        %% "spark-core"     % sparkVersion.value % "provided",
+  "org.apache.spark"        %% "spark-sql"      % sparkVersion.value % "provided",
+  "org.elasticsearch.client" % "rest"           % elasticsearchVersion.value,
+  "org.elasticsearch"       %% "elasticsearch-spark-13" % elasticsearchVersion.value
+    exclude("org.apache.spark", "spark-sql_2.10")
+    exclude("org.apache.spark", "spark-streaming_2.10"),
+  "org.elasticsearch"        % "elasticsearch-hadoop-mr" % elasticsearchVersion.value,
+  "org.scalatest"           %% "scalatest"      % "2.1.7" % "test",
+  "org.specs2"              %% "specs2"         % "2.3.13" % "test")
+
+parallelExecution in Test := false
+
+pomExtra := childrenPomExtra.value
+
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+  case PathList("META-INF", "NOTICE.txt")  => MergeStrategy.concat
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
+
+assemblyShadeRules in assembly := Seq(
+  ShadeRule.rename("org.apache.http.**" -> "shadeio.data.http.@1").inAll
+)
+
+// skip test in assembly
+test in assembly := {}
+
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-elasticsearch-assembly-" + version.value + ".jar")
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
new file mode 100644
index 0000000..cb6d330
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/** Elasticsearch implementation of AccessKeys. */
+class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
+    extends AccessKeys with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "accesskeys"
+
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("key" -> ("type" -> "keyword")) ~
+          ("events" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  def insert(accessKey: AccessKey): Option[String] = {
+    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+    update(accessKey.copy(key = key))
+    Some(key)
+  }
+
+  def get(id: String): Option[AccessKey] = {
+    if (id.isEmpty) {
+      return None
+    }
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[AccessKey])
+        case _ =>
+          None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def getAll(): Seq[AccessKey] = { // fetches every access key stored under index/estype
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil // best-effort: the failure is logged and an empty result is returned
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def getByAppid(appid: Int): Seq[AccessKey] = { // term query on the "appid" field
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("appid" -> appid)))
+      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil // best-effort: the failure is logged and an empty result is returned
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def update(accessKey: AccessKey): Unit = {
+    val id = accessKey.key
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def delete(id: String): Unit = { // failures are logged, never thrown to the caller
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
new file mode 100644
index 0000000..abea2b8
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/** Elasticsearch implementation of Apps. */
+class ESApps(client: ESClient, config: StorageClientConfig, index: String)
+    extends Apps with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "apps"
+  private val seq = new ESSequences(client, config, index)
+
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("id" -> ("type" -> "keyword")) ~
+          ("name" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  def insert(app: App): Option[Int] = {
+    val id =
+      if (app.id == 0) {
+        var roll = seq.genNext(estype)
+        while (!get(roll).isEmpty) roll = seq.genNext(estype)
+        roll
+      } else app.id
+    update(app.copy(id = id))
+    Some(id)
+  }
+
+  def get(id: Int): Option[App] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[App])
+        case _ =>
+          None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def getByName(name: String): Option[App] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("name" -> name)))
+      val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/_search",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "hits" \ "total").extract[Long] match {
+        case 0 => None
+        case _ =>
+          val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+          val result = (results.head \ "_source").extract[App]
+          Some(result)
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def getAll(): Seq[App] = { // fetches every app stored under index/estype
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil // best-effort: the failure is logged and an empty result is returned
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def update(app: App): Unit = {
+    val id = app.id.toString
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def delete(id: Int): Unit = { // failures are logged, never thrown to the caller
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
new file mode 100644
index 0000000..f092cc7
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
+    extends Channels with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "channels"
+  private val seq = new ESSequences(client, config, index)
+
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("name" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  def insert(channel: Channel): Option[Int] = {
+    val id =
+      if (channel.id == 0) {
+        var roll = seq.genNext(estype)
+        while (!get(roll).isEmpty) roll = seq.genNext(estype)
+        roll
+      } else channel.id
+
+    if (update(channel.copy(id = id))) Some(id) else None
+  }
+
+  def get(id: Int): Option[Channel] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[Channel])
+        case _ =>
+          None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def getByAppid(appid: Int): Seq[Channel] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("appid" -> appid)))
+      ESUtils.getAll[Channel](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def update(channel: Channel): Boolean = {
+    val id = channel.id.toString
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "created" => true
+        case "updated" => true
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+          false
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+        false
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def delete(id: Int): Unit = { // failures are logged, never thrown to the caller
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
new file mode 100644
index 0000000..4dbacb7
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstanceSerializer
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String)
+    extends EngineInstances with Logging {
+  implicit val formats = DefaultFormats + new EngineInstanceSerializer
+  private val estype = "engine_instances"
+
+  // Constructor-time setup: make sure the index exists and register the
+  // mapping for engine instances. Each property is declared exactly once.
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("status" -> ("type" -> "keyword")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("engineId" -> ("type" -> "keyword")) ~
+          ("engineVersion" -> ("type" -> "keyword")) ~
+          ("engineVariant" -> ("type" -> "keyword")) ~
+          ("engineFactory" -> ("type" -> "keyword")) ~
+          ("batch" -> ("type" -> "keyword")) ~
+          ("dataSourceParams" -> ("type" -> "keyword")) ~
+          ("preparatorParams" -> ("type" -> "keyword")) ~
+          ("algorithmsParams" -> ("type" -> "keyword")) ~
+          ("servingParams" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /**
+   * Stores the given engine instance and returns the id it was stored under.
+   *
+   * When the instance carries an empty id, a fresh one is obtained through
+   * `preInsert()`, retrying until an id is handed back.
+   *
+   * @param i engine instance to persist
+   * @return the id used for the stored document
+   */
+  def insert(i: EngineInstance): String = {
+    // Keep requesting a generated id until one is returned.
+    @scala.annotation.tailrec
+    def allocateId(candidate: Option[String]): String = candidate match {
+      case Some(generated) => generated
+      case _ => allocateId(preInsert())
+    }
+
+    val id = if (i.id.isEmpty) allocateId(preInsert()) else i.id
+
+    update(i.copy(id = id))
+    id
+  }
+
+  def preInsert(): Option[String] = {
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+          Some((jsonResponse \ "_id").extract[String])
+        case _ =>
+          error(s"[$result] Failed to create $index/$estype")
+          None
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to create $index/$estype", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def get(id: String): Option[EngineInstance] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[EngineInstance])
+        case _ =>
+          None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Returns every engine instance stored under this DAO's index/type.
+   *
+   * @return all stored instances, or `Nil` when the search request fails
+   */
+  def getAll(): Seq[EngineInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        // Interpolated so the log shows the actual index and type.
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def getCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Seq[EngineInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("bool" ->
+            ("must" -> List(
+              ("term" ->
+                ("status" -> "COMPLETED")),
+              ("term" ->
+                ("engineId" -> engineId)),
+              ("term" ->
+                ("engineVersion" -> engineVersion)),
+              ("term" ->
+                ("engineVariant" -> engineVariant)))))) ~
+              ("sort" -> List(
+                ("startTime" ->
+                  ("order" -> "desc"))))
+      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def getLatestCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Option[EngineInstance] =
+    getCompleted(
+      engineId,
+      engineVersion,
+      engineVariant).headOption
+
+  def update(i: EngineInstance): Unit = {
+    val id = i.id
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Deletes the engine instance with the given id and forces an index
+   * refresh so the removal is immediately visible.
+   *
+   * Failures are only logged; nothing is reported back to the caller.
+   *
+   * @param id identifier of the engine instance to delete
+   */
+  def delete(id: String): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          // Any other result means the document was not removed.
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
new file mode 100644
index 0000000..5bdc0fb
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String)
+    extends EvaluationInstances with Logging {
+  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
+  private val estype = "evaluation_instances"
+  private val seq = new ESSequences(client, config, index)
+
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("status" -> ("type" -> "keyword")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("evaluationClass" -> ("type" -> "keyword")) ~
+          ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+          ("batch" -> ("type" -> "keyword")) ~
+          ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
+          ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
+          ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /**
+   * Stores the given evaluation instance and returns the id it was stored under.
+   *
+   * When the instance carries an empty id, sequence numbers are drawn until
+   * one is found for which `get` returns no existing instance.
+   *
+   * @param i evaluation instance to persist
+   * @return the id used for the stored document
+   */
+  def insert(i: EvaluationInstance): String = {
+    // Draw sequence numbers until one is not already taken.
+    @scala.annotation.tailrec
+    def nextFreeId(): String = {
+      val candidate = seq.genNext(estype).toString
+      if (get(candidate).isEmpty) candidate else nextFreeId()
+    }
+
+    val id = if (i.id.isEmpty) nextFreeId() else i.id
+
+    update(i.copy(id = id))
+    id
+  }
+
+  def get(id: String): Option[EvaluationInstance] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[EvaluationInstance])
+        case _ =>
+          None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Returns every evaluation instance stored under this DAO's index/type.
+   *
+   * @return all stored instances, or `Nil` when the search request fails
+   */
+  def getAll(): Seq[EvaluationInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        // Interpolated so the log shows the actual index and type.
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Returns the evaluation instances whose status is `EVALCOMPLETED`,
+   * sorted by start time in descending order.
+   *
+   * @return matching instances, or `Nil` when the search request fails
+   */
+  def getCompleted(): Seq[EvaluationInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("status" -> "EVALCOMPLETED"))) ~
+            ("sort" ->
+              ("startTime" ->
+                ("order" -> "desc")))
+      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        // Interpolated so the log shows the actual index and type.
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def update(i: EvaluationInstance): Unit = {
+    val id = i.id
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Deletes the evaluation instance with the given id and forces an index
+   * refresh so the removal is immediately visible.
+   *
+   * Failures are only logged; nothing is reported back to the caller.
+   *
+   * @param id identifier of the evaluation instance to delete
+   */
+  def delete(id: String): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          // Any other result means the document was not removed.
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
new file mode 100644
index 0000000..56f47ab
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s._
+
+object ESEventsUtil {
+
+  implicit val formats = DefaultFormats
+
+  /**
+   * Builds an [[Event]] from a record read as a Hadoop `MapWritable`.
+   *
+   * Only the `rating` entry under `properties.fields` is carried over into
+   * the event's properties; tags are always empty. The `id` and `appId`
+   * arguments are not used by this method.
+   *
+   * @param id     key of the record
+   * @param result field name to value map of the record
+   * @param appId  application id the record belongs to
+   * @return the reconstructed event
+   * @throws IllegalArgumentException if a mandatory column is missing
+   */
+  def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = {
+
+    def getStringCol(col: String): String = {
+      val r = result.get(new Text(col)).asInstanceOf[Text]
+      // The message is only evaluated when r is null, so it must not dereference r.
+      require(r != null, s"Failed to get value for column ${col}.")
+
+      r.toString()
+    }
+
+    def getOptStringCol(col: String): Option[String] = {
+      val r = result.get(new Text(col))
+      if (r == null) {
+        None
+      } else {
+        Some(r.asInstanceOf[Text].toString())
+      }
+    }
+
+    val tmp = result
+      .get(new Text("properties")).asInstanceOf[MapWritable]
+      .get(new Text("fields")).asInstanceOf[MapWritable]
+      .get(new Text("rating"))
+
+    // The rating may arrive as a double or a long; anything else (including
+    // an absent value) yields no rating.
+    val rating: Option[Double] = tmp match {
+      case d: DoubleWritable => Some(d.get())
+      case l: LongWritable => Some(l.get().toDouble)
+      case _ => None
+    }
+
+    val properties: DataMap = rating
+      .map(r => DataMap(s"""{"rating":${r.toString}}"""))
+      .getOrElse(DataMap())
+
+
+    val eventId = Some(getStringCol("eventId"))
+    val event = getStringCol("event")
+    val entityType = getStringCol("entityType")
+    val entityId = getStringCol("entityId")
+    val targetEntityType = getOptStringCol("targetEntityType")
+    val targetEntityId = getOptStringCol("targetEntityId")
+    val prId = getOptStringCol("prId")
+    val eventTimeZone = getOptStringCol("eventTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val eventTime = new DateTime(
+      getStringCol("eventTime"), eventTimeZone)
+    val creationTimeZone = getOptStringCol("creationTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val creationTime: DateTime = new DateTime(
+      getStringCol("creationTime"), creationTimeZone)
+
+
+    Event(
+      eventId = eventId,
+      event = event,
+      entityType = entityType,
+      entityId = entityId,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      properties = properties,
+      eventTime = eventTime,
+      tags = Seq(),
+      prId = prId,
+      creationTime = creationTime
+    )
+  }
+
+  def eventToPut(event: Event, appId: Int): Map[String, Any] = {
+    Map(
+      "eventId" -> event.eventId,
+      "event" -> event.event,
+      "entityType" -> event.entityType,
+      "entityId" -> event.entityId,
+      "targetEntityType" -> event.targetEntityType,
+      "targetEntityId" -> event.targetEntityId,
+      "properties" -> event.properties.toJObject,
+      "eventTime" -> event.eventTime.toString,
+      "tags" -> event.tags,
+      "prId" -> event.prId,
+      "creationTime" -> event.creationTime.toString
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
new file mode 100644
index 0000000..fdd370a
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.joda.time.DateTime
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+import org.json4s.ext.JodaTimeSerializers
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+import org.apache.http.entity.StringEntity
+
+class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
+    extends LEvents with Logging {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+  private val seq = new ESSequences(client, config, index)
+  private val seqName = "events"
+
+  /**
+   * Derives the Elasticsearch type name for an app and optional channel:
+   * `appId_channelId` when a channel is given, otherwise just `appId`.
+   *
+   * @param appId     application id
+   * @param channelId optional channel id
+   * @return the type name used for this app/channel
+   */
+  def getEsType(appId: Int, channelId: Option[Int] = None): String =
+    channelId match {
+      case Some(ch) => s"${appId}_${ch}"
+      case None => s"${appId}"
+    }
+
+  override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    val restClient = client.open()
+    try {
+      ESUtils.createIndex(restClient, index)
+      val json =
+        (estype ->
+          ("_all" -> ("enabled" -> 0)) ~
+          ("properties" ->
+            ("name" -> ("type" -> "keyword")) ~
+            ("eventId" -> ("type" -> "keyword")) ~
+            ("event" -> ("type" -> "keyword")) ~
+            ("entityType" -> ("type" -> "keyword")) ~
+            ("entityId" -> ("type" -> "keyword")) ~
+            ("targetEntityType" -> ("type" -> "keyword")) ~
+            ("targetEntityId" -> ("type" -> "keyword")) ~
+            ("properties" ->
+              ("type" -> "nested") ~
+              ("properties" ->
+                ("fields" -> ("type" -> "nested") ~
+                  ("properties" ->
+                    ("user" -> ("type" -> "long")) ~
+                    ("num" -> ("type" -> "long")))))) ~
+                    ("eventTime" -> ("type" -> "date")) ~
+                    ("tags" -> ("type" -> "keyword")) ~
+                    ("prId" -> ("type" -> "keyword")) ~
+                    ("creationTime" -> ("type" -> "date"))))
+      ESUtils.createMapping(restClient, index, estype, compact(render(json)))
+    } finally {
+      restClient.close()
+    }
+    true
+  }
+
+  override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+      restClient.performRequest(
+        "POST",
+        s"/$index/$estype/_delete_by_query",
+        Map("refresh" -> "true").asJava,
+        entity).getStatusLine.getStatusCode match {
+          case 200 => true
+          case _ =>
+            error(s"Failed to remove $index/$estype")
+            false
+        }
+    } catch {
+      case e: Exception =>
+        error(s"Failed to remove $index/$estype", e)
+        false
+    } finally {
+      restClient.close()
+    }
+  }
+
+  override def close(): Unit = {
+    // nothing
+  }
+
+  override def futureInsert(
+    event: Event,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val id = event.eventId.getOrElse {
+          var roll = seq.genNext(seqName)
+          while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
+          roll.toString
+        }
+        val json = write(event.copy(eventId = Some(id)))
+        val entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
+        val response = restClient.performRequest(
+          "POST",
+          s"/$index/$estype/$id",
+          Map("refresh" -> "true").asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        val result = (jsonResponse \ "result").extract[String]
+        result match {
+          case "created" => id
+          case "updated" => id
+          case _ =>
+            error(s"[$result] Failed to update $index/$estype/$id")
+            ""
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to update $index/$estype/<id>", e)
+          ""
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+  private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
+    try {
+      restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+          case 200 => true
+          case _ => false
+        }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => false
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            false
+        }
+      case e: IOException =>
+        error(s"Failed to access to $index/$estype/$id", e)
+        false
+    }
+  }
+
+  /**
+   * Asynchronously looks up a single event by its `eventId` field.
+   *
+   * @param eventId   id of the event to fetch
+   * @param appId     application id, used to derive the Elasticsearch type
+   * @param channelId optional channel id, used to derive the Elasticsearch type
+   * @return the first matching event, or `None` when nothing matches or the
+   *         request fails with an `IOException`
+   */
+  override def futureGet(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = restClient.performRequest(
+          "POST",
+          s"/$index/$estype/_search",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        (jsonResponse \ "hits" \ "total").extract[Long] match {
+          case 0 => None
+          case _ =>
+            val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+            val result = (results.head \ "_source").extract[Event]
+            Some(result)
+        }
+      } catch {
+        case e: IOException =>
+          // Interpolated so the log shows the actual index and type.
+          error(s"Failed to access to /$index/$estype/_search", e)
+          None
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+  /**
+   * Asynchronously deletes the event(s) whose `eventId` field matches.
+   *
+   * The term query is sent as the request body of `_delete_by_query`, so
+   * only matching documents are removed. Success is determined from the
+   * `deleted` counter of the response.
+   *
+   * @param eventId   id of the event to delete
+   * @param appId     application id, used to derive the Elasticsearch type
+   * @param channelId optional channel id, used to derive the Elasticsearch type
+   * @return `true` when at least one document was deleted, `false` otherwise
+   *         (including when the request fails with an `IOException`)
+   */
+  override def futureDelete(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        // The entity must be passed along; without it the query filter is never sent.
+        val response = restClient.performRequest(
+          "POST",
+          s"/$index/$estype/_delete_by_query",
+          Map("refresh" -> "true").asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        // The delete-by-query response reports a "deleted" count, not a "result" string.
+        (jsonResponse \ "deleted").extractOpt[Long] match {
+          case Some(count) if count > 0 => true
+          case _ =>
+            error(s"Failed to delete $index/$estype:$eventId")
+            false
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to delete $index/$estype:$eventId", e)
+          false
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+  /**
+   * Asynchronously finds events matching the given filters.
+   *
+   * A `limit` of `-1` fetches all matching events; when no limit is given,
+   * 20 is used.
+   *
+   * @return an iterator over the matching events, or an empty iterator when
+   *         the request fails with an `IOException`
+   */
+  override def futureFind(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    limit: Option[Int] = None,
+    reversed: Option[Boolean] = None)
+    (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val query = ESUtils.createEventQuery(
+          startTime, untilTime, entityType, entityId,
+          eventNames, targetEntityType, targetEntityId, reversed)
+        limit.getOrElse(20) match {
+          case -1 => ESUtils.getAll[Event](restClient, index, estype, query).toIterator
+          case size => ESUtils.get[Event](restClient, index, estype, query, size).toIterator
+        }
+      } catch {
+        case e: IOException =>
+          // Log the target and the exception itself so the stack trace is kept.
+          error(s"Failed to access to /$index/$estype/_search", e)
+          Iterator[Event]()
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
new file mode 100644
index 0000000..390e78c
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.mr.EsInputFormat
+import org.elasticsearch.spark._
+import org.joda.time.DateTime
+import java.io.IOException
+import org.apache.http.util.EntityUtils
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.entity.ContentType
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.ext.JodaTimeSerializers
+
+
+/** Spark-based (parallel) event store backed by Elasticsearch.
+  *
+  * Reads use elasticsearch-hadoop's `EsInputFormat`, writes use `saveToEs`, and
+  * deletes go through the low-level REST client obtained from `client`.
+  *
+  * @param client factory for Elasticsearch REST clients
+  * @param config storage configuration; the `HOSTS` and `PORTS` properties are read
+  * @param index  name of the Elasticsearch index that holds the events
+  */
+class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
+    extends PEvents {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+
+  /** Returns the Elasticsearch type name: `appId`, or `appId_channelId` when a
+    * channel is given.
+    */
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  /** Builds the comma-separated `host:port` list used for the `es.nodes` setting.
+    *
+    * `HOSTS` defaults to `localhost` and `PORTS` to `9200`. When fewer ports than
+    * hosts are configured, the last configured port is reused for the remaining
+    * hosts so that no host is silently dropped.
+    */
+  def getESNodes(): String = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    // `zipped` stops at the shorter sequence, so pad the ports up to the host count.
+    val paddedPorts = ports.padTo(hosts.size, ports.lastOption.getOrElse(9200))
+    (hosts, paddedPorts).zipped.map(
+      (h, p) => s"$h:$p").mkString(",")
+  }
+
+  /** Reads the events matching the given filters as an RDD.
+    *
+    * The filters are turned into an Elasticsearch query by
+    * `ESUtils.createEventQuery` (no explicit sort direction is requested) and each
+    * returned document is converted with `ESEventsUtil.resultToEvent`.
+    */
+  override def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+
+    val query = ESUtils.createEventQuery(
+      startTime, untilTime, entityType, entityId,
+      eventNames, targetEntityType, targetEntityId, None)
+
+    val estype = getEsType(appId, channelId)
+    val conf = new Configuration()
+    conf.set("es.resource", s"$index/$estype")
+    conf.set("es.query", query)
+    conf.set("es.nodes", getESNodes())
+
+    val rdd = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
+      classOf[Text], classOf[MapWritable]).map {
+        case (key, doc) => {
+          ESEventsUtil.resultToEvent(key, doc, appId)
+        }
+      }
+
+    rdd
+  }
+
+  /** Writes the events to `index/<estype>` after converting each one with
+    * `ESEventsUtil.eventToPut`.
+    */
+  override def write(
+    events: RDD[Event],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes())
+    events.map { event =>
+      ESEventsUtil.eventToPut(event, appId)
+    }.saveToEs(conf)
+  }
+
+  /** Deletes the events with the given ids, issuing one delete-by-query request
+    * per event id.
+    *
+    * Failures are logged and do not abort the remaining deletions: an
+    * `IOException` from the request, or a response that reports no deleted
+    * document, only produces an error log entry.
+    */
+  override def delete(
+    eventIds: RDD[String],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    eventIds.foreachPartition { iter =>
+      // Open the client inside the partition closure so that the connection is
+      // created where the partition is processed and never has to travel with
+      // the Spark closure.
+      val restClient = client.open()
+      try {
+        iter.foreach { eventId =>
+          try {
+            val json =
+              ("query" ->
+                ("term" ->
+                  ("eventId" -> eventId)))
+            val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+            // The query body must be sent along; without it no term filter is applied.
+            val response = restClient.performRequest(
+              "POST",
+              s"/$index/$estype/_delete_by_query",
+              Map("refresh" -> "true").asJava,
+              entity)
+            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+            // Delete-by-query reports the number of removed documents in "deleted".
+            val deleted = (jsonResponse \ "deleted").extractOpt[Int].getOrElse(0)
+            if (deleted == 0) {
+              logger.error(s"Failed to delete $index/$estype:$eventId")
+            }
+          } catch {
+            case e: IOException =>
+              logger.error(s"Failed to delete $index/$estype:$eventId", e)
+          }
+        }
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
new file mode 100644
index 0000000..e5264ae
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.Header
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+
+/** Sequence generator stored in the `sequences` type of the given index.
+  *
+  * On construction the index and the type mapping are created if they do not
+  * exist yet (see `ESUtils.createIndex` and `ESUtils.createMapping`).
+  *
+  * @param client factory for Elasticsearch REST clients
+  * @param config storage configuration (not read by this class itself)
+  * @param index  name of the Elasticsearch index to use
+  */
+class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    // Disable the `_all` field with a real JSON boolean instead of the number 0.
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> false)))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /** Returns the next number of the sequence called `name`.
+    *
+    * The document with id `name` is (re-)indexed and the `_version` reported by
+    * Elasticsearch for that write is returned as the sequence value.
+    *
+    * @throws IllegalStateException  if the response result is neither `created`
+    *                                nor `updated`
+    * @throws StorageClientException if the request fails with an `IOException`
+    */
+  def genNext(name: String): Int = {
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$name",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" | "updated" =>
+          (jsonResponse \ "_version").extract[Int]
+        case _ =>
+          throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
+      }
+    } catch {
+      case e: IOException =>
+        throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
new file mode 100644
index 0000000..72f4dd6
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.nio.entity.NStringEntity
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.apache.http.util.EntityUtils
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.DateTimeZone
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.http.HttpHost
+
+/** Helpers shared by the Elasticsearch storage implementations: search, scroll,
+  * index/mapping creation, event query construction and host configuration.
+  */
+object ESUtils {
+  /** Keep-alive passed to Elasticsearch for scroll requests. */
+  val scrollLife = "1m"
+
+  /** Runs `query` against `index/estype` and returns at most `size` hits,
+    * each extracted from the hit's `_source`.
+    */
+  def get[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String,
+    size: Int)(
+      implicit formats: Formats): Seq[T] = {
+    // Send the body as UTF-8 JSON; a bare StringEntity would use text/plain, ISO-8859-1.
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("size" -> s"${size}").asJava,
+      new NStringEntity(query, ContentType.APPLICATION_JSON))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]]
+    hits.map(h => (h \ "_source").extract[T])
+  }
+
+  /** Runs `query` against `index/estype` and returns every hit by following the
+    * scroll until a page comes back empty. Hits are returned in the order in
+    * which Elasticsearch delivered them.
+    */
+  def getAll[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
+      implicit formats: Formats): Seq[T] = {
+
+    @scala.annotation.tailrec
+    def scroll(scrollId: String, hits: Seq[JValue], results: Vector[T]): Seq[T] = {
+      if (hits.isEmpty) results
+      else {
+        val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
+        val scrollBody = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          "/_search/scroll",
+          Map.empty[String, String].asJava,
+          scrollBody)
+        val responseJValue = parse(EntityUtils.toString(response.getEntity))
+        // Append (not prepend) the current page so the query's sort order is kept.
+        scroll((responseJValue \ "_scroll_id").extract[String],
+          (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+          results ++ hits.map(h => (h \ "_source").extract[T]))
+      }
+    }
+
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("scroll" -> scrollLife).asJava,
+      new NStringEntity(query, ContentType.APPLICATION_JSON))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    scroll((responseJValue \ "_scroll_id").extract[String],
+        (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+        Vector.empty[T])
+  }
+
+  /** Creates `index` unless a HEAD request shows that it already exists.
+    *
+    * @throws IllegalStateException if the HEAD request answers with a status
+    *                               other than 200 or 404
+    */
+  def createIndex(
+    client: RestClient,
+    index: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          client.performRequest(
+            "PUT",
+            s"/$index",
+            Map.empty[String, String].asJava)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index is invalid.")
+      }
+  }
+
+  /** Puts the mapping `json` for `estype` unless a HEAD request shows that a
+    * mapping for that type already exists.
+    *
+    * @throws IllegalStateException if the HEAD request answers with a status
+    *                               other than 200 or 404
+    */
+  def createMapping(
+    client: RestClient,
+    index: String,
+    estype: String,
+    json: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index/_mapping/$estype",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+          client.performRequest(
+            "PUT",
+            s"/$index/_mapping/$estype",
+            Map.empty[String, String].asJava,
+            entity)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index/$estype is invalid: $json")
+      }
+  }
+
+  /** Renders `s` as a quoted JSON string literal with special characters escaped. */
+  private def jsonString(s: String): String = compact(render(JString(s)))
+
+  /** Formats an event time in UTC using the pattern expected by the range queries. */
+  private def formatEventTime(t: DateTime): String =
+    DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(t.withZone(DateTimeZone.UTC))
+
+  /** Builds the JSON search body used to look up events.
+    *
+    * Every supplied filter becomes one clause of a `bool`/`must` query; without
+    * any filter a `match_all` query is produced. Results are sorted by
+    * `eventTime`, descending when `reversed` is `Some(true)` and ascending
+    * otherwise. For `targetEntityType` and `targetEntityId` only `Some(Some(x))`
+    * adds a clause; `Some(None)` adds none.
+    *
+    * String values are JSON-escaped before being placed in the query.
+    */
+  def createEventQuery(
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    reversed: Option[Boolean] = None): String = {
+    val mustQueries = Seq(
+      startTime.map(x => s"""{"range":{"eventTime":{"gte":"${formatEventTime(x)}"}}}"""),
+      untilTime.map(x => s"""{"range":{"eventTime":{"lt":"${formatEventTime(x)}"}}}"""),
+      entityType.map(x => s"""{"term":{"entityType":${jsonString(x)}}}"""),
+      entityId.map(x => s"""{"term":{"entityId":${jsonString(x)}}}"""),
+      targetEntityType.flatMap(xx =>
+        xx.map(x => s"""{"term":{"targetEntityType":${jsonString(x)}}}""")),
+      targetEntityId.flatMap(xx =>
+        xx.map(x => s"""{"term":{"targetEntityId":${jsonString(x)}}}""")),
+      eventNames
+        .map(names => names.map(jsonString))
+        .map(x => s"""{"terms":{"event":[${x.mkString(",")}]}}""")).flatten.mkString(",")
+    val queryBody =
+      if (mustQueries.isEmpty) """{"match_all":{}}"""
+      else s"""{"bool":{"must":[${mustQueries}]}}"""
+    val sortOrder = if (reversed.getOrElse(false)) "desc" else "asc"
+    s"""{
+       |"query":${queryBody},
+       |"sort":[{"eventTime":{"order":"${sortOrder}"}}]
+       |}""".stripMargin
+  }
+
+  /** Builds the list of Elasticsearch hosts from the `HOSTS`, `PORTS` and
+    * `SCHEMES` properties (defaults: `localhost`, `9200`, `http`).
+    *
+    * When fewer ports or schemes than hosts are configured, the last configured
+    * value is reused for the remaining hosts so that no host is silently dropped.
+    */
+  def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    // `zipped` stops at the shortest sequence, so pad ports and schemes first.
+    val paddedPorts = ports.padTo(hosts.size, ports.lastOption.getOrElse(9200))
+    val paddedSchemes = schemes.padTo(hosts.size, schemes.lastOption.getOrElse("http"))
+    (hosts, paddedPorts, paddedSchemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
new file mode 100644
index 0000000..647d180
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.http.HttpHost
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+
+import grizzled.slf4j.Logging
+
+/** Factory for Elasticsearch REST clients connected to the given hosts. */
+case class ESClient(hosts: Seq[HttpHost]) {
+  /** Builds a new `RestClient` for `hosts`. The caller is responsible for closing it.
+    *
+    * @throws StorageClientException if building the client fails with a non-fatal error
+    */
+  def open(): RestClient = {
+    import scala.util.control.NonFatal
+    try {
+      RestClient.builder(hosts: _*).build()
+    } catch {
+      // Only wrap non-fatal failures; fatal JVM errors must propagate unchanged.
+      case NonFatal(e) =>
+        throw new StorageClientException(e.getMessage, e)
+    }
+  }
+}
+
+/** Storage client entry point for the Elasticsearch backend.
+  *
+  * @param config storage configuration from which the Elasticsearch hosts are read
+  */
+class StorageClient(val config: StorageClientConfig)
+    extends BaseStorageClient with Logging {
+
+  /** Prefix identifying this backend. */
+  override val prefix: String = "ES"
+
+  /** REST client factory for the hosts derived from `config`. */
+  val client: ESClient = {
+    val httpHosts = ESUtils.getHttpHosts(config)
+    ESClient(httpHosts)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
new file mode 100644
index 0000000..fdc3b48
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** Elasticsearch implementation of storage traits, supporting meta data and event data
+  *
+  * @group Implementation
+  */
+package elasticsearch {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/resources/application.conf b/storage/elasticsearch/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/elasticsearch/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+  sources {
+    mongodb {
+      type = mongodb
+      hosts = [localhost]
+      ports = [27017]
+    }
+    elasticsearch {
+      type = elasticsearch
+      hosts = [localhost]
+      ports = [9300]
+    }
+  }
+  repositories {
+    # This section is a dummy that exists only to keep the storage layer happy.
+    # The actual testing will not bypass these repository settings completely.
+    # Please refer to StorageTestUtils.scala.
+    settings {
+      name = "test_predictionio"
+      source = mongodb
+    }
+
+    appdata {
+      name = "test_predictionio_appdata"
+      source = mongodb
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/.gitignore
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/.gitignore b/storage/elasticsearch1/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/elasticsearch1/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/build.sbt
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/build.sbt b/storage/elasticsearch1/build.sbt
new file mode 100644
index 0000000..8c29b84
--- /dev/null
+++ b/storage/elasticsearch1/build.sbt
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build definition for the Elasticsearch 1.x storage module.
+name := "apache-predictionio-data-elasticsearch1"
+
+// Elasticsearch client version this module is built against.
+// NOTE(review): `elasticsearchVersion` is not declared in this file; presumably a
+// setting key defined in the project-level build - confirm.
+elasticsearchVersion := "1.7.3"
+
+// PredictionIO core and data are `provided`, so they are not packaged with this module.
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+  "org.elasticsearch"        % "elasticsearch"  % elasticsearchVersion.value,
+  "org.scalatest"           %% "scalatest"      % "2.1.7" % "test",
+  "org.specs2"              %% "specs2"         % "2.3.13" % "test")
+
+// Run the tests of this module sequentially.
+parallelExecution in Test := false
+
+// NOTE(review): `childrenPomExtra` is defined outside this file.
+pomExtra := childrenPomExtra.value
+
+// The assembly jar bundles the library dependencies but not the Scala library itself.
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+// Concatenate duplicate LICENSE/NOTICE files; everything else uses the previous strategy.
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+  case PathList("META-INF", "NOTICE.txt")  => MergeStrategy.concat
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+// Write the assembly jar to assembly/extra under the directory two levels above this module.
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "extra" / ("pio-data-elasticsearch1-assembly-" + version.value + ".jar")
+


Mime
View raw message