openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tysonnor...@apache.org
Subject [incubator-openwhisk] branch master updated: Ensure cache gets properly updated with concurrent access for action with attachments (#4183)
Date Mon, 17 Dec 2018 19:16:29 GMT
This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bd21bc  Ensure cache gets properly updated with concurrent access for action with
attachments (#4183)
1bd21bc is described below

commit 1bd21bccc7ad90b0081add8e1a9cd736dc60ad47
Author: Chetan Mehrotra <chetanm@apache.org>
AuthorDate: Tue Dec 18 00:46:23 2018 +0530

    Ensure cache gets properly updated with concurrent access for action with attachments
(#4183)
---
 .../openwhisk/core/database/DocumentFactory.scala  | 155 +++++++--------------
 .../apache/openwhisk/core/entity/WhiskAction.scala |   5 +-
 .../core/controller/test/ActionsApiTests.scala     |  57 +++++++-
 3 files changed, 109 insertions(+), 108 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
index b32ec9a..75f2d70 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
@@ -21,9 +21,6 @@ import java.io.InputStream
 import java.io.OutputStream
 
 import scala.concurrent.{Future, Promise}
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
 import akka.http.scaladsl.model.ContentType
 import akka.stream.IOResult
 import akka.stream.scaladsl.StreamConverters
@@ -102,24 +99,12 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
   def put[Wsuper >: W](db: ArtifactStore[Wsuper], doc: W, old: Option[W])(
     implicit transid: TransactionId,
     notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
-    Try {
-      require(db != null, "db undefined")
-      require(doc != null, "doc undefined")
-    } map { _ =>
-      implicit val logger = db.logging
-      implicit val ec = db.executionContext
-
-      val key = CacheKey(doc)
-      val docInfo = doc.docinfo
-
-      cacheUpdate(doc, key, db.put(doc) map { newDocInfo =>
-        doc.revision[W](newDocInfo.rev)
-        doc.docinfo
-      })
-    } match {
-      case Success(f) => f
-      case Failure(t) => Future.failed(t)
-    }
+    implicit val logger = db.logging
+    implicit val ec = db.executionContext
+    cacheUpdate(doc, CacheKey(doc), db.put(doc) map { newDocInfo =>
+      doc.revision[W](newDocInfo.rev)
+      doc.docinfo
+    })
   }
 
   def putAndAttach[Wsuper >: W](db: ArtifactStore[Wsuper],
@@ -131,49 +116,31 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
                                 postProcess: Option[W => W] = None)(
     implicit transid: TransactionId,
     notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+    implicit val logger = db.logging
+    implicit val ec = db.executionContext
 
-    Try {
-      require(db != null, "db undefined")
-      require(doc != null, "doc undefined")
-    } map { _ =>
-      implicit val logger = db.logging
-      implicit val ec = db.executionContext
-
-      val key = CacheKey(doc)
-      val src = StreamConverters.fromInputStream(() => bytes)
-
-      val p = Promise[W]
-      cacheUpdate(p.future, key, db.putAndAttach[W](doc, update, contentType, src, oldAttachment)
map {
-        case (newDocInfo, attached) =>
-          val newDoc = update(doc, attached)
-          val cacheDoc = postProcess map { _(newDoc) } getOrElse newDoc
-          cacheDoc.revision[W](newDocInfo.rev)
-          p.success(cacheDoc)
-          newDocInfo
-      })
-
-    } match {
-      case Success(f) => f
-      case Failure(t) => Future.failed(t)
-    }
+    val key = CacheKey(doc)
+    val src = StreamConverters.fromInputStream(() => bytes)
+
+    val p = Promise[W]
+    cacheUpdate(p.future, key, db.putAndAttach[W](doc, update, contentType, src, oldAttachment)
map {
+      case (newDocInfo, attached) =>
+        val newDoc = update(doc, attached)
+        val cacheDoc = postProcess map { _(newDoc) } getOrElse newDoc
+        cacheDoc.revision[W](newDocInfo.rev)
+        p.success(cacheDoc)
+        newDocInfo
+    })
   }
 
   def del[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo)(
     implicit transid: TransactionId,
     notifier: Option[CacheChangeNotification]): Future[Boolean] = {
-    Try {
-      require(db != null, "db undefined")
-      require(doc != null, "doc undefined")
-    } map { _ =>
-      implicit val logger = db.logging
-      implicit val ec = db.executionContext
+    implicit val logger = db.logging
+    implicit val ec = db.executionContext
 
-      val key = CacheKey(doc.id.asDocInfo)
-      cacheInvalidate(key, db.del(doc))
-    } match {
-      case Success(f) => f
-      case Failure(t) => Future.failed(t)
-    }
+    val key = CacheKey(doc.id.asDocInfo)
+    cacheInvalidate(key, db.del(doc))
   }
 
   /**
@@ -198,77 +165,57 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
     doc: DocId,
     rev: DocRevision = DocRevision.empty,
     fromCache: Boolean = cacheEnabled)(implicit transid: TransactionId, mw: Manifest[W]):
Future[W] = {
-    getWithAttachment(db, doc, rev, fromCache, None)
+    implicit val logger = db.logging
+    implicit val ec = db.executionContext
+    val key = doc.asDocInfo(rev)
+    cacheLookup(CacheKey(key), db.get[W](key, None), fromCache)
   }
 
+  /**
+   *  Fetches document along with attachment. `postProcess` would be used to process the
fetched document
+   *  before adding it to cache. This ensures that for documents having attachment the cache
is updated only
+   *  post fetch of the attachment
+   */
   protected def getWithAttachment[Wsuper >: W](
     db: ArtifactStore[Wsuper],
     doc: DocId,
     rev: DocRevision = DocRevision.empty,
     fromCache: Boolean,
-    attachmentHandler: Option[(W, Attached) => W])(implicit transid: TransactionId, mw:
Manifest[W]): Future[W] = {
-    Try {
-      require(db != null, "db undefined")
-    } map {
-      implicit val logger = db.logging
-      implicit val ec = db.executionContext
-      val key = doc.asDocInfo(rev)
-      _ =>
-        cacheLookup(CacheKey(key), db.get[W](key, attachmentHandler), fromCache)
-    } match {
-      case Success(f) => f
-      case Failure(t) => Future.failed(t)
-    }
+    attachmentHandler: (W, Attached) => W,
+    postProcess: W => Future[W])(implicit transid: TransactionId, mw: Manifest[W]): Future[W]
= {
+    implicit val logger = db.logging
+    implicit val ec = db.executionContext
+    val key = doc.asDocInfo(rev)
+    cacheLookup(CacheKey(key), db.get[W](key, Some(attachmentHandler)).flatMap(postProcess),
fromCache)
   }
 
-  def getAttachment[Wsuper >: W](
+  protected def getAttachment[Wsuper >: W](
     db: ArtifactStore[Wsuper],
     doc: W,
     attached: Attached,
     outputStream: OutputStream,
     postProcess: Option[W => W] = None)(implicit transid: TransactionId, mw: Manifest[W]):
Future[W] = {
-
     implicit val ec = db.executionContext
     implicit val notifier: Option[CacheChangeNotification] = None
+    implicit val logger = db.logging
 
-    Try {
-      require(db != null, "db defined")
-      require(doc != null, "doc undefined")
-    } map { _ =>
-      implicit val logger = db.logging
-      implicit val ec = db.executionContext
-
-      val docInfo = doc.docinfo
-      val key = CacheKey(docInfo)
-      val sink = StreamConverters.fromOutputStream(() => outputStream)
+    val docInfo = doc.docinfo
+    val key = CacheKey(docInfo)
+    val sink = StreamConverters.fromOutputStream(() => outputStream)
 
-      db.readAttachment[IOResult](docInfo, attached, sink).map {
-        case _ =>
-          val cacheDoc = postProcess map { _(doc) } getOrElse doc
+    db.readAttachment[IOResult](docInfo, attached, sink).map { _ =>
+      val cacheDoc = postProcess.map(_(doc)).getOrElse(doc)
 
-          cacheUpdate(cacheDoc, key, Future.successful(docInfo)) map { newDocInfo =>
-            cacheDoc.revision[W](newDocInfo.rev)
-          }
-          cacheDoc
+      cacheUpdate(cacheDoc, key, Future.successful(docInfo)) map { newDocInfo =>
+        cacheDoc.revision[W](newDocInfo.rev)
       }
-
-    } match {
-      case Success(f) => f
-      case Failure(t) => Future.failed(t)
+      cacheDoc
     }
   }
 
   def deleteAttachments[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo)(
     implicit transid: TransactionId): Future[Boolean] = {
-    Try {
-      require(db != null, "db defined")
-      require(doc != null, "doc undefined")
-    } map { _ =>
-      implicit val ec = db.executionContext
-      db.deleteAttachments(doc)
-    } match {
-      case Success(f) => f
-      case Failure(t) => Future.failed(t)
-    }
+    implicit val ec = db.executionContext
+    db.deleteAttachments(doc)
   }
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
index 65ddcf3..38d1a2f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
@@ -376,9 +376,7 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
 
     implicit val ec = db.executionContext
 
-    val fa = super.getWithAttachment(db, doc, rev, fromCache, Some(attachmentHandler _))
-
-    fa.flatMap { action =>
+    val inlineActionCode: WhiskAction => Future[WhiskAction] = { action =>
       def getWithAttachment(attached: Attached, binary: Boolean, exec: AttachedCode) = {
         val boas = new ByteArrayOutputStream()
         val wrapped = if (binary) Base64.getEncoder().wrap(boas) else boas
@@ -400,6 +398,7 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
           Future.successful(action)
       }
     }
+    super.getWithAttachment(db, doc, rev, fromCache, attachmentHandler, inlineActionCode)
   }
 
   def attachmentHandler(action: WhiskAction, attached: Attached): WhiskAction = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index d2d6387..ce8a1bf 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -36,8 +36,8 @@ import org.apache.openwhisk.core.entitlement.Collection
 import org.apache.openwhisk.http.ErrorResponse
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
-
 import akka.http.scaladsl.model.headers.RawHeader
+import org.apache.commons.lang3.StringUtils
 import org.apache.openwhisk.core.entity.Attachments.Inline
 
 /**
@@ -1027,6 +1027,61 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi
{
     }
   }
 
+  it should "concurrently get an action with attachment that is not cached" in {
+    implicit val tid = transid()
+    val action = WhiskAction(namespace, aname(), jsDefault(nonInlinedCode(entityStore)),
Parameters("x", "b"))
+    val kind = NODEJS6
+
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(
+        ActionLimitsOption(
+          Some(action.limits.timeout),
+          Some(action.limits.memory),
+          Some(action.limits.logs),
+          Some(action.limits.concurrency))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val expectedGetLog = Seq(
+      s"finding document: 'id: ${action.namespace}/${action.name}",
+      s"finding attachment '[\\w-/:]+' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+    Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~>
check {
+      status should be(OK)
+    }
+
+    removeFromCache(action, WhiskAction)
+
+    stream.reset()
+
+    val expectedAction = WhiskAction(
+      action.namespace,
+      action.name,
+      action.exec,
+      action.parameters,
+      action.limits,
+      action.version,
+      action.publish,
+      action.annotations ++ Parameters(WhiskAction.execFieldName, kind))
+
+    (0 until 5).par.map { i =>
+      Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check
{
+        status should be(OK)
+        val response = responseAs[WhiskAction]
+        response should be(expectedAction)
+      }
+    }
+
+    //Loading action with attachment concurrently should load only attachment once
+    val logs = stream.toString
+    withClue(s"db logs $logs") {
+      StringUtils.countMatches(logs, "finding document") shouldBe 1
+      StringUtils.countMatches(logs, "finding attachment") shouldBe 1
+    }
+    stream.reset()
+  }
+
   it should "update an existing action with attachment that is not cached" in {
     implicit val tid = transid()
     val nodeAction = WhiskAction(namespace, aname(), jsDefault(nonInlinedCode(entityStore)),
Parameters("x", "b"))


Mime
View raw message