predictionio-commits mailing list archives

From: don...@apache.org
Subject: [01/10] incubator-predictionio git commit: Add support for Elasticsearch 5.x
Date: Sat, 11 Feb 2017 22:00:08 GMT
Repository: incubator-predictionio
Updated Branches:
  refs/heads/feature/es5 [created] c64941b6e


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
new file mode 100644
index 0000000..0e3eec8
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.mr.EsInputFormat
+import org.elasticsearch.spark._
+import org.joda.time.DateTime
+import java.io.IOException
+import org.apache.http.util.EntityUtils
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.entity.ContentType
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.ext.JodaTimeSerializers
+
+
+class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
+    extends PEvents {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+
+  // client is not used.
+  try client.close() catch {
+    case e: Exception =>
+      logger.error("Failed to close client.", e)
+  }
+
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  def getESNodes(): String = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    (hosts, ports, schemes).zipped.map(
+      (h, p, s) => s"$s://$h:$p").mkString(",")
+  }
+
+  override def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+
+    val query = ESUtils.createEventQuery(
+      startTime, untilTime, entityType, entityId,
+      eventNames, targetEntityType, targetEntityId, None)
+
+    val estype = getEsType(appId, channelId)
+    val conf = new Configuration()
+    conf.set("es.resource", s"$index/$estype")
+    conf.set("es.query", query)
+    conf.set("es.nodes", getESNodes())
+
+    val rdd = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
+      classOf[Text], classOf[MapWritable]).map {
+        case (key, doc) => {
+          ESEventsUtil.resultToEvent(key, doc, appId)
+        }
+      }
+
+    rdd
+  }
+
+  override def write(
+    events: RDD[Event],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    events.map { event =>
+      ESEventsUtil.eventToPut(event, appId)
+    }.saveToEs(s"$index/$estype")
+  }
+
+  override def delete(
+    eventIds: RDD[String],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    val restClient = ESUtils.createRestClient(config)
+    try {
+      eventIds.foreachPartition { iter =>
+        iter.foreach { eventId =>
+          try {
+            val json =
+              ("query" ->
+                ("term" ->
+                  ("eventId" -> eventId)))
+            val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+            val response = restClient.performRequest(
+              "POST",
+              s"/$index/$estype/_delete_by_query",
+              Map.empty[String, String].asJava,
+              entity)
+            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+            // _delete_by_query reports a "deleted" document count rather
+            // than a "result" field.
+            val deleted = (jsonResponse \ "deleted").extract[Int]
+            if (deleted < 1) {
+              logger.error(s"Failed to delete $index/$estype:$eventId")
+            }
+          } catch {
+            case e: IOException =>
+              logger.error(s"Failed to delete $index/$estype:$eventId", e)
+          }
+        }
+      }
+    } finally {
+      restClient.close()
+    }
+  }
+
+}
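
For reference, getESNodes() above joins the HOSTS, PORTS, and SCHEMES
storage properties into the comma-separated node list that
elasticsearch-hadoop expects in "es.nodes". A minimal self-contained
sketch of that logic, with hypothetical property values:

    // Hypothetical storage properties for two ES 5.x nodes.
    val props = Map(
      "HOSTS" -> "es1,es2",
      "PORTS" -> "9200,9200",
      "SCHEMES" -> "http,http")
    val hosts = props.get("HOSTS").map(_.split(",").toSeq).getOrElse(Seq("localhost"))
    val ports = props.get("PORTS").map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
    val schemes = props.get("SCHEMES").map(_.split(",").toSeq).getOrElse(Seq("http"))
    // Same zipped join as getESNodes(): yields "http://es1:9200,http://es2:9200".
    val nodes = (hosts, ports, schemes).zipped.map((h, p, s) => s"$s://$h:$p").mkString(",")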

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 5c9e170..c067f3a 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -15,50 +15,57 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.Header
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
 import org.json4s._
+import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
 
-class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
+class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging {
   implicit val formats = DefaultFormats
   private val estype = "sequences"
 
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    // val settingsJson =
-    //   ("number_of_shards" -> 1) ~
-    //   ("auto_expand_replicas" -> "0-all")
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val mappingJson =
-      (estype ->
-        ("_source" -> ("enabled" -> 0)) ~
-        ("_all" -> ("enabled" -> 0)) ~
-        ("_type" -> ("index" -> "no")) ~
-        ("enabled" -> 0))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(mappingJson))).get
-  }
+  ESUtils.createIndex(client, index)
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> 0)))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def genNext(name: String): Int = {
     try {
-      val response = client.prepareIndex(index, estype, name).
-        setSource(compact(render("n" -> name))).get
-      response.getVersion().toInt
+      val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$name",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+          (jsonResponse \ "_version").extract[Int]
+        case "updated" =>
+          (jsonResponse \ "_version").extract[Int]
+        case _ =>
+          throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        0
+      case e: IOException =>
+        throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
     }
   }
 }
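
The rewritten genNext relies on Elasticsearch document versioning: each
POST re-indexes the same document id, and ES 5.x replies with JSON such
as {"result":"updated","_version":42,...}, so _version serves as the
counter. A small json4s sketch of that parsing (the response body here
is a hypothetical sample):

    import org.json4s._
    import org.json4s.native.JsonMethods._

    implicit val formats = DefaultFormats
    // Hypothetical ES 5.x index response body.
    val body = """{"result":"updated","_version":42}"""
    val j = parse(body)
    // genNext returns the bumped _version as the next sequence value.
    assert((j \ "result").extract[String] == "updated")
    assert((j \ "_version").extract[Int] == 42)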

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index f5c99bf..68e3f57 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -15,34 +15,151 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import org.elasticsearch.action.search.SearchRequestBuilder
-import org.elasticsearch.client.Client
-import org.elasticsearch.common.unit.TimeValue
-import org.json4s.Formats
-import org.json4s.native.Serialization.read
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
-import scala.collection.mutable.ArrayBuffer
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.nio.entity.NStringEntity
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.apache.http.util.EntityUtils
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.DateTimeZone
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.http.HttpHost
 
 object ESUtils {
-  val scrollLife = new TimeValue(60000)
+  val scrollLife = "1m"
 
-  def getAll[T : Manifest](
-      client: Client,
-      builder: SearchRequestBuilder)(
+  def getAll[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
       implicit formats: Formats): Seq[T] = {
-    val results = ArrayBuffer[T]()
-    var response = builder.setScroll(scrollLife).get
-    var hits = response.getHits().hits()
-    results ++= hits.map(h => read[T](h.getSourceAsString))
-    while (hits.size > 0) {
-      response = client.prepareSearchScroll(response.getScrollId).
-        setScroll(scrollLife).get
-      hits = response.getHits().hits()
-      results ++= hits.map(h => read[T](h.getSourceAsString))
+
+    @scala.annotation.tailrec
+    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = {
+      if (hits.isEmpty) results
+      else {
+        val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
+        val scrollBody = new StringEntity(compact(render(json)))
+        val response = client.performRequest(
+          "POST",
+          "/_search/scroll",
+          Map[String, String](),
+          scrollBody)
+        val responseJValue = parse(EntityUtils.toString(response.getEntity))
+        scroll((responseJValue \ "_scroll_id").extract[String],
+          (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+          results ++ hits.map(h => (h \ "_source").extract[T]))
+      }
     }
-    results
+
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("scroll" -> scrollLife),
+      new StringEntity(query))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    scroll((responseJValue \ "_scroll_id").extract[String],
+        (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+        Nil)
+  }
+
+  def createIndex(
+    client: RestClient,
+    index: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          client.performRequest(
+            "PUT",
+            s"/$index",
+            Map.empty[String, String].asJava)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index is invalid.")
+      }
+  }
+
+  def createMapping(
+    client: RestClient,
+    index: String,
+    estype: String,
+    json: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index/_mapping/$estype",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+          client.performRequest(
+            "PUT",
+            s"/$index/_mapping/$estype",
+            Map.empty[String, String].asJava,
+            entity)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index/$estype is invalid: $json")
+      }
+  }
+
+  def createEventQuery(
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    reversed: Option[Boolean] = None): String = {
+    val mustQueries = Seq(
+      startTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
+      }),
+      untilTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"lt":"${v}"}}}"""
+      }),
+      entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),
+      entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""),
+      targetEntityType.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityType":"${x}"}}""")),
+      targetEntityId.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityId":"${x}"}}""")),
+      eventNames
+        .map { xx => xx.map(x => "\"%s\"".format(x)) }
+        .map(x => s"""{"terms":{"event":[${x.mkString(",")}]}}""")).flatten.mkString(",")
+    val sortOrder = reversed.map(x => if (x) "desc" else "asc").getOrElse("asc")
+    s"""{
+       |"query":{"bool":{"must":[${mustQueries}]}},
+       |"sort":[{"eventTime":{"order":"${sortOrder}"}}]
+       |}""".stripMargin
+  }
+
+  def createRestClient(config: StorageClientConfig): RestClient = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    val httpHosts = (hosts, ports, schemes).zipped.map(
+      (h, p, s) => new HttpHost(h, p, s))
+    RestClient.builder(httpHosts: _*).build()
   }
 }
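
For context, createEventQuery assembles a bool/must filter plus a sort
on eventTime, defaulting to ascending order. A rough example of the
rendered query when only eventNames is set (values are hypothetical):

    // Roughly produces:
    //   {"query":{"bool":{"must":[{"terms":{"event":["view","buy"]}}]}},
    //    "sort":[{"eventTime":{"order":"asc"}}]}
    val q = ESUtils.createEventQuery(eventNames = Some(Seq("view", "buy")))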

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
index 75ac2b0..912d467 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -15,36 +15,24 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.storage.elasticsearch
 
 import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.BaseStorageClient
 import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.transport.TransportClient
-import org.elasticsearch.common.settings.ImmutableSettings
-import org.elasticsearch.common.transport.InetSocketTransportAddress
-import org.elasticsearch.transport.ConnectTransportException
+import java.net.InetAddress
+import org.elasticsearch.client.RestClient
+import org.apache.http.HttpHost
 
 class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
     with Logging {
   override val prefix = "ES"
+
   val client = try {
-    val hosts = config.properties.get("HOSTS").
-      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
-    val ports = config.properties.get("PORTS").
-      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
-    val settings = ImmutableSettings.settingsBuilder()
-      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
-    val transportClient = new TransportClient(settings)
-    (hosts zip ports) foreach { hp =>
-      transportClient.addTransportAddress(
-        new InetSocketTransportAddress(hp._1, hp._2))
-    }
-    transportClient
+    ESUtils.createRestClient(config)
   } catch {
-    case e: ConnectTransportException =>
+    case e: Throwable =>
       throw new StorageClientException(e.getMessage, e)
   }
 }
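
One behavioral note on this change: the ES 5.x low-level RestClient
speaks HTTP on port 9200, whereas the old TransportClient used the
binary transport protocol on port 9300, which is why the default port
in ESUtils.createRestClient is 9200 and the CLUSTERNAME setting is no
longer needed to connect.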

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/tools/build.sbt
----------------------------------------------------------------------
diff --git a/tools/build.sbt b/tools/build.sbt
index fefdb45..4e2b266 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -42,6 +42,7 @@ dependencyOverrides +=   "org.slf4j" % "slf4j-log4j12" % "1.7.18"
 assemblyMergeStrategy in assembly := {
   case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
   case PathList("META-INF", "NOTICE.txt")  => MergeStrategy.concat
+  case PathList("org", "joda", "time", "base", "BaseDateTime.class") => MergeStrategy.first
   case x =>
     val oldStrategy = (assemblyMergeStrategy in assembly).value
     oldStrategy(x)
@@ -62,7 +63,8 @@ excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
 assemblyShadeRules in assembly := Seq(
   ShadeRule.rename("org.objenesis.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject,
   ShadeRule.rename("com.esotericsoftware.reflectasm.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject,
-  ShadeRule.rename("com.esotericsoftware.minlog.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject
+  ShadeRule.rename("com.esotericsoftware.minlog.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject,
+  ShadeRule.rename("org.apache.http.**" -> "shadeio.http.@1").inAll
 )
 
 // skip test in assembly
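
The build changes line up with the new client dependencies: joda-time's
BaseDateTime.class can now appear in more than one jar on the assembly
classpath, hence the MergeStrategy.first entry, and the RestClient's
Apache HttpComponents classes are relocated to shadeio.http so they
cannot clash with the older httpclient that Spark and Hadoop bring in.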

