openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csantan...@apache.org
Subject [incubator-openwhisk] branch master updated: Cache database attachments (#2855)
Date Thu, 10 May 2018 03:09:10 GMT
This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d61748f  Cache database attachments (#2855)
d61748f is described below

commit d61748fd29f3ecd8caa0145cb3b9f6d73587fa66
Author: James Dubee <jwdubee@us.ibm.com>
AuthorDate: Wed May 9 23:09:05 2018 -0400

    Cache database attachments (#2855)
---
 .../whisk/core/database/DocumentFactory.scala      |  52 ++++--
 .../main/scala/whisk/core/entity/WhiskAction.scala |  19 +-
 .../core/controller/test/ActionsApiTests.scala     | 199 +++++++++++++++++++++
 .../scala/whisk/core/database/test/DbUtils.scala   |  18 ++
 4 files changed, 263 insertions(+), 25 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index b6e346b..00f6669 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -120,12 +120,14 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
     }
   }
 
-  def attach[Wsuper >: W](
-    db: ArtifactStore[Wsuper],
-    doc: W,
-    attachmentName: String,
-    contentType: ContentType,
-    bytes: InputStream)(implicit transid: TransactionId, notifier: Option[CacheChangeNotification]):
Future[DocInfo] = {
+  def attach[Wsuper >: W](db: ArtifactStore[Wsuper],
+                          doc: W,
+                          attachmentName: String,
+                          contentType: ContentType,
+                          bytes: InputStream,
+                          postProcess: Option[W => W] = None)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
 
     Try {
       require(db != null, "db undefined")
@@ -137,10 +139,11 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
       val key = CacheKey(doc)
       val docInfo = doc.docinfo
       val src = StreamConverters.fromInputStream(() => bytes)
+      val cacheDoc = postProcess map { _(doc) } getOrElse doc
 
-      cacheUpdate(doc, key, db.attach(docInfo, attachmentName, contentType, src) map { newDocInfo
=>
-        doc.revision[W](newDocInfo.rev)
-        doc.docinfo
+      cacheUpdate(cacheDoc, key, db.attach(docInfo, attachmentName, contentType, src) map
{ newDocInfo =>
+        cacheDoc.revision[W](newDocInfo.rev)
+        cacheDoc.docinfo
       })
     } match {
       case Success(f) => f
@@ -202,26 +205,37 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
     }
   }
 
-  def getAttachment[Wsuper >: W](db: ArtifactStore[Wsuper],
-                                 doc: DocInfo,
-                                 attachmentName: String,
-                                 outputStream: OutputStream)(implicit transid: TransactionId):
Future[Unit] = {
+  def getAttachment[Wsuper >: W](
+    db: ArtifactStore[Wsuper],
+    doc: W,
+    attachmentName: String,
+    outputStream: OutputStream,
+    postProcess: Option[W => W] = None)(implicit transid: TransactionId, mw: Manifest[W]):
Future[W] = {
 
     implicit val ec = db.executionContext
+    implicit val notifier: Option[CacheChangeNotification] = None
 
     Try {
       require(db != null, "db defined")
       require(doc != null, "doc undefined")
     } map { _ =>
+      implicit val logger = db.logging
+      implicit val ec = db.executionContext
+
+      val docInfo = doc.docinfo
+      val key = CacheKey(docInfo)
       val sink = StreamConverters.fromOutputStream(() => outputStream)
-      db.readAttachment[IOResult](doc, attachmentName, sink).map {
-        case (_, r) =>
-          if (!r.wasSuccessful) {
-            // FIXME...
-            // Figure out whether OutputStreams are even a decent model.
+
+      db.readAttachment[IOResult](docInfo, attachmentName, sink).map {
+        case _ =>
+          val cacheDoc = postProcess map { _(doc) } getOrElse doc
+
+          cacheUpdate(cacheDoc, key, Future.successful(docInfo)) map { newDocInfo =>
+            cacheDoc.revision[W](newDocInfo.rev)
           }
-          ()
+          cacheDoc
       }
+
     } match {
       case Success(f) => f
       case Failure(t) => Future.failed(t)
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index 97f98ae..1e380c2 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -337,8 +337,15 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
           val manifest = exec.manifest.attached.get
 
           for (i1 <- super.put(db, newDoc);
-               i2 <- attach[A](db, newDoc.revision(i1.rev), manifest.attachmentName, manifest.attachmentType,
stream))
-            yield i2
+               i2 <- attach[A](
+                 db,
+                 newDoc.revision(i1.rev),
+                 manifest.attachmentName,
+                 manifest.attachmentType,
+                 stream,
+                 Some { a: WhiskAction =>
+                   a.copy(exec = exec.inline(code.getBytes("UTF-8")))
+                 })) yield i2
 
         case _ =>
           super.put(db, doc)
@@ -366,12 +373,12 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
           val boas = new ByteArrayOutputStream()
           val b64s = Base64.getEncoder().wrap(boas)
 
-          getAttachment[A](db, action.docinfo, attachmentName, b64s).map { _ =>
+          getAttachment[A](db, action, attachmentName, b64s, Some { a: WhiskAction =>
             b64s.close()
-            val newAction = action.copy(exec = exec.inline(boas.toByteArray))
-            newAction.revision(action.rev)
+            val newAction = a.copy(exec = exec.inline(boas.toByteArray))
+            newAction.revision(a.rev)
             newAction
-          }
+          })
 
         case _ =>
           Future.successful(action)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index e20936a..609e773 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -40,6 +40,10 @@ import whisk.core.entitlement.Collection
 import whisk.http.ErrorResponse
 import whisk.http.Messages
 
+import java.io.ByteArrayInputStream
+import java.util.Base64
+import akka.stream.scaladsl._
+
 /**
  * Tests Actions API.
  *
@@ -775,6 +779,201 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi
{
     }
   }
 
+  it should "put and then get an action with attachment from cache" in {
+    val action =
+      WhiskAction(namespace, aname(), javaDefault("ZHViZWU=", Some("hello")), annotations
= Parameters("exec", "java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val expectedPutLog = Seq(
+      s"caching $cacheKey",
+      s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
+      s"caching $cacheKey").mkString("(?s).*")
+    val notExpectedGetLog = Seq(
+      s"finding document: 'id: ${action.namespace}/${action.name}",
+      s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+    // first request invalidates any previous entries and caches new result
+    Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~>
check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should not include (s"invalidating ${CacheKey(action)} on delete")
+    stream.toString should include regex (expectedPutLog)
+    stream.reset()
+
+    // second request should fetch from cache
+    Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check
{
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should include(s"serving from cache: ${CacheKey(action)}")
+    stream.toString should not include regex(notExpectedGetLog)
+    stream.reset()
+
+    // delete should invalidate cache
+    Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check
{
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include(s"invalidating ${CacheKey(action)}")
+    stream.reset()
+  }
+
+  it should "get an action with attachment that is not cached" in {
+    implicit val tid = transid()
+    val code = "ZHViZWU="
+    val action =
+      WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec",
"java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val expectedGetLog = Seq(
+      s"finding document: 'id: ${action.namespace}/${action.name}",
+      s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+    action.exec match {
+      case exec @ CodeExecAsAttachment(_, _, _) =>
+        val newAction = action.copy(exec = exec.attach)
+        newAction.revision(action.rev)
+
+        val doc1 = put(entityStore, newAction, false)
+
+        val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
+        val manifest = exec.manifest.attached.get
+        val src = StreamConverters.fromInputStream(() => stream)
+
+        attach(entityStore, doc1, manifest.attachmentName, manifest.attachmentType, src)
+
+      case _ =>
+    }
+
+    // second request should fetch from cache
+    Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check
{
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should include regex (expectedGetLog)
+    stream.reset()
+  }
+
+  it should "update an existing action with attachment that is not cached" in {
+    implicit val tid = transid()
+    val code = "ZHViZWU="
+    val action =
+      WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec",
"java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val expectedPutLog = Seq(
+      s"caching $cacheKey",
+      s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
+      s"caching $cacheKey").mkString("(?s).*")
+
+    action.exec match {
+      case exec @ CodeExecAsAttachment(_, _, _) =>
+        val newAction = action.copy(exec = exec.attach)
+        newAction.revision(action.rev)
+
+        val doc = put(entityStore, newAction)
+
+        val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
+        val manifest = exec.manifest.attached.get
+        val src = StreamConverters.fromInputStream(() => stream)
+
+        attach(entityStore, doc, manifest.attachmentName, manifest.attachmentType, src)
+
+      case _ =>
+    }
+
+    Put(s"$collectionPath/$name?overwrite=true", content) ~> Route.seal(routes(creds)(transid()))
~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version.upPatch,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include regex (expectedPutLog)
+    stream.reset()
+
+    // delete should invalidate cache
+    Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check
{
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version.upPatch,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include(s"invalidating ${CacheKey(action)}")
+    stream.reset()
+  }
+
   it should "reject put with conflict for pre-existing action" in {
     implicit val tid = transid()
     val action = WhiskAction(namespace, aname(), jsDefault("??"), Parameters("x", "b"))
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index b341bae..fbcd1cf 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -39,6 +39,10 @@ import whisk.core.entity._
 import whisk.core.entity.types.AuthStore
 import whisk.core.entity.types.EntityStore
 
+import akka.http.scaladsl.model.ContentType
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+
 /**
  * WARNING: the put/get/del operations in this trait operate directly on the datastore,
  * and in the presence of a cache, there will be inconsistencies if one mixes these
@@ -200,6 +204,20 @@ trait DbUtils {
     doc
   }
 
+  def attach[A, Au >: A](
+    db: ArtifactStore[Au],
+    doc: DocInfo,
+    name: String,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    garbageCollect: Boolean = true)(implicit transid: TransactionId, timeout: Duration =
10 seconds): DocInfo = {
+    val docFuture = db.attach(doc, name, contentType, docStream)
+    val newDoc = Await.result(docFuture, timeout)
+    assert(newDoc != null)
+    if (garbageCollect) docsToDelete += ((db, newDoc))
+    newDoc
+  }
+
   /**
    * Gets document by id from datastore, and add it to gc queue to delete after the test
completes.
    */

-- 
To stop receiving notification emails like this one, please contact
csantanapr@apache.org.

Mime
View raw message