predictionio-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shimam...@apache.org
Subject [predictionio] branch develop updated: [PIO-203] Fixes pio status warnings in ES storage (#507)
Date Mon, 21 Jan 2019 05:21:41 GMT
This is an automated email from the ASF dual-hosted git repository.

shimamoto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/predictionio.git


The following commit(s) were added to refs/heads/develop by this push:
     new 52eb306  [PIO-203] Fixes pio status warnings in ES storage (#507)
52eb306 is described below

commit 52eb306bfba86fd1c732f7f36d28c24dbfffed2f
Author: takako shimamoto <chibochibo@gmail.com>
AuthorDate: Mon Jan 21 14:21:34 2019 +0900

    [PIO-203] Fixes pio status warnings in ES storage (#507)
---
 .../data/storage/elasticsearch/ESAccessKeys.scala  |  5 +--
 .../data/storage/elasticsearch/ESApps.scala        |  8 ++---
 .../data/storage/elasticsearch/ESChannels.scala    |  7 ++--
 .../storage/elasticsearch/ESEngineInstances.scala  | 42 +++++++++++-----------
 .../elasticsearch/ESEvaluationInstances.scala      |  8 ++---
 .../data/storage/elasticsearch/ESLEvents.scala     |  5 +--
 .../data/storage/elasticsearch/ESSequences.scala   |  6 +---
 .../data/storage/elasticsearch/ESUtils.scala       | 20 ++---------
 8 files changed, 31 insertions(+), 70 deletions(-)

diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 15f223f..eef83e4 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -42,12 +42,9 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
   private val estype = "accesskeys"
   private val internalIndex = index + "_" + estype
    
-  ESUtils.createIndex(client, internalIndex,
-    ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("key" -> ("type" -> "keyword")) ~
         ("events" -> ("type" -> "keyword"))))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index cb17af8..26621cf 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -40,16 +40,12 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
     extends Apps with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "apps"
+  private val seq = new ESSequences(client, config, index)
   private val internalIndex = index + "_" + estype
 
-  private val seq = new ESSequences(client, config, internalIndex)
-
-  ESUtils.createIndex(client, internalIndex,
-    ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("id" -> ("type" -> "keyword")) ~
         ("name" -> ("type" -> "keyword"))))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index 63b108f..ac248de 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -39,15 +39,12 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
     extends Channels with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "channels"
-  private val seq = new ESSequences(client, config, internalIndex)
+  private val seq = new ESSequences(client, config, index)
   private val internalIndex = index + "_" + estype
 
-  ESUtils.createIndex(client, internalIndex,
-    ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("name" -> ("type" -> "keyword"))))
   ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 02f7b98..96f8a67 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -40,13 +40,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
     extends EngineInstances with Logging {
   implicit val formats = DefaultFormats + new EngineInstanceSerializer
   private val estype = "engine_instances"
-  
-  ESUtils.createIndex(client, index,
-    ESUtils.getNumberOfShards(config, index.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  private val internalIndex = index + "_" + estype
+
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("status" -> ("type" -> "keyword")) ~
         ("startTime" -> ("type" -> "date")) ~
@@ -61,7 +59,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
         ("algorithmsParams" -> ("type" -> "keyword")) ~
         ("servingParams" -> ("type" -> "keyword"))
         ))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
 
   def insert(i: EngineInstance): String = {
     val id = i.id match {
@@ -86,7 +84,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
       val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/",
+        s"/$internalIndex/$estype/",
         Map("refresh" -> "true").asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -95,12 +93,12 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
         case "created" =>
           Some((jsonResponse \ "_id").extract[String])
         case _ =>
-          error(s"[$result] Failed to create $index/$estype")
+          error(s"[$result] Failed to create $internalIndex/$estype")
           None
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to create $index/$estype", e)
+        error(s"Failed to create $internalIndex/$estype", e)
         None
     }
   }
@@ -109,7 +107,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
     try {
       val response = client.performRequest(
         "GET",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map.empty[String, String].asJava)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
       (jsonResponse \ "found").extract[Boolean] match {
@@ -123,11 +121,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
         e.getResponse.getStatusLine.getStatusCode match {
           case 404 => None
           case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
+            error(s"Failed to access to /$internalIndex/$estype/$id", e)
             None
         }
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+        error(s"Failed to access to /$internalIndex/$estype/$id", e)
         None
     }
   }
@@ -137,10 +135,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -165,10 +163,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
               ("sort" -> List(
                 ("startTime" ->
                   ("order" -> "desc"))))
-      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -188,7 +186,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
       val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -197,11 +195,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
         case "created" =>
         case "updated" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 
@@ -209,18 +207,18 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
     try {
       val response = client.performRequest(
         "DELETE",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava)
       val json = parse(EntityUtils.toString(response.getEntity))
       val result = (json \ "result").extract[String]
       result match {
         case "deleted" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 }
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 03b851d..0025950 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -28,7 +28,6 @@ import org.apache.predictionio.data.storage.EvaluationInstance
 import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
 import org.apache.predictionio.data.storage.EvaluationInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
 import org.elasticsearch.client.{ResponseException, RestClient}
 import org.json4s._
 import org.json4s.JsonDSL._
@@ -41,15 +40,12 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
     extends EvaluationInstances with Logging {
   implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
   private val estype = "evaluation_instances"
-  private val seq = new ESSequences(client, config, internalIndex)
+  private val seq = new ESSequences(client, config, index)
   private val internalIndex = index + "_" + estype
 
-  ESUtils.createIndex(client, internalIndex,
-    ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("status" -> ("type" -> "keyword")) ~
         ("startTime" -> ("type" -> "date")) ~
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index f275ec9..708d3d3 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -53,12 +53,9 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd
   override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
     val index = baseIndex + "_" + estype
-    ESUtils.createIndex(client, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+    ESUtils.createIndex(client, index)
     val json =
       (estype ->
-        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
           ("name" -> ("type" -> "keyword")) ~
           ("eventId" -> ("type" -> "keyword")) ~
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index d43ecc6..ade0f40 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -21,7 +21,6 @@ import java.io.IOException
 
 import scala.collection.JavaConverters._
 
-import org.apache.http.Header
 import org.apache.http.entity.ContentType
 import org.apache.http.nio.entity.NStringEntity
 import org.apache.http.util.EntityUtils
@@ -40,12 +39,9 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String
   private val estype = "sequences"
   private val internalIndex = index + "_" + estype
 
-  ESUtils.createIndex(client, internalIndex,
-    ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("n" -> ("enabled" -> false))))
   ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index cd9aa53..93d5d94 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 import org.apache.http.entity.ContentType
-import org.apache.http.entity.StringEntity
 import org.apache.http.nio.entity.NStringEntity
 import org.elasticsearch.client.RestClient
 import org.json4s._
@@ -165,23 +164,16 @@ object ESUtils {
 
   def createIndex(
     client: RestClient,
-    index: String,
-    numberOfShards: Option[Int],
-    numberOfReplicas: Option[Int]): Unit = {
+    index: String): Unit = {
     client.performRequest(
       "HEAD",
       s"/$index",
       Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
         case 404 =>
-          val json = ("settings" ->
-            ("number_of_shards" -> numberOfShards) ~
-            ("number_of_replicas" -> numberOfReplicas))
-          val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
           client.performRequest(
             "PUT",
             s"/$index",
-            Map.empty[String, String].asJava,
-            entity)
+            Map.empty[String, String].asJava)
         case 200 =>
         case _ =>
           throw new IllegalStateException(s"/$index is invalid.")
@@ -269,14 +261,6 @@ object ESUtils {
     (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
   }
 
-  def getNumberOfShards(config: StorageClientConfig, index: String): Option[Int] = {
-    config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt)
-  }
-
-  def getNumberOfReplicas(config: StorageClientConfig, index: String): Option[Int] = {
-    config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt)
-  }
-
   def getEventDataRefresh(config: StorageClientConfig): String = {
     config.properties.getOrElse("EVENTDATA_REFRESH", "true")
   }


Mime
View raw message