openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [incubator-openwhisk] Diff for: [GitHub] dubee closed pull request #3977: Elasticsearch Activation Store
Date Mon, 14 Jan 2019 17:25:21 GMT
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 5f7a8db5a0..56ec590963 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -231,8 +231,11 @@ object ConfigKeys {
   val transactions = "whisk.transactions"
 
   val logStore = "whisk.logstore"
+  val activationStore = "whisk.activationstore"
+
   val splunk = s"$logStore.splunk"
   val logStoreElasticSearch = s"$logStore.elasticsearch"
+  val elasticSearchActivationStore = s"$activationStore.elasticsearch"
 
   val mesos = "whisk.mesos"
 
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactElasticSearchActivationStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactElasticSearchActivationStore.scala
new file mode 100644
index 0000000000..f6a93227b1
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactElasticSearchActivationStore.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.time.Instant
+import java.nio.file.{Files, Path, Paths}
+import java.nio.file.attribute.PosixFilePermission.{
+  GROUP_READ,
+  GROUP_WRITE,
+  OTHERS_READ,
+  OTHERS_WRITE,
+  OWNER_READ,
+  OWNER_WRITE
+}
+import java.util.EnumSet
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.stream.alpakka.file.scaladsl.LogRotatorSink
+import akka.stream.scaladsl.{Flow, MergeHub, RestartSink, Sink, Source}
+import akka.stream._
+import akka.util.ByteString
+import pureconfig.loadConfigOrThrow
+import spray.json._
+import whisk.common.{Logging, TransactionId}
+import whisk.core.ConfigKeys
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import whisk.core.entity._
+import whisk.core.entity.size._
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.duration._
+import scala.util.Try
+
+class ArtifactElasticSearchActivationStore(
+  override val system: ActorSystem,
+  actorMaterializer: ActorMaterializer,
+  logging: Logging,
+  override val httpFlow: Option[
+    Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  override val elasticSearchConfig: ElasticSearchActivationStoreConfig =
+    loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore))
+    extends ArtifactActivationStore(system, actorMaterializer, logging)
+    with ElasticSearchActivationRestClient {
+
+  implicit val m = actorMaterializer
+
+  val destinationDirectory: Path = Paths.get("logs")
+  val bufferSize = 100.MB
+  val perms = EnumSet.of(OWNER_READ, OWNER_WRITE, GROUP_READ, GROUP_WRITE, OTHERS_READ, OTHERS_WRITE)
+  protected val writeToFile: Sink[ByteString, _] = MergeHub // materialized sink that writeActivation attaches a source to
+    .source[ByteString]
+    .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _) // under backpressure, concatenate chunks up to bufferSize bytes
+    .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () => // factory for the wrapped sink
+      LogRotatorSink(() => {
+        val maxSize = bufferSize.toBytes
+        var bytesRead = maxSize // starts at the limit so the first non-empty element already requests a new file
+        element =>
+          {
+            val size = element.size
+            if (bytesRead + size > maxSize) { // adding this element would exceed the limit: rotate
+              bytesRead = size // restart the counter with this element's size
+              val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
+              logging.info(this, s"Rotating log file to '$logFilePath'")
+              try {
+                Files.createFile(logFilePath)
+                Files.setPosixFilePermissions(logFilePath, perms) // read/write for owner, group and others
+              } catch {
+                case t: Throwable => // log and rethrow so the failure is not swallowed
+                  logging.error(this, s"Couldn't create activation record file '$t'")
+                  throw t
+              }
+              Some(logFilePath) // Some(path) asks the rotator to switch to this file
+            } else {
+              bytesRead += size
+              None // None keeps writing to the current file
+            }
+          }
+      })
+    })
+    .run()
+
+  protected def extractRequiredHeaders(headers: Seq[HttpHeader]) =
+    headers.filter(h => elasticSearchConfig.requiredHeaders.contains(h.lowercaseName)).toList
+
+  // Appends the activation, enriched with the caller's namespace UUID, as one JSON line to the rotating log file.
+  // A missing result is written as "{}" rather than unwrapped with Option.get, which threw and aborted store().
+  protected def writeActivation(activation: WhiskActivation, context: UserContext) = {
+    val core = Map(
+      "namespaceId" -> context.user.namespace.uuid.toJson,
+      "namespace" -> activation.namespace.namespace.toJson,
+      "name" -> activation.name.toJson,
+      "subject" -> activation.subject.toJson,
+      "activationId" -> activation.activationId.toJson,
+      "start" -> activation.start.toEpochMilli.toJson,
+      "end" -> activation.end.toEpochMilli.toJson,
+      "cause" -> activation.cause.toJson,
+      "result" -> activation.response.result.getOrElse(JsObject.empty).compactPrint.toJson,
+      "statusCode" -> activation.response.statusCode.toJson,
+      "logs" -> activation.logs.toJson,
+      "version" -> activation.version.toJson)
+    // Later operands of ++ win on duplicate keys: annotations override core fields, duration overrides annotations.
+    val duration = activation.duration.map(d => Map("duration" -> d.toJson)) getOrElse Map.empty
+    val line = ByteString(JsObject(core ++ activation.annotations.toJsObject.fields ++ duration).compactPrint + "\n")
+    Source.single(line).runWith(Flow[ByteString].to(writeToFile))
+  }
+
+  override def store(activation: WhiskActivation, context: UserContext)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+    writeActivation(activation, context) // runs first; if it throws, super.store below is never reached
+    super.store(activation, context) // the returned Future reflects only the artifact store write
+  }
+
+  override def get(activationId: ActivationId, context: UserContext)(
+    implicit transid: TransactionId): Future[WhiskActivation] = {
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    // Use ElasticSearch when as many request headers matched as requiredHeaders lists (always true if that list is empty)
+    if (headers.length == elasticSearchConfig.requiredHeaders.length) {
+      val uuid = elasticSearchConfig.path.format(context.user.namespace.uuid.asString) // path pattern with the namespace UUID filled in
+      val id = activationId.asString.substring(activationId.asString.indexOf("/") + 1) // text after the first '/', or the whole id if none
+
+      getActivation(id, uuid, headers).map(_.toActivation())
+    } else {
+      super.get(activationId, context) // otherwise fall back to the artifact store
+    }
+  }
+
+  override def countActivationsInNamespace(namespace: EntityPath,
+                                           name: Option[EntityPath] = None,
+                                           skip: Int,
+                                           since: Option[Instant] = None,
+                                           upto: Option[Instant] = None,
+                                           context: UserContext)(implicit transid: TransactionId): Future[JsObject] = {
+    val uuid = elasticSearchConfig.path.format(context.user.namespace.uuid.asString) // path pattern with the namespace UUID filled in
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    // Use ElasticSearch when as many request headers matched as requiredHeaders lists (always true if that list is empty)
+    if (headers.length == elasticSearchConfig.requiredHeaders.length) {
+      count(uuid, name, namespace.asString, skip, since, upto, headers)
+    } else {
+      super.countActivationsInNamespace(namespace, name, skip, since, upto, context) // fall back to the artifact store
+    }
+  }
+
+  override def listActivationsMatchingName(
+    namespace: EntityPath,
+    name: EntityPath,
+    skip: Int,
+    limit: Int,
+    includeDocs: Boolean = false,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+    val uuid = elasticSearchConfig.path.format(context.user.namespace.uuid.asString) // path pattern with the namespace UUID filled in
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    // Use ElasticSearch when as many request headers matched as requiredHeaders lists (always true if that list is empty)
+    if (headers.length == elasticSearchConfig.requiredHeaders.length) {
+      listActivationMatching(uuid, name.toString, skip, limit, since, upto, headers).map { activationList => // `namespace` is not forwarded here
+        Right(activationList.map(activation => activation.toActivation())) // always full activations; `includeDocs` is ignored in this branch
+      }
+    } else {
+      super.listActivationsMatchingName(namespace, name, skip, limit, includeDocs, since, upto, context) // artifact store fallback
+    }
+  }
+
+  override def listActivationsInNamespace(
+    namespace: EntityPath,
+    skip: Int,
+    limit: Int,
+    includeDocs: Boolean = false,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+    val uuid = elasticSearchConfig.path.format(context.user.namespace.uuid.asString) // path pattern with the namespace UUID filled in
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    // Use ElasticSearch when as many request headers matched as requiredHeaders lists (always true if that list is empty)
+    if (headers.length == elasticSearchConfig.requiredHeaders.length) {
+      listActivationsNamespace(uuid, namespace.asString, skip, limit, since, upto, headers).map { activationList =>
+        Right(activationList.map(activation => activation.toActivation())) // always full activations; `includeDocs` is ignored in this branch
+      }
+    } else {
+      super.listActivationsInNamespace(namespace, skip, limit, includeDocs, since, upto, context) // artifact store fallback
+    }
+  }
+
+}
+
+object ArtifactElasticSearchActivationStoreProvider extends ActivationStoreProvider {
+  override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+    new ArtifactElasticSearchActivationStore(actorSystem, actorMaterializer, logging)
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/ElasticSearchActivationRestClient.scala b/common/scala/src/main/scala/whisk/core/database/ElasticSearchActivationRestClient.scala
new file mode 100644
index 0000000000..c1a5567362
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/ElasticSearchActivationRestClient.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.stream.scaladsl.Flow
+
+import spray.json.{DefaultJsonProtocol, _}
+
+import whisk.common.TransactionId
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import whisk.core.containerpool.logging.{ElasticSearchRestClient, EsQuery, EsQueryString, EsSearchResult, _}
+import whisk.core.entity._
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Try
+
+case class ElasticSearchActivationFieldConfig(name: String,
+                                              namespace: String,
+                                              subject: String,
+                                              version: String,
+                                              start: String,
+                                              end: String,
+                                              duration: String,
+                                              result: String,
+                                              statusCode: String,
+                                              activationId: String,
+                                              activationRecord: String,
+                                              stream: String)
+
+case class ElasticSearchActivationStoreConfig(protocol: String,
+                                              host: String,
+                                              port: Int,
+                                              path: String,
+                                              schema: ElasticSearchActivationFieldConfig,
+                                              requiredHeaders: Seq[String] = Seq.empty)
+
+trait ElasticSearchActivationRestClient {
+  implicit val executionContext: ExecutionContext
+  implicit val system: ActorSystem
+  val httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]]
+  val elasticSearchConfig: ElasticSearchActivationStoreConfig
+
+  protected val esActivationClient =
+    new ElasticSearchRestClient(
+      elasticSearchConfig.protocol,
+      elasticSearchConfig.host,
+      elasticSearchConfig.port,
+      httpFlow)
+
+  // Schema of resultant activations from ES; the JSON key of each field is fixed by the companion's `serdes`.
+  case class ActivationEntry(name: String,
+                             subject: String,
+                             activationId: String,
+                             version: String,
+                             start: Long,
+                             end: Long,
+                             result: String,
+                             statusCode: Int,
+                             duration: Option[Long] = None,
+                             namespace: String,
+                             kind: Option[String] = None,
+                             cause: Option[String] = None,
+                             causedBy: Option[String] = None,
+                             limits: Option[ActionLimits] = None,
+                             path: Option[String] = None,
+                             logs: ActivationLogs,
+                             waitTime: Option[Int] = None,
+                             initTime: Option[Int] = None) {
+
+    // Rebuilds a WhiskActivation from this entry; `result` must hold a serialized JSON object. A statusCode outside
+    // 0-3 now fails with a descriptive IllegalArgumentException instead of an opaque scala.MatchError.
+    def toActivation() = {
+      val parsed = result.parseJson.asJsObject // parsed once; every status branch needs the object
+      val response = statusCode match {
+        case 0 => ActivationResponse.success(Some(parsed))
+        case 1 => ActivationResponse.applicationError(parsed.fields("error"))
+        case 2 => ActivationResponse.containerError(parsed.fields("error"))
+        case 3 => ActivationResponse.whiskError(parsed.fields("error"))
+        case other => throw new IllegalArgumentException(s"Unsupported activation status code '$other'")
+      }
+      val causedByAnnotation = causedBy.map(value => Parameters("causedBy", value.toJson)).getOrElse(Parameters())
+      val memoryAnnotation = limits.map { value =>
+        val json = JsObject(
+          "memory" -> value.memory.megabytes.toJson,
+          "timeout" -> value.timeout.toJson,
+          "logs" -> value.logs.toJson)
+        Parameters("limits", json)
+      }.getOrElse(Parameters())
+      val kindAnnotation = kind.map(value => Parameters("kind", value.toJson)).getOrElse(Parameters())
+      val pathAnnotation = path.map(value => Parameters("path", value.toJson)).getOrElse(Parameters())
+      val waitTimeAnnotation = waitTime.map(value => Parameters("waitTime", value.toJson)).getOrElse(Parameters())
+      val initTimeAnnotation = initTime.map(value => Parameters("initTime", value.toJson)).getOrElse(Parameters())
+      WhiskActivation(
+        EntityPath(namespace),
+        EntityName(name),
+        Subject(subject),
+        ActivationId(activationId),
+        Instant.ofEpochMilli(start),
+        Instant.ofEpochMilli(end),
+        response = response,
+        logs = logs,
+        duration = duration,
+        version = SemVer(version),
+        annotations = kindAnnotation ++ causedByAnnotation ++ memoryAnnotation ++ pathAnnotation ++ waitTimeAnnotation ++ initTimeAnnotation,
+        cause = cause.map(value => ActivationId(value))) // Option.map already yields None when there is no cause
+    }
+  }
+
+  object ActivationEntry extends DefaultJsonProtocol {
+    implicit val serdes =
+      jsonFormat(
+        ActivationEntry.apply,
+        elasticSearchConfig.schema.name,
+        elasticSearchConfig.schema.subject,
+        elasticSearchConfig.schema.activationId,
+        elasticSearchConfig.schema.version,
+        elasticSearchConfig.schema.start,
+        elasticSearchConfig.schema.end,
+        "result",
+        "statusCode",
+        elasticSearchConfig.schema.duration,
+        elasticSearchConfig.schema.namespace,
+        "kind",
+        "cause",
+        "causedBy",
+        "limits",
+        "path",
+        "logs",
+        "waitTime",
+        "initTime")
+  }
+
+  protected def transcribeActivations(queryResult: EsSearchResult): List[ActivationEntry] =
+    queryResult.hits.hits.map(_.source.convertTo[ActivationEntry]).toList
+
+  // Builds the optional range filters on the configured start field: an EsRangeGt bound for `since` and an
+  // EsRangeLt bound for `upto`. Bounds that were not supplied are omitted from the result.
+  protected def getRanges(since: Option[Instant] = None, upto: Option[Instant] = None): Vector[EsQueryRange] = {
+    val startField = elasticSearchConfig.schema.start
+    // Option.map already yields None for a missing bound, so no Some(...)/getOrElse None wrapping is needed.
+    val sinceRange = since.map(time => EsQueryRange(startField, EsRangeGt, time.toEpochMilli.toString))
+    val uptoRange = upto.map(time => EsQueryRange(startField, EsRangeLt, time.toEpochMilli.toString))
+
+    Vector(sinceRange, uptoRange).flatten
+  }
+
+  protected def generateGetPayload(activationId: String) = {
+    val query =
+      s"type: ${elasticSearchConfig.schema.activationRecord} AND ${elasticSearchConfig.schema.activationId}: $activationId"
+
+    EsQuery(EsQueryString(query))
+  }
+
+  // Builds the count query: always restricted to activation records, optionally to one entity name and to the
+  // since/upto ranges, ordered by the start field descending, with `skip` passed as the query's `from`.
+  protected def generateCountActivationsInNamespacePayload(name: Option[EntityPath] = None,
+                                                           skip: Int,
+                                                           since: Option[Instant] = None,
+                                                           upto: Option[Instant] = None) = {
+    val queryRanges = getRanges(since, upto)
+    val activationMatch = EsQueryBoolMatch("type", elasticSearchConfig.schema.activationRecord)
+    // Option.map yields None when no name is given, so the entity term is simply left out.
+    val entityMatch = name.map(n => EsQueryBoolMatch(elasticSearchConfig.schema.name, n.asString))
+    val queryTerms = activationMatch +: entityMatch.toVector
+    val queryMust = EsQueryMust(queryTerms, queryRanges)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.schema.start, EsOrderDesc)
+    EsQuery(queryMust, Some(queryOrder), from = skip)
+  }
+
+  protected def generateListActiationsMatchNamePayload(name: String,
+                                                       skip: Int,
+                                                       limit: Int,
+                                                       since: Option[Instant] = None,
+                                                       upto: Option[Instant] = None) = {
+    val queryRanges = getRanges(since, upto)
+    val queryTerms = Vector(
+      EsQueryBoolMatch("type", elasticSearchConfig.schema.activationRecord),
+      EsQueryBoolMatch(elasticSearchConfig.schema.name, name))
+    val queryMust = EsQueryMust(queryTerms, queryRanges)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.schema.start, EsOrderDesc)
+
+    EsQuery(queryMust, Some(queryOrder), Some(limit), from = skip)
+  }
+
+  protected def generateListActivationsInNamespacePayload(namespace: String,
+                                                          skip: Int,
+                                                          limit: Int,
+                                                          since: Option[Instant] = None,
+                                                          upto: Option[Instant] = None) = {
+    val queryRanges = getRanges(since, upto) // optional bounds on the start field
+    val queryTerms = Vector(
+      EsQueryBoolMatch("type", elasticSearchConfig.schema.activationRecord),
+      EsQueryBoolMatch(elasticSearchConfig.schema.subject, namespace)) // NOTE(review): namespace value is matched against the *subject* field - confirm schema.namespace was not intended
+    val queryMust = EsQueryMust(queryTerms, queryRanges)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.schema.start, EsOrderDesc) // order by the start field, descending
+
+    EsQuery(queryMust, Some(queryOrder), Some(limit), from = skip)
+  }
+
+  // Looks up a single activation by id; fails with NoDocumentException when the search returns no hits and with
+  // RuntimeException when the store answers with an error status code.
+  def getActivation(activationId: String, uuid: String, headers: List[HttpHeader] = List.empty)(
+    implicit transid: TransactionId): Future[ActivationEntry] = {
+    val query = generateGetPayload(activationId)
+
+    esActivationClient.search[EsSearchResult](uuid, query, headers).flatMap {
+      case Left(code) =>
+        Future.failed(new RuntimeException(s"Status code '$code' was returned from activation store"))
+
+      case Right(queryResult) =>
+        // Every hit is transcribed; only the first one is returned.
+        transcribeActivations(queryResult) match {
+          case first :: _ => Future.successful(first)
+          case Nil => Future.failed(new NoDocumentException("Document not found"))
+        }
+    }
+  }
+
+  def count(uuid: String,
+            name: Option[EntityPath] = None,
+            namespace: String, // not referenced anywhere in this method's body
+            skip: Int,
+            since: Option[Instant] = None,
+            upto: Option[Instant] = None,
+            headers: List[HttpHeader] = List.empty)(implicit transid: TransactionId): Future[JsObject] = {
+    val payload = generateCountActivationsInNamespacePayload(name, skip, since, upto)
+
+    esActivationClient.search[EsSearchResult](uuid, payload, headers).flatMap {
+      case Right(queryResult) =>
+        val total = Math.max(0, queryResult.hits.total - skip) // reported hit total minus the skipped entries, never negative
+        Future.successful(JsObject("activations" -> total.toJson))
+      case Left(code) => // non-success status from the search call becomes a failed Future
+        Future.failed(new RuntimeException(s"Status code '$code' was returned from activation store"))
+    }
+  }
+
+  def listActivationMatching(
+    uuid: String,
+    name: String,
+    skip: Int,
+    limit: Int,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    headers: List[HttpHeader] = List.empty)(implicit transid: TransactionId): Future[List[ActivationEntry]] = {
+    val payload = generateListActiationsMatchNamePayload(name, skip, limit, since, upto)
+
+    esActivationClient.search[EsSearchResult](uuid, payload, headers).flatMap {
+      case Right(queryResult) =>
+        Future.successful(transcribeActivations(queryResult))
+      case Left(code) =>
+        Future.failed(new RuntimeException(s"Status code '$code' was returned from activation store"))
+    }
+  }
+
+  def listActivationsNamespace(
+    uuid: String,
+    namespace: String,
+    skip: Int,
+    limit: Int,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    headers: List[HttpHeader] = List.empty)(implicit transid: TransactionId): Future[List[ActivationEntry]] = {
+    val payload = generateListActivationsInNamespacePayload(namespace, skip, limit, since, upto)
+
+    esActivationClient.search[EsSearchResult](uuid, payload, headers).flatMap {
+      case Right(queryResult) =>
+        Future.successful(transcribeActivations(queryResult))
+      case Left(code) =>
+        Future.failed(new RuntimeException(s"Status code '$code' was returned from activation store"))
+    }
+  }
+
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/SemVer.scala b/common/scala/src/main/scala/whisk/core/entity/SemVer.scala
index 6038bd012c..287ef76ab1 100644
--- a/common/scala/src/main/scala/whisk/core/entity/SemVer.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/SemVer.scala
@@ -78,7 +78,7 @@ protected[core] object SemVer {
    * @return SemVer instance
    * @thrown IllegalArgumentException if string is not a valid semantic version
    */
-  protected[entity] def apply(str: String): SemVer = {
+  protected[core] def apply(str: String): SemVer = {
     try {
       val parts = if (str != null && str.nonEmpty) str.split('.') else Array[String]()
       val major = if (parts.size >= 1) parts(0).toInt else 0
diff --git a/tests/src/test/scala/system/basic/WskRestBasicTests.scala b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
index 01fed1a44e..fdb5f02dae 100644
--- a/tests/src/test/scala/system/basic/WskRestBasicTests.scala
+++ b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
@@ -1036,7 +1036,7 @@ class WskRestBasicTests extends TestHelpers with WskTestHelpers with WskActorSys
       result.getFieldJsValue("start").toString should not be JsObject.empty.toString
       result.getFieldJsValue("end").toString shouldBe JsObject.empty.toString
       result.getFieldJsValue("duration").toString shouldBe JsObject.empty.toString
-      result.getFieldListJsObject("annotations").length shouldBe 0
+      result.getFieldListJsObject("annotations").length shouldBe 1
     }
   }
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchActivationStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchActivationStoreTests.scala
new file mode 100644
index 0000000000..f71d3d8f9e
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchActivationStoreTests.scala
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.time.Instant
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.Accept
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Flow
+import akka.testkit.TestKit
+import common.StreamLogging
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpecLike, Matchers}
+import pureconfig.error.ConfigReaderException
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.core.entity._
+import whisk.core.database.{
+  ArtifactElasticSearchActivationStore,
+  ElasticSearchActivationFieldConfig,
+  ElasticSearchActivationStoreConfig,
+  NoDocumentException,
+  UserContext
+}
+
+import whisk.common.TransactionId
+import whisk.core.entity.size.SizeInt
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future, Promise}
+import scala.util.{Success, Try}
+
+/*
+TODO:
+Required headers
+ */
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchActivationStoreTests
+    extends TestKit(ActorSystem("ElasticSearchActivationStore"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+
+  val materializer = ActorMaterializer()
+
+  implicit val transid: TransactionId = TransactionId.testing
+
+  private val uuid = UUID()
+  private val subject = Subject()
+  private val user =
+    Identity(subject, Namespace(EntityName("testSpace"), uuid), BasicAuthenticationAuthKey(uuid, Secret()), Set())
+  private val activationId = ActivationId.generate()
+  private val namespace = EntityPath("namespace")
+  private val name = EntityName("name")
+  private val response = JsObject("result key" -> JsString("result value"))
+  private val start = Instant.now
+  private val end = Instant.now
+  private val since = Instant.now
+  private val upto = Instant.now
+  private val logs =
+    Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs")
+  private val expectedLogs = ActivationLogs(logs)
+  private val activation = WhiskActivation(
+    namespace = namespace,
+    name = name,
+    subject,
+    activationId = activationId,
+    start = start,
+    end = end,
+    response = ActivationResponse.success(Some(response)),
+    logs = expectedLogs,
+    duration = Some(101L),
+    annotations = Parameters("kind", "nodejs:6") ++ Parameters(
+      "limits",
+      ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), LogLimit(10.MB)).toJson) ++
+      Parameters("waitTime", 16.toJson) ++
+      Parameters("initTime", 44.toJson))
+
+  // Elasticsearch configuration
+  private val defaultSchema =
+    ElasticSearchActivationFieldConfig(
+      "name",
+      "namespace",
+      "subject",
+      "version",
+      "start",
+      "end",
+      "duration",
+      "result",
+      "statusCode",
+      "activationId",
+      "activation_record",
+      "stream")
+  private val defaultConfig =
+    ElasticSearchActivationStoreConfig("https", "host", 443, "/whisk_user_logs/_search", defaultSchema)
+
+  // Elasticsearch query responses
+  // Canned search response whose hit list holds the same activation document twice ("total" is 2).
+  private val defaultQueryResponse = {
+    val esSchema = defaultConfig.schema
+    // Body of one hit; keys read from the schema are the configured field names, the rest are literal.
+    val hitSource = JsObject(
+      esSchema.statusCode -> 0.toJson,
+      esSchema.duration -> 101.toJson,
+      esSchema.name -> name.toJson,
+      esSchema.subject -> subject.toJson,
+      "waitTime" -> 16.toJson,
+      esSchema.activationId -> activationId.toJson,
+      esSchema.result -> response.compactPrint.toJson,
+      esSchema.version -> "0.0.1".toJson,
+      "cause" -> JsNull,
+      esSchema.end -> end.toEpochMilli.toJson,
+      "kind" -> "nodejs:6".toJson,
+      "logs" -> logs.toJson,
+      esSchema.start -> start.toEpochMilli.toJson,
+      "limits" -> JsObject("timeout" -> 60000.toJson, "memory" -> 256.toJson, "logs" -> 10.toJson),
+      "initTime" -> 44.toJson,
+      esSchema.namespace -> namespace.toJson,
+      "@version" -> "1".toJson,
+      "type" -> esSchema.activationRecord.toJson,
+      "ALCH_TENANT_ID" -> "9cfe57a0-7ac1-4bf4-9026-d7e9e591271f".toJson // UUID
+    )
+    val hit = JsObject(
+      "_index" -> "whisk_user_logs".toJson,
+      "_type" -> esSchema.activationRecord.toJson,
+      "_id" -> "AWUQoCrVV6WHiq7A5LL8".toJson,
+      "_score" -> 3.74084.toJson,
+      "_source" -> hitSource)
+
+    JsObject(
+      "took" -> 4.toJson,
+      "timed_out" -> false.toJson,
+      "_shards" -> JsObject("total" -> 5.toJson, "successful" -> 5.toJson, "failed" -> 0.toJson),
+      "hits" -> JsObject("total" -> 2.toJson, "max_score" -> 3.74084.toJson, "hits" -> JsArray(hit, hit)))
+  }
+
+  // Elasticsearch query requests
+  // Serialized query_string search that selects one activation record by its activation id.
+  private val defaultPayload = {
+    val queryString =
+      s"_type: ${defaultConfig.schema.activationRecord} AND ${defaultConfig.schema.activationId}: $activationId"
+
+    JsObject(
+      "query" -> JsObject("query_string" -> JsObject("query" -> JsString(queryString))),
+      "from" -> JsNumber(0)).compactPrint
+  }
+  // Payload expected for `get` requests. It was a verbatim copy of defaultPayload, so it now reuses that value
+  // to keep the two fixtures from drifting apart.
+  private val defaultGetPayload = defaultPayload
+  // Serialized bool query used by the count tests: matches on record type and entity name, restricted to
+  // start times between `since` and `upto`, sorted by start descending, starting at offset 1.
+  private val defaultCountPayload = {
+    val esSchema = defaultConfig.schema
+    val mustClauses = JsArray(
+      JsObject("match" -> JsObject("_type" -> JsString(esSchema.activationRecord))),
+      JsObject("match" -> JsObject(esSchema.name -> JsString(name.name))))
+    val startWindow = JsArray(
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("gt" -> JsString(since.toEpochMilli.toString)))),
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("lt" -> JsString(upto.toEpochMilli.toString)))))
+    val startDescending = JsArray(JsObject(esSchema.start -> JsObject("order" -> JsString("desc"))))
+
+    JsObject(
+      "query" -> JsObject("bool" -> JsObject("must" -> mustClauses, "filter" -> startWindow)),
+      "sort" -> startDescending,
+      "from" -> JsNumber(1)).compactPrint
+  }
+  // Serialized bool query used when listing by entity name: same clauses as the count payload plus a
+  // page size of 2.
+  private val defaultListEntityPayload = {
+    val esSchema = defaultConfig.schema
+    val mustClauses = JsArray(
+      JsObject("match" -> JsObject("_type" -> JsString(esSchema.activationRecord))),
+      JsObject("match" -> JsObject(esSchema.name -> JsString(name.name))))
+    val startWindow = JsArray(
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("gt" -> JsString(since.toEpochMilli.toString)))),
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("lt" -> JsString(upto.toEpochMilli.toString)))))
+    val startDescending = JsArray(JsObject(esSchema.start -> JsObject("order" -> JsString("desc"))))
+
+    JsObject(
+      "query" -> JsObject("bool" -> JsObject("must" -> mustClauses, "filter" -> startWindow)),
+      "sort" -> startDescending,
+      "size" -> JsNumber(2),
+      "from" -> JsNumber(1)).compactPrint
+  }
+  // Serialized bool query used when listing a whole namespace: matches on record type and on the subject
+  // field (compared against the user's namespace name), with the same time window, sort and paging.
+  private val defaultListPayload = {
+    val esSchema = defaultConfig.schema
+    val mustClauses = JsArray(
+      JsObject("match" -> JsObject("_type" -> JsString(esSchema.activationRecord))),
+      JsObject("match" -> JsObject(esSchema.subject -> JsString(user.namespace.name.asString))))
+    val startWindow = JsArray(
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("gt" -> JsString(since.toEpochMilli.toString)))),
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("lt" -> JsString(upto.toEpochMilli.toString)))))
+    val startDescending = JsArray(JsObject(esSchema.start -> JsObject("order" -> JsString("desc"))))
+
+    JsObject(
+      "query" -> JsObject("bool" -> JsObject("must" -> mustClauses, "filter" -> startWindow)),
+      "sort" -> startDescending,
+      "size" -> JsNumber(2),
+      "from" -> JsNumber(1)).compactPrint
+  }
+
+  // Elasticsearch HTTP responses
+  // 200 OK response whose JSON body is the canned two-hit query response.
+  private val defaultHttpResponse = {
+    val body = HttpEntity(ContentTypes.`application/json`, defaultQueryResponse.compactPrint)
+    HttpResponse(StatusCodes.OK, entity = body)
+  }
+  // 200 OK response whose JSON body reports zero hits.
+  private val emptyHttpResponse = {
+    val noHits =
+      """{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}"""
+    HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, noHits))
+  }
+
+  // Elasticsearch HTTP requests
+  // Search request carrying the default (get-by-id) payload against the fixed search path.
+  private val defaultHttpRequest = HttpRequest(
+    method = POST,
+    uri = Uri("/whisk_user_logs/_search"),
+    headers = List(Accept(MediaTypes.`application/json`)),
+    entity = HttpEntity(ContentTypes.`application/json`, defaultPayload))
+  // Placeholder request used to build the UserContext passed to every store call in this spec.
+  private val defaultLogStoreHttpRequest =
+    HttpRequest(GET, Uri("https://some.url"), entity = HttpEntity.Empty)
+
+  /**
+   * Builds the stub flow handed to the store under test.
+   *
+   * Every request passing through the flow is asserted to equal `httpRequest`; when it does, the flow emits
+   * `httpResponse` wrapped in a `Success`, paired with the promise that accompanied the request.
+   *
+   * @param httpResponse canned response emitted for each request
+   * @param httpRequest request the store is expected to send
+   */
+  private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) {
+        case (actualRequest, responsePromise) =>
+          actualRequest shouldBe httpRequest
+          Future.successful((Success(httpResponse), responsePromise))
+      }
+
+  /** Blocks until `awaitable` completes (10 seconds unless `timeout` is given) and returns its value. */
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds): T =
+    Await.result(awaitable, timeout)
+
+  // Subject under which all following `it should ...` clauses are reported.
+  behavior.of("ElasticSearch Activation Store")
+
+  it should "fail to connect to invalid host" in {
+    // Unlike the other tests in this spec, no stub flow is passed to the store here.
+    val store =
+      new ArtifactElasticSearchActivationStore(system, materializer, logging, elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    a[Throwable] should be thrownBy {
+      await(store.get(activation.activationId, userContext))
+    }
+  }
+
+  it should "get an activation" in {
+    // The stub flow asserts that the store sends exactly this request before answering.
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val fetched = await(store.get(activationId, userContext))
+
+    fetched shouldBe activation
+  }
+
+  it should "get an activation with error response" in {
+    val errorMessage = "message".toJson
+    val errorResult = JsObject("error" -> errorMessage)
+    // Stored status code paired with the activation response expected back from the store.
+    val expectations = Seq(
+      1 -> ActivationResponse.applicationError(errorMessage),
+      2 -> ActivationResponse.containerError(errorMessage),
+      3 -> ActivationResponse.whiskError(errorMessage))
+
+    // Neither the request, the annotations nor the context depend on the status code, so build them once.
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    val expectedAnnotations =
+      Parameters("kind", "nodejs:6") ++
+        Parameters("limits", ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), LogLimit(10.MB)).toJson) ++
+        Parameters("waitTime", 16.toJson) ++
+        Parameters("initTime", 44.toJson)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    for ((storedStatusCode, expectedResponse) <- expectations) {
+      val hitSource = JsObject(
+        defaultConfig.schema.statusCode -> storedStatusCode.toJson,
+        defaultConfig.schema.duration -> 101.toJson,
+        defaultConfig.schema.name -> name.toJson,
+        defaultConfig.schema.subject -> subject.toJson,
+        "waitTime" -> 16.toJson,
+        defaultConfig.schema.activationId -> activationId.toJson,
+        defaultConfig.schema.result -> errorResult.compactPrint.toJson,
+        defaultConfig.schema.version -> "0.0.1".toJson,
+        "cause" -> JsNull,
+        defaultConfig.schema.end -> end.toEpochMilli.toJson,
+        "kind" -> "nodejs:6".toJson,
+        "logs" -> logs.toJson,
+        defaultConfig.schema.start -> start.toEpochMilli.toJson,
+        "limits" -> JsObject("timeout" -> 60000.toJson, "memory" -> 256.toJson, "logs" -> 10.toJson),
+        "initTime" -> 44.toJson,
+        defaultConfig.schema.namespace -> namespace.toJson,
+        "@version" -> "1".toJson,
+        "type" -> defaultConfig.schema.activationRecord.toJson,
+        "ALCH_TENANT_ID" -> "9cfe57a0-7ac1-4bf4-9026-d7e9e591271f".toJson // UUID
+      )
+      val searchResult = JsObject(
+        "took" -> 4.toJson,
+        "timed_out" -> false.toJson,
+        "_shards" -> JsObject("total" -> 5.toJson, "successful" -> 5.toJson, "failed" -> 0.toJson),
+        "hits" -> JsObject(
+          "total" -> 1.toJson,
+          "max_score" -> 3.74084.toJson,
+          "hits" -> JsArray(
+            JsObject(
+              "_index" -> "whisk_user_logs".toJson,
+              "_type" -> defaultConfig.schema.activationRecord.toJson,
+              "_id" -> "AWUQoCrVV6WHiq7A5LL8".toJson,
+              "_score" -> 3.74084.toJson,
+              "_source" -> hitSource))))
+      val expectedActivation = WhiskActivation(
+        namespace = namespace,
+        name = name,
+        subject,
+        activationId = activationId,
+        start = start,
+        end = end,
+        response = expectedResponse,
+        logs = expectedLogs,
+        duration = Some(101L),
+        annotations = expectedAnnotations)
+      val stubbedReply =
+        HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, searchResult.compactPrint))
+      val store = new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(stubbedReply, expectedRequest)),
+        elasticSearchConfig = defaultConfig)
+
+      await(store.get(activationId, userContext)) shouldBe expectedActivation
+    }
+  }
+
+  it should "error when getting an activation that does not exist" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    // The stub answers with a hit-less result.
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(emptyHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    a[NoDocumentException] should be thrownBy {
+      await(store.get(activationId, userContext))
+    }
+  }
+
+  it should "dynamically replace $UUID when getting an activation" in {
+    // The "%s" in the configured path is expected to show up as the namespace UUID in the request URI.
+    val templatedPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = templatedPathConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val fetched = await(store.get(activation.activationId, userContext))
+
+    fetched shouldBe activation
+  }
+
+  it should "count activations in namespace" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultCountPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val counted = await(
+      store.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        Some(name.toPath),
+        skip = 1,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    counted shouldBe JsObject("activations" -> JsNumber(1))
+  }
+
+  it should "count activations in namespace with no entity name" in {
+    val esSchema = defaultConfig.schema
+    // Same shape as defaultCountPayload, minus the match clause on the entity name.
+    val typeOnly = JsArray(JsObject("match" -> JsObject("_type" -> JsString(esSchema.activationRecord))))
+    val startWindow = JsArray(
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("gt" -> JsString(since.toEpochMilli.toString)))),
+      JsObject("range" -> JsObject(esSchema.start -> JsObject("lt" -> JsString(upto.toEpochMilli.toString)))))
+    val payload = JsObject(
+      "query" -> JsObject("bool" -> JsObject("must" -> typeOnly, "filter" -> startWindow)),
+      "sort" -> JsArray(JsObject(esSchema.start -> JsObject("order" -> JsString("desc")))),
+      "from" -> JsNumber(1)).compactPrint
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, payload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val counted = await(
+      store.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        skip = 1,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    counted shouldBe JsObject("activations" -> JsNumber(1))
+  }
+
+  // Test description fixed: the original read "count zero activations in when ..." (stray "in").
+  it should "count zero activations when there are not any activations that match entity" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultCountPayload))
+    // The stub answers with a hit-less result, so the expected count is zero.
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(emptyHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val counted = await(
+      store.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        Some(name.toPath),
+        skip = 1,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    counted shouldBe JsObject("activations" -> JsNumber(0))
+  }
+
+  it should "dynamically replace $UUID in request when counting activations" in {
+    // The "%s" in the configured path is expected to show up as the namespace UUID in the request URI.
+    val templatedPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultCountPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = templatedPathConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val counted = await(
+      store.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        Some(name.toPath),
+        skip = 1,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    counted shouldBe JsObject("activations" -> JsNumber(1))
+  }
+
+  it should "list activations matching entity name" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListEntityPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    // The literals 1 and 2 line up with "from" and "size" in defaultListEntityPayload.
+    val listed = await(
+      store.listActivationsMatchingName(
+        user.namespace.name.toPath,
+        name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    // The canned response holds the same hit twice.
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "display empty activations list when there are not any activations that match entity name" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListEntityPayload))
+    // The stub answers with a hit-less result.
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(emptyHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val listed = await(
+      store.listActivationsMatchingName(
+        user.namespace.name.toPath,
+        name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    listed shouldBe Right(List.empty)
+  }
+
+  it should "dynamically replace $UUID in request when getting activations matching entity name" in {
+    // The "%s" in the configured path is expected to show up as the namespace UUID in the request URI.
+    val templatedPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListEntityPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = templatedPathConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val listed = await(
+      store.listActivationsMatchingName(
+        user.namespace.name.toPath,
+        name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "list activations in namespace" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    // The literals 1 and 2 line up with "from" and "size" in defaultListPayload.
+    val listed = await(
+      store.listActivationsInNamespace(
+        user.namespace.name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    // The canned response holds the same hit twice.
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "display empty activations list when there are not any activations in namespace" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListPayload))
+    // The stub answers with a hit-less result.
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(emptyHttpResponse, expectedRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val listed = await(
+      store.listActivationsInNamespace(
+        user.namespace.name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    listed shouldBe Right(List.empty)
+  }
+
+  it should "dynamically replace $UUID in request when listing activations in namespace" in {
+    // The "%s" in the configured path is expected to show up as the namespace UUID in the request URI.
+    val templatedPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListPayload))
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(defaultHttpResponse, expectedRequest)),
+      elasticSearchConfig = templatedPathConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    val listed = await(
+      store.listActivationsInNamespace(
+        user.namespace.name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = userContext))
+
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "forward errors from Elasticsearch" in {
+    // Every request is answered with a bare 500.
+    // NOTE(review): the stub flow also asserts that each request equals defaultHttpRequest, which carries the
+    // get-by-id payload. The list and count calls below presumably send different payloads, so their
+    // exceptions may stem from that assertion rather than from the 500 response - TODO confirm.
+    val serverError = HttpResponse(StatusCodes.InternalServerError)
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      Some(testFlow(serverError, defaultHttpRequest)),
+      elasticSearchConfig = defaultConfig)
+    val userContext = UserContext(user, defaultLogStoreHttpRequest)
+
+    a[RuntimeException] should be thrownBy {
+      await(store.get(activation.activationId, userContext))
+    }
+    a[RuntimeException] should be thrownBy {
+      await(store.listActivationsInNamespace(EntityPath(""), 0, 0, context = userContext))
+    }
+    a[RuntimeException] should be thrownBy {
+      await(store.listActivationsMatchingName(EntityPath(""), EntityPath(""), 0, 0, context = userContext))
+    }
+    a[RuntimeException] should be thrownBy {
+      await(store.countActivationsInNamespace(EntityPath(""), None, 0, context = userContext))
+    }
+  }
+
+  it should "fail when loading out of box configs since whisk.activationstore.elasticsearch does not exist" in {
+    // No elasticSearchConfig argument is passed here, unlike every other test in this spec.
+    a[ConfigReaderException[_]] should be thrownBy {
+      new ArtifactElasticSearchActivationStore(system, materializer, logging)
+    }
+  }
+
+  it should "error when configuration protocol is invalid" in {
+    // Only the first argument ("protocol") differs in kind from the configurations used elsewhere.
+    val invalidProtocolConfig =
+      ElasticSearchActivationStoreConfig("protocol", "host", 443, "/whisk_user_logs", defaultSchema, Seq.empty)
+
+    a[IllegalArgumentException] should be thrownBy {
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        elasticSearchConfig = invalidProtocolConfig)
+    }
+  }
+
+}


With regards,
Apache Git Services

Mime
View raw message