This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d61748f Cache database attachments (#2855)
d61748f is described below
commit d61748fd29f3ecd8caa0145cb3b9f6d73587fa66
Author: James Dubee <jwdubee@us.ibm.com>
AuthorDate: Wed May 9 23:09:05 2018 -0400
Cache database attachments (#2855)
---
.../whisk/core/database/DocumentFactory.scala | 52 ++++--
.../main/scala/whisk/core/entity/WhiskAction.scala | 19 +-
.../core/controller/test/ActionsApiTests.scala | 199 +++++++++++++++++++++
.../scala/whisk/core/database/test/DbUtils.scala | 18 ++
4 files changed, 263 insertions(+), 25 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index b6e346b..00f6669 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -120,12 +120,14 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
}
}
- def attach[Wsuper >: W](
- db: ArtifactStore[Wsuper],
- doc: W,
- attachmentName: String,
- contentType: ContentType,
- bytes: InputStream)(implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+ def attach[Wsuper >: W](db: ArtifactStore[Wsuper],
+ doc: W,
+ attachmentName: String,
+ contentType: ContentType,
+ bytes: InputStream,
+ postProcess: Option[W => W] = None)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
Try {
require(db != null, "db undefined")
@@ -137,10 +139,11 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
val key = CacheKey(doc)
val docInfo = doc.docinfo
val src = StreamConverters.fromInputStream(() => bytes)
+ val cacheDoc = postProcess map { _(doc) } getOrElse doc
- cacheUpdate(doc, key, db.attach(docInfo, attachmentName, contentType, src) map { newDocInfo =>
- doc.revision[W](newDocInfo.rev)
- doc.docinfo
+ cacheUpdate(cacheDoc, key, db.attach(docInfo, attachmentName, contentType, src) map { newDocInfo =>
+ cacheDoc.revision[W](newDocInfo.rev)
+ cacheDoc.docinfo
})
} match {
case Success(f) => f
@@ -202,26 +205,37 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
}
}
- def getAttachment[Wsuper >: W](db: ArtifactStore[Wsuper],
- doc: DocInfo,
- attachmentName: String,
- outputStream: OutputStream)(implicit transid: TransactionId): Future[Unit] = {
+ def getAttachment[Wsuper >: W](
+ db: ArtifactStore[Wsuper],
+ doc: W,
+ attachmentName: String,
+ outputStream: OutputStream,
+ postProcess: Option[W => W] = None)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
implicit val ec = db.executionContext
+ implicit val notifier: Option[CacheChangeNotification] = None
Try {
require(db != null, "db defined")
require(doc != null, "doc undefined")
} map { _ =>
+ implicit val logger = db.logging
+ implicit val ec = db.executionContext
+
+ val docInfo = doc.docinfo
+ val key = CacheKey(docInfo)
val sink = StreamConverters.fromOutputStream(() => outputStream)
- db.readAttachment[IOResult](doc, attachmentName, sink).map {
- case (_, r) =>
- if (!r.wasSuccessful) {
- // FIXME...
- // Figure out whether OutputStreams are even a decent model.
+
+ db.readAttachment[IOResult](docInfo, attachmentName, sink).map {
+ case _ =>
+ val cacheDoc = postProcess map { _(doc) } getOrElse doc
+
+ cacheUpdate(cacheDoc, key, Future.successful(docInfo)) map { newDocInfo =>
+ cacheDoc.revision[W](newDocInfo.rev)
}
- ()
+ cacheDoc
}
+
} match {
case Success(f) => f
case Failure(t) => Future.failed(t)
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index 97f98ae..1e380c2 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -337,8 +337,15 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
val manifest = exec.manifest.attached.get
for (i1 <- super.put(db, newDoc);
- i2 <- attach[A](db, newDoc.revision(i1.rev), manifest.attachmentName, manifest.attachmentType, stream))
- yield i2
+ i2 <- attach[A](
+ db,
+ newDoc.revision(i1.rev),
+ manifest.attachmentName,
+ manifest.attachmentType,
+ stream,
+ Some { a: WhiskAction =>
+ a.copy(exec = exec.inline(code.getBytes("UTF-8")))
+ })) yield i2
case _ =>
super.put(db, doc)
@@ -366,12 +373,12 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
val boas = new ByteArrayOutputStream()
val b64s = Base64.getEncoder().wrap(boas)
- getAttachment[A](db, action.docinfo, attachmentName, b64s).map { _ =>
+ getAttachment[A](db, action, attachmentName, b64s, Some { a: WhiskAction =>
b64s.close()
- val newAction = action.copy(exec = exec.inline(boas.toByteArray))
- newAction.revision(action.rev)
+ val newAction = a.copy(exec = exec.inline(boas.toByteArray))
+ newAction.revision(a.rev)
newAction
- }
+ })
case _ =>
Future.successful(action)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index e20936a..609e773 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -40,6 +40,10 @@ import whisk.core.entitlement.Collection
import whisk.http.ErrorResponse
import whisk.http.Messages
+import java.io.ByteArrayInputStream
+import java.util.Base64
+import akka.stream.scaladsl._
+
/**
* Tests Actions API.
*
@@ -775,6 +779,201 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
}
}
+ it should "put and then get an action with attachment from cache" in {
+ val action =
+ WhiskAction(namespace, aname(), javaDefault("ZHViZWU=", Some("hello")), annotations = Parameters("exec", "java"))
+ val content = WhiskActionPut(
+ Some(action.exec),
+ Some(action.parameters),
+ Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+ val name = action.name
+ val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+ val expectedPutLog = Seq(
+ s"caching $cacheKey",
+ s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
+ s"caching $cacheKey").mkString("(?s).*")
+ val notExpectedGetLog = Seq(
+ s"finding document: 'id: ${action.namespace}/${action.name}",
+ s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+ // first request invalidates any previous entries and caches new result
+ Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~> check {
+ status should be(OK)
+ val response = responseAs[WhiskAction]
+ response should be(
+ WhiskAction(
+ action.namespace,
+ action.name,
+ action.exec,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+ }
+
+ stream.toString should not include (s"invalidating ${CacheKey(action)} on delete")
+ stream.toString should include regex (expectedPutLog)
+ stream.reset()
+
+ // second request should fetch from cache
+ Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+ status should be(OK)
+ val response = responseAs[WhiskAction]
+ response should be(
+ WhiskAction(
+ action.namespace,
+ action.name,
+ action.exec,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+ }
+
+ stream.toString should include(s"serving from cache: ${CacheKey(action)}")
+ stream.toString should not include regex(notExpectedGetLog)
+ stream.reset()
+
+ // delete should invalidate cache
+ Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+ status should be(OK)
+ val response = responseAs[WhiskAction]
+ response should be(
+ WhiskAction(
+ action.namespace,
+ action.name,
+ action.exec,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+ }
+ stream.toString should include(s"invalidating ${CacheKey(action)}")
+ stream.reset()
+ }
+
+ it should "get an action with attachment that is not cached" in {
+ implicit val tid = transid()
+ val code = "ZHViZWU="
+ val action =
+ WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec", "java"))
+ val content = WhiskActionPut(
+ Some(action.exec),
+ Some(action.parameters),
+ Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+ val name = action.name
+ val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+ val expectedGetLog = Seq(
+ s"finding document: 'id: ${action.namespace}/${action.name}",
+ s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+ action.exec match {
+ case exec @ CodeExecAsAttachment(_, _, _) =>
+ val newAction = action.copy(exec = exec.attach)
+ newAction.revision(action.rev)
+
+ val doc1 = put(entityStore, newAction, false)
+
+ val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
+ val manifest = exec.manifest.attached.get
+ val src = StreamConverters.fromInputStream(() => stream)
+
+ attach(entityStore, doc1, manifest.attachmentName, manifest.attachmentType, src)
+
+ case _ =>
+ }
+
+ // second request should fetch from cache
+ Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+ status should be(OK)
+ val response = responseAs[WhiskAction]
+ response should be(
+ WhiskAction(
+ action.namespace,
+ action.name,
+ action.exec,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+ }
+
+ stream.toString should include regex (expectedGetLog)
+ stream.reset()
+ }
+
+ it should "update an existing action with attachment that is not cached" in {
+ implicit val tid = transid()
+ val code = "ZHViZWU="
+ val action =
+ WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec", "java"))
+ val content = WhiskActionPut(
+ Some(action.exec),
+ Some(action.parameters),
+ Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+ val name = action.name
+ val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+ val expectedPutLog = Seq(
+ s"caching $cacheKey",
+ s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
+ s"caching $cacheKey").mkString("(?s).*")
+
+ action.exec match {
+ case exec @ CodeExecAsAttachment(_, _, _) =>
+ val newAction = action.copy(exec = exec.attach)
+ newAction.revision(action.rev)
+
+ val doc = put(entityStore, newAction)
+
+ val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
+ val manifest = exec.manifest.attached.get
+ val src = StreamConverters.fromInputStream(() => stream)
+
+ attach(entityStore, doc, manifest.attachmentName, manifest.attachmentType, src)
+
+ case _ =>
+ }
+
+ Put(s"$collectionPath/$name?overwrite=true", content) ~> Route.seal(routes(creds)(transid())) ~> check {
+ status should be(OK)
+ val response = responseAs[WhiskAction]
+ response should be(
+ WhiskAction(
+ action.namespace,
+ action.name,
+ action.exec,
+ action.parameters,
+ action.limits,
+ action.version.upPatch,
+ action.publish,
+ action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+ }
+ stream.toString should include regex (expectedPutLog)
+ stream.reset()
+
+ // delete should invalidate cache
+ Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+ status should be(OK)
+ val response = responseAs[WhiskAction]
+ response should be(
+ WhiskAction(
+ action.namespace,
+ action.name,
+ action.exec,
+ action.parameters,
+ action.limits,
+ action.version.upPatch,
+ action.publish,
+ action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+ }
+ stream.toString should include(s"invalidating ${CacheKey(action)}")
+ stream.reset()
+ }
+
it should "reject put with conflict for pre-existing action" in {
implicit val tid = transid()
val action = WhiskAction(namespace, aname(), jsDefault("??"), Parameters("x", "b"))
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index b341bae..fbcd1cf 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -39,6 +39,10 @@ import whisk.core.entity._
import whisk.core.entity.types.AuthStore
import whisk.core.entity.types.EntityStore
+import akka.http.scaladsl.model.ContentType
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+
/**
* WARNING: the put/get/del operations in this trait operate directly on the datastore,
* and in the presence of a cache, there will be inconsistencies if one mixes these
@@ -200,6 +204,20 @@ trait DbUtils {
doc
}
+ def attach[A, Au >: A](
+ db: ArtifactStore[Au],
+ doc: DocInfo,
+ name: String,
+ contentType: ContentType,
+ docStream: Source[ByteString, _],
+ garbageCollect: Boolean = true)(implicit transid: TransactionId, timeout: Duration = 10 seconds): DocInfo = {
+ val docFuture = db.attach(doc, name, contentType, docStream)
+ val newDoc = Await.result(docFuture, timeout)
+ assert(newDoc != null)
+ if (garbageCollect) docsToDelete += ((db, newDoc))
+ newDoc
+ }
+
/**
* Gets document by id from datastore, and add it to gc queue to delete after the test completes.
*/
--
To stop receiving notification emails like this one, please contact
csantanapr@apache.org.