openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chet...@apache.org
Subject [incubator-openwhisk] branch master updated: Inlined attachments (#3709)
Date Fri, 08 Jun 2018 05:13:31 GMT
This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c5f05d9  Inlined attachments (#3709)
c5f05d9 is described below

commit c5f05d972d0c20bc2ea1f75680862c9cf08c7daa
Author: Chetan Mehrotra <chetanm@apache.org>
AuthorDate: Fri Jun 8 10:43:16 2018 +0530

    Inlined attachments (#3709)
    
    Adds support for inlining attachment content as part of attachment name for smaller attachments.
    
    Inlined attachment contents are stored in Base64-encoded form as part of the attachment name itself.
    
    With this change “ArtifactStore#putAndAttach” will
       1. Read the Source up to “max-inline-size” bytes and keep the rest in a tail Source
       2. If fewer than “max-inline-size” bytes were read (so the tail is empty), Base64-encode
          the bytes read so far and return that as the attachment name
       3. Otherwise create a combined source from the bytes read and the tail, store that as
          the attachment and follow the regular attachment storage flow
    
    Introduces 2 new config settings
       max-inline-size - Size limit for inlined attachments
       chunk-size - Chunk size for converting a source of bytes to ByteString
---
 common/scala/src/main/resources/application.conf   |   8 ++
 .../scala/whisk/core/database/ArtifactStore.scala  |   4 +-
 .../whisk/core/database/AttachmentInliner.scala    | 121 +++++++++++++++++++
 .../whisk/core/database/CouchDbRestStore.scala     |  50 +++++---
 .../whisk/core/database/CouchDbStoreProvider.scala |   6 +-
 .../whisk/core/database/DocumentFactory.scala      |   4 +-
 .../core/database/memory/MemoryArtifactStore.scala |  90 +++++++-------
 .../main/scala/whisk/core/entity/WhiskAction.scala |   4 +-
 .../core/controller/test/ActionsApiTests.scala     |  86 +++++++++++++-
 .../database/test/AttachmentInlinerTests.scala     |  59 ++++++++++
 .../whisk/core/database/test/AttachmentTests.scala | 131 ---------------------
 .../scala/whisk/core/database/test/DbUtils.scala   |  46 +++++++-
 .../ArtifactStoreAttachmentBehaviors.scala         |  63 +++++++---
 13 files changed, 454 insertions(+), 218 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index feaa960..dbc70c5 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -114,6 +114,14 @@ whisk {
         actions-ddoc = "whisks.v2.1.0"
         activations-ddoc = "whisks.v2.1.0"
         activations-filter-ddoc = "whisks-filters.v2.1.0"
+
+        # Size limit for inlined attachments. Attachments smaller than this are
+        # inlined, with their content encoded in the attachment name
+        max-inline-size = 16 k
+
+        # Chunk size for converting a source of bytes to ByteString as part of the
+        # attachment upload flow
+        chunk-size = 8 k
     }
 
     # CouchDB related configuration
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
index 3750398..4a70e40 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
@@ -137,8 +137,8 @@ trait ArtifactStore[DocumentAbstraction] {
   /**
    * Retrieves a saved attachment, streaming it into the provided Sink.
    */
-  protected[core] def readAttachment[T](doc: DocInfo, name: String, sink: Sink[ByteString,
Future[T]])(
-    implicit transid: TransactionId): Future[(ContentType, T)]
+  protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString,
Future[T]])(
+    implicit transid: TransactionId): Future[T]
 
   /**
    * Deletes all attachments linked to given document
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
new file mode 100644
index 0000000..14eb192
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.security.MessageDigest
+import java.util.Base64
+
+import akka.NotUsed
+import akka.http.scaladsl.model.Uri
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Concat, Sink, Source}
+import akka.util.{ByteString, ByteStringBuilder}
+import whisk.core.database.AttachmentInliner.MemScheme
+import whisk.core.entity.ByteSize
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+object AttachmentInliner {
+
+  /**
+   * Scheme name for attachments which are inlined
+   */
+  val MemScheme: String = "mem"
+}
+
+case class InliningConfig(maxInlineSize: ByteSize, chunkSize: ByteSize)
+
+/**
+ * Provides support for inlining small attachments. Inlined attachment contents are encoded
as part of attachment
+ * name itself.
+ */
+trait AttachmentInliner {
+  private val digestAlgo = "SHA-256"
+  private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "")
+
+  /** Materializer required for stream processing */
+  protected[core] implicit val materializer: Materializer
+
+  protected[database] def inlineAndTail(
+    docStream: Source[ByteString, _]): Future[(immutable.Seq[Byte], Source[Byte, _])] = {
+    docStream
+      .mapConcat(_.seq)
+      .prefixAndTail(maxInlineSize.toBytes.toInt)
+      .runWith(Sink.head[(immutable.Seq[Byte], Source[Byte, _])])
+  }
+
+  protected[database] def uriOf(bytes: Seq[Byte], path: => String): Uri = {
+    //For less than case its definitive that tail source would be empty
+    //for equal case it cannot be determined if tail source is empty. Actual max inline size
+    //would be inlineSize - 1
+    if (bytes.size < maxInlineSize.toBytes) {
+      Uri.from(scheme = MemScheme, path = encode(bytes))
+    } else {
+      Uri.from(scheme = attachmentScheme, path = path)
+    }
+  }
+
+  /**
+   * Constructs a combined source based on attachment content read so far and rest of unread
content.
+   * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements.
+   */
+  protected[database] def combinedSource(inlinedBytes: immutable.Seq[Byte],
+                                         tailSource: Source[Byte, _]): Source[ByteString,
NotUsed] =
+    Source
+      .combine(Source(inlinedBytes), tailSource)(Concat[Byte])
+      .batch[ByteStringBuilder](chunkSize.toBytes, b => { val bb = new ByteStringBuilder();
bb += b })((bb, b) =>
+        bb += b)
+      .map(_.result())
+
+  /**
+   * Constructs a source from inlined attachment contents
+   */
+  protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed] = {
+    require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
+    Source.single(ByteString(decode(uri)))
+  }
+
+  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme
+
+  protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
+    val digester = MessageDigest.getInstance(digestAlgo)
+    digester.update(bytes.toArray)
+    val digest = digester.digest().map("%02x".format(_)).mkString
+    s"$encodedAlgoName-$digest"
+  }
+
+  /**
+   * Attachments having size less than this would be inlined
+   */
+  def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
+
+  def chunkSize: ByteSize = inliningConfig.chunkSize
+
+  protected def inliningConfig: InliningConfig
+
+  protected def attachmentScheme: String
+
+  private def encode(bytes: Seq[Byte]): String = {
+    Base64.getUrlEncoder.encodeToString(bytes.toArray)
+  }
+
+  private def decode(uri: Uri): Array[Byte] = {
+    Base64.getUrlDecoder.decode(uri.path.toString())
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 8c401bd..d0f13c9 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -30,8 +30,8 @@ import whisk.core.entity.Attachments.Attached
 import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
 import whisk.http.Messages
 
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
 import scala.util.Try
 
 /**
@@ -51,18 +51,21 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol:
St
                                                                   dbUsername: String,
                                                                   dbPassword: String,
                                                                   dbName: String,
-                                                                  useBatching: Boolean =
false)(
+                                                                  useBatching: Boolean =
false,
+                                                                  val inliningConfig: InliningConfig)(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
-  materializer: ActorMaterializer,
+  val materializer: ActorMaterializer,
   docReader: DocumentReader)
     extends ArtifactStore[DocumentAbstraction]
-    with DefaultJsonProtocol {
+    with DefaultJsonProtocol
+    with AttachmentInliner {
 
   protected[core] implicit val executionContext = system.dispatcher
 
-  private val attachmentScheme = "couch"
+  val attachmentScheme: String = "couch"
+
   private val client: CouchDbRestClient =
     new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
 
@@ -354,13 +357,21 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol:
St
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)]
= {
 
-    val attachmentUri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
-    val attached = Attached(attachmentUri.toString(), contentType)
-    val updatedDoc = update(d, attached)
-
     for {
-      i1 <- put(updatedDoc)
-      i2 <- attach(i1, attachmentUri.path.toString(), attached.attachmentType, docStream)
+      (bytes, tailSource) <- inlineAndTail(docStream)
+      uri <- Future.successful(uriOf(bytes, UUID().asString))
+      attached <- {
+        val a = if (isInlined(uri)) {
+          Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+        } else {
+          Attached(uri.toString, contentType)
+        }
+        Future.successful(a)
+      }
+      i1 <- put(update(d, attached))
+      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
+        attach(i1, uri.path.toString, attached.attachmentType, combinedSource(bytes, tailSource))
+      }
     } yield (i2, attached)
   }
 
@@ -408,9 +419,10 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol:
St
           ErrorLevel))
   }
 
-  override protected[core] def readAttachment[T](doc: DocInfo, name: String, sink: Sink[ByteString,
Future[T]])(
-    implicit transid: TransactionId): Future[(ContentType, T)] = {
+  override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink:
Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
 
+    val name = attached.attachmentName
     val start = transid.started(
       this,
       LoggingMarkers.DATABASE_ATT_GET,
@@ -420,12 +432,14 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol:
St
     require(doc.rev.rev != null, "doc revision must be specified")
 
     val attachmentUri = Uri(name)
-    val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString(),
sink)
-    val g = f.map { e =>
-      e match {
-        case Right((contentType, result)) =>
+    val g = if (isInlined(attachmentUri)) {
+      memorySource(attachmentUri).runWith(sink)
+    } else {
+      val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString,
sink)
+      f.map {
+        case Right((_, result)) =>
           transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment
'$name' of document '$doc'")
-          (contentType, result)
+          result
 
         case Left(StatusCodes.NotFound) =>
           transid.finished(
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
index c10436b..df6a374 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -23,6 +23,7 @@ import spray.json.RootJsonFormat
 import whisk.common.Logging
 import whisk.core.ConfigKeys
 import whisk.core.entity.DocumentReader
+import whisk.core.entity.size._
 import pureconfig._
 
 import scala.reflect.ClassTag
@@ -58,6 +59,8 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
       dbConfig.provider == "Cloudant" || dbConfig.provider == "CouchDB",
       s"Unsupported db.provider: ${dbConfig.provider}")
 
+    val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
+
     new CouchDbRestStore[D](
       dbConfig.protocol,
       dbConfig.host,
@@ -65,6 +68,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
       dbConfig.username,
       dbConfig.password,
       dbConfig.databaseFor[D],
-      useBatching)
+      useBatching,
+      inliningConfig)
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index 38827b3..f05d39c 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -224,7 +224,7 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
   def getAttachment[Wsuper >: W](
     db: ArtifactStore[Wsuper],
     doc: W,
-    attachmentName: String,
+    attached: Attached,
     outputStream: OutputStream,
     postProcess: Option[W => W] = None)(implicit transid: TransactionId, mw: Manifest[W]):
Future[W] = {
 
@@ -242,7 +242,7 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
       val key = CacheKey(docInfo)
       val sink = StreamConverters.fromOutputStream(() => outputStream)
 
-      db.readAttachment[IOResult](docInfo, attachmentName, sink).map {
+      db.readAttachment[IOResult](docInfo, attached, sink).map {
         case _ =>
           val cacheDoc = postProcess map { _(doc) } getOrElse doc
 
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index e127599..75973ff 100644
--- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -17,20 +17,20 @@
 
 package whisk.core.database.memory
 
-import java.security.MessageDigest
-import java.util.Base64
-
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.{ContentType, Uri}
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Keep, Sink, Source}
 import akka.util.{ByteString, ByteStringBuilder}
+import pureconfig.loadConfigOrThrow
 import spray.json.{DefaultJsonProtocol, DeserializationException, JsObject, JsString, RootJsonFormat}
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.core.ConfigKeys
 import whisk.core.database.StoreUtils._
 import whisk.core.database._
 import whisk.core.entity.Attachments.Attached
 import whisk.core.entity._
+import whisk.core.entity.size._
 import whisk.http.Messages
 
 import scala.collection.concurrent.TrieMap
@@ -48,8 +48,8 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
 
     val classTag = implicitly[ClassTag[D]]
     val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
-
-    new MemoryArtifactStore(dbName, handler, viewMapper)
+    val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
+    new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig)
   }
 
   private def handlerAndMapper[D](entityType: ClassTag[D])(
@@ -74,15 +74,17 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
  */
 class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: String,
                                                                      documentHandler: DocumentHandler,
-                                                                     viewMapper: MemoryViewMapper)(
+                                                                     viewMapper: MemoryViewMapper,
+                                                                     val inliningConfig:
InliningConfig)(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
-  materializer: ActorMaterializer,
+  val materializer: ActorMaterializer,
   docReader: DocumentReader)
     extends ArtifactStore[DocumentAbstraction]
     with DefaultJsonProtocol
-    with DocumentProvider {
+    with DocumentProvider
+    with AttachmentInliner {
 
   override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
 
@@ -90,7 +92,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName:
Str
 
   private val _id = "_id"
   private val _rev = "_rev"
-  private val attachmentScheme = "mem"
+  val attachmentScheme = "mems"
 
   override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId):
Future[DocInfo] = {
     val asJson = d.toDocumentRecord
@@ -244,23 +246,29 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName:
Str
     f.map(_.size)
   }
 
-  override protected[core] def readAttachment[T](doc: DocInfo, name: String, sink: Sink[ByteString,
Future[T]])(
-    implicit transid: TransactionId): Future[(ContentType, T)] = {
+  override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink:
Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
     //TODO Temporary implementation till MemoryAttachmentStore PR is merged
+    val name = attached.attachmentName
     val start = transid.started(
       this,
       LoggingMarkers.DATABASE_ATT_GET,
       s"[ATT_GET] '$dbName' finding attachment '$name' of document '$doc'")
 
-    val storedName = Uri(name).path.toString()
-    artifacts.get(doc.id.id) match {
-      case Some(a: Artifact) if a.attachments.contains(storedName) =>
-        val attachment = a.attachments(storedName)
-        val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run
-        transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name'
of document '$doc'")
-        r.map(t => (attachment.contentType, t))
-      case None =>
-        Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+    val attachmentUri = Uri(name)
+    if (isInlined(attachmentUri)) {
+      memorySource(attachmentUri).runWith(sink)
+    } else {
+      val storedName = attachmentUri.path.toString()
+      artifacts.get(doc.id.id) match {
+        case Some(a: Artifact) if a.attachments.contains(storedName) =>
+          val attachment = a.attachments(storedName)
+          val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run
+          transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment
'$name' of document '$doc'")
+          r
+        case None =>
+          Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+      }
     }
   }
 
@@ -275,19 +283,28 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName:
Str
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)]
= {
 
-    val attachmentUri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
-
+    //Inlined attachment with Memory storage is not required. However to validate the constructs
+    //inlined support is implemented
     for {
-      bytes <- toByteString(docStream)
-      attached <- Future.successful(
-        Attached(attachmentUri.toString(), contentType, Some(bytes.size), Some(digest(bytes))))
-      updatedDoc <- Future.successful(update(d, attached))
-      i1 <- put(updatedDoc)
-      i2 <- attach(i1, attachmentUri.path.toString(), attached.attachmentType, bytes)
+      allBytes <- toByteString(docStream)
+      (bytes, tailSource) <- inlineAndTail(Source.single(allBytes))
+      uri <- Future.successful(uriOf(bytes, UUID().asString))
+      attached <- {
+        val a = if (isInlined(uri)) {
+          Attached(uri.toString(), contentType, Some(bytes.size), Some(digest(bytes)))
+        } else {
+          Attached(uri.toString(), contentType, Some(allBytes.size), Some(digest(allBytes)))
+        }
+        Future.successful(a)
+      }
+      i1 <- put(update(d, attached))
+      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
+        attach(i1, uri.path.toString(), attached.attachmentType, toByteString(combinedSource(bytes,
tailSource)))
+      }
     } yield (i2, attached)
   }
 
-  private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: ByteString)(
+  private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: Future[ByteString])(
     implicit transid: TransactionId): Future[DocInfo] = {
 
     val start = transid.started(
@@ -296,11 +313,11 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName:
Str
       s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'")
 
     //TODO Temporary implementation till MemoryAttachmentStore PR is merged
-    val g =
+    bytes.map { b =>
       artifacts.get(doc.id.id) match {
         case Some(a) =>
           val existing = Artifact(doc, a.doc, a.computed)
-          val updated = existing.attach(name, Attachment(bytes, contentType))
+          val updated = existing.attach(name, Attachment(b, contentType))
           if (artifacts.replace(doc.id.id, existing, updated)) {
             transid
               .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment
'$name' of document '$doc'")
@@ -311,7 +328,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName:
Str
         case None =>
           throw DocumentConflictException("conflict on 'put'")
       }
-    Future.successful(g)
+    }
   }
 
   override def shutdown(): Unit = {
@@ -337,16 +354,9 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName:
Str
     reportFailure(f, start, failure => s"[GET] '$dbName' internal error, doc: '$id', failure:
'${failure.getMessage}'")
   }
 
-  private def toByteString(docStream: Source[ByteString, _]) =
+  private def toByteString(docStream: Source[Traversable[Byte], _]) =
     docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
 
-  private def digest(bytes: ByteString) = {
-    val digestBytes = MessageDigest
-      .getInstance("MD5")
-      .digest(bytes.toArray)
-    s"md5-${Base64.getUrlEncoder.encodeToString(digestBytes)}"
-  }
-
   private def getRevision(asJson: JsObject) = {
     asJson.fields.get(_rev) match {
       case Some(JsString(r)) => r.toInt
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index a0011ff..ac6b2d0 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -371,11 +371,11 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
 
     fa.flatMap { action =>
       action.exec match {
-        case exec @ CodeExecAsAttachment(_, Attached(attachmentName, _, _, _), _) =>
+        case exec @ CodeExecAsAttachment(_, attached: Attached, _) =>
           val boas = new ByteArrayOutputStream()
           val b64s = Base64.getEncoder().wrap(boas)
 
-          getAttachment[A](db, action, attachmentName, b64s, Some { a: WhiskAction =>
+          getAttachment[A](db, action, attached, b64s, Some { a: WhiskAction =>
             b64s.close()
             val newAction = a.copy(exec = exec.inline(boas.toByteArray))
             newAction.revision(a.rev)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index c8d360e..4842a11 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -781,7 +781,11 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi
{
 
   it should "put and then get an action with attachment from cache" in {
     val action =
-      WhiskAction(namespace, aname(), javaDefault("ZHViZWU=", Some("hello")), annotations
= Parameters("exec", "java"))
+      WhiskAction(
+        namespace,
+        aname(),
+        javaDefault(nonInlinedCode(entityStore), Some("hello")),
+        annotations = Parameters("exec", "java"))
     val content = WhiskActionPut(
       Some(action.exec),
       Some(action.parameters),
@@ -854,9 +858,85 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi
{
     stream.reset()
   }
 
+  it should "put and then get an action with inlined attachment" in {
+    val action =
+      WhiskAction(
+        namespace,
+        aname(),
+        javaDefault(encodedRandomBytes(inlinedAttachmentSize(entityStore)), Some("hello")),
+        annotations = Parameters("exec", "java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val notExpectedGetLog = Seq(
+      s"finding document: 'id: ${action.namespace}/${action.name}",
+      s"finding attachment '[\\w-/:]+' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+    // first request invalidates any previous entries and caches new result
+    Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~>
check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should not include (s"invalidating ${CacheKey(action)} on delete")
+    stream.toString should not include ("uploading attachment")
+    stream.reset()
+
+    // second request should fetch from cache
+    Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check
{
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should include(s"serving from cache: ${CacheKey(action)}")
+    stream.toString should not include regex(notExpectedGetLog)
+    stream.reset()
+
+    // delete should invalidate cache
+    Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check
{
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include(s"invalidating ${CacheKey(action)}")
+    stream.reset()
+  }
+
   it should "get an action with attachment that is not cached" in {
     implicit val tid = transid()
-    val code = "ZHViZWU="
+    val code = nonInlinedCode(entityStore)
     val action =
       WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec",
"java"))
     val content = WhiskActionPut(
@@ -907,7 +987,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi
{
 
   it should "update an existing action with attachment that is not cached" in {
     implicit val tid = transid()
-    val code = "ZHViZWU="
+    val code = nonInlinedCode(entityStore)
     val action =
       WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec",
"java"))
     val content = WhiskActionPut(
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
new file mode 100644
index 0000000..783d57e
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test
+
+import akka.http.scaladsl.model.Uri
+import akka.stream.scaladsl.Source
+import akka.stream.{ActorMaterializer, Materializer}
+import akka.util.{ByteStringBuilder, CompactByteString}
+import common.WskActorSystem
+import org.junit.runner.RunWith
+import whisk.core.entity.size._
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import whisk.core.database.{AttachmentInliner, InliningConfig}
+
+@RunWith(classOf[JUnitRunner])
+class AttachmentInlinerTests extends FlatSpec with Matchers with ScalaFutures with WskActorSystem {
+
+  behavior of "Attachment inlining"
+
+  implicit val materializer: Materializer = ActorMaterializer() // drives the runFold in toByteString
+
+  it should "not inline if maxInlineSize set to zero" in { // a zero limit means nothing may be inlined
+    val inliner = new TestInliner(InliningConfig(maxInlineSize = 0.KB, chunkSize = 8.KB))
+    val bs = CompactByteString("hello world")
+
+    val (head, tail) = inliner.inlineAndTail(Source.single(bs)).futureValue
+    val uri = inliner.uriOf(head, "foo") // expected: plain "<attachmentScheme>:<name>" uri, no inlined payload
+
+    uri shouldBe Uri("test:foo")
+
+    val bsResult = toByteString(inliner.combinedSource(head, tail)).futureValue // head + tail must reproduce the input bytes
+    bsResult shouldBe bs
+  }
+
+  private def toByteString(docStream: Source[Traversable[Byte], _]) = // folds every chunk into one compact ByteString
+    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
+
+  class TestInliner(val inliningConfig: InliningConfig) extends AttachmentInliner { // minimal concrete inliner using the "test" uri scheme
+    override protected[core] implicit val materializer: Materializer = ActorMaterializer() // NOTE(review): could reuse the suite's materializer instead of creating another
+    override protected def attachmentScheme: String = "test"
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala
deleted file mode 100644
index bde3ee5..0000000
--- a/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.database.test
-
-import java.util.Base64
-
-import akka.http.scaladsl.model.Uri
-import akka.stream.ActorMaterializer
-import common.{StreamLogging, WskActorSystem}
-import org.junit.runner.RunWith
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec}
-import whisk.common.TransactionId
-import whisk.core.controller.test.WhiskAuthHelpers
-import whisk.core.database.CacheChangeNotification
-import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
-import whisk.core.entity._
-import whisk.core.entity.test.ExecHelpers
-
-import scala.util.Random
-
-@RunWith(classOf[JUnitRunner])
-class AttachmentTests
-    extends FlatSpec
-    with BeforeAndAfterEach
-    with BeforeAndAfterAll
-    with WskActorSystem
-    with DbUtils
-    with ExecHelpers
-    with ScalaFutures
-    with StreamLogging {
-
-  implicit val materializer = ActorMaterializer()
-  private val namespace = EntityPath(WhiskAuthHelpers.newIdentity().subject.asString)
-  private val datastore = WhiskEntityStore.datastore()
-  private val attachmentHandler = Some(WhiskAction.attachmentHandler _)
-
-  implicit val cacheUpdateNotifier: Option[CacheChangeNotification] = None
-  implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
-
-  override def afterEach() = {
-    cleanup()
-  }
-
-  override def afterAll() = {
-    datastore.shutdown()
-    super.afterAll()
-  }
-
-  behavior of "Datastore"
-
-  it should "generate different attachment name on update" in {
-    implicit val tid: TransactionId = transid()
-    val exec = javaDefault("ZHViZWU=", Some("hello"))
-    val javaAction =
-      WhiskAction(namespace, EntityName("attachment_unique"), exec)
-
-    val i1 = WhiskAction.put(datastore, javaAction, old = None).futureValue
-    val action2 = datastore.get[WhiskAction](i1, attachmentHandler).futureValue
-
-    //Change attachment to inline one otherwise WhiskAction would not go for putAndAttach
-    val action2Updated = action2.copy(exec = exec).revision[WhiskAction](i1.rev)
-    val i2 = WhiskAction.put(datastore, action2Updated, old = Some(action2)).futureValue
-    val action3 = datastore.get[WhiskAction](i2, attachmentHandler).futureValue
-
-    docsToDelete += ((datastore, i2))
-
-    attached(action2).attachmentName should not be attached(action3).attachmentName
-
-    //Check that attachment name is actually a uri
-    val attachmentUri = Uri(attached(action2).attachmentName)
-    attachmentUri.isAbsolute shouldBe true
-  }
-
-  it should "put and read same attachment" in {
-    implicit val tid: TransactionId = transid()
-    val size = 4000
-    val bytes = randomBytes(size)
-    val base64 = Base64.getEncoder.encodeToString(bytes)
-
-    val exec = javaDefault(base64, Some("hello"))
-    val javaAction =
-      WhiskAction(namespace, EntityName("attachment_unique"), exec)
-
-    val i1 = WhiskAction.put(datastore, javaAction, old = None).futureValue
-    val action2 = datastore.get[WhiskAction](i1, attachmentHandler).futureValue
-    val action3 = WhiskAction.get(datastore, i1.id, i1.rev).futureValue
-
-    docsToDelete += ((datastore, i1))
-
-    attached(action2).attachmentType shouldBe ExecManifest.runtimesManifest
-      .resolveDefaultRuntime(JAVA_DEFAULT)
-      .get
-      .attached
-      .get
-      .attachmentType
-    attached(action2).length shouldBe Some(size)
-    attached(action2).digest should not be empty
-
-    action3.exec shouldBe exec
-    inlined(action3).value shouldBe base64
-  }
-
-  private def attached(a: WhiskAction): Attached =
-    a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]
-
-  private def inlined(a: WhiskAction): Inline[String] =
-    a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
-
-  private def randomBytes(size: Int): Array[Byte] = {
-    val arr = new Array[Byte](size)
-    Random.nextBytes(arr)
-    arr
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 9e2f875..39ddad3 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -17,6 +17,7 @@
 
 package whisk.core.database.test
 
+import java.util.Base64
 import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -27,9 +28,7 @@ import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
+import scala.util.{Failure, Random, Success, Try}
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
@@ -284,6 +283,47 @@ trait DbUtils {
     docsToDelete.clear()
   }
 
+  /**
+   * Generates Base64-encoded random code whose decoded size (nonInlinedAttachmentSize) the ArtifactStore would not inline
+   */
+  def nonInlinedCode(db: ArtifactStore[_]): String = {
+    encodedRandomBytes(nonInlinedAttachmentSize(db))
+  }
+
+  /**
+   * Size in bytes for attachments which would always be inlined (maxInlineSize - 1); throws IllegalStateException if db does not inline.
+   */
+  def inlinedAttachmentSize(db: ArtifactStore[_]): Int = {
+    db match {
+      case inliner: AttachmentInliner =>
+        inliner.maxInlineSize.toBytes.toInt - 1 // NOTE(review): yields -1 when maxInlineSize is 0; callers may need to guard
+      case _ =>
+        throw new IllegalStateException(s"ArtifactStore does not support attachment inlining $db")
+    }
+  }
+
+  /**
+   * Size in bytes for attachments which would never be inlined: twice the larger of maxInlineSize and chunkSize.
+   */
+  def nonInlinedAttachmentSize(db: ArtifactStore[_]): Int = {
+    db match {
+      case inliner: AttachmentInliner =>
+        val inlineSize = inliner.maxInlineSize.toBytes.toInt
+        val chunkSize = inliner.chunkSize.toBytes.toInt
+        Math.max(inlineSize, chunkSize) * 2 // exceeds maxInlineSize (when > 0) and spans at least two chunks
+      case _ =>
+        42 // presumably non-inlining stores never inline, so any non-empty size works -- confirm
+    }
+  }
+
+  protected def encodedRandomBytes(size: Int): String = Base64.getEncoder.encodeToString(randomBytes(size)) // Base64 text of `size` random bytes
+
   def isMemoryStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[MemoryArtifactStore[_]]
   def isCouchStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[CouchDbRestStore[_]]
+
+  private def randomBytes(size: Int): Array[Byte] = { // `size` bytes from scala.util.Random; a negative size throws NegativeArraySizeException
+    val arr = new Array[Byte](size)
+    Random.nextBytes(arr)
+    arr
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index 62ae000..18083c3 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -17,16 +17,16 @@
 
 package whisk.core.database.test.behavior
 
-import java.util.Base64
+import java.io.ByteArrayOutputStream
 
-import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.{ContentTypes, Uri}
+import akka.stream.IOResult
+import akka.stream.scaladsl.StreamConverters
 import whisk.common.TransactionId
-import whisk.core.database.CacheChangeNotification
+import whisk.core.database.{AttachmentInliner, CacheChangeNotification, NoDocumentException}
 import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
 import whisk.core.entity.test.ExecHelpers
-import whisk.core.entity.{CodeExec, EntityName, ExecManifest, WhiskAction}
-
-import scala.util.Random
+import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest, WhiskAction}
 
 trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with ExecHelpers {
   behavior of "Attachments"
@@ -37,7 +37,7 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
 
   it should "generate different attachment name on update" in {
     implicit val tid: TransactionId = transid()
-    val exec = javaDefault("ZHViZWU=", Some("hello"))
+    val exec = javaDefault(nonInlinedCode(entityStore), Some("hello"))
     val javaAction =
       WhiskAction(namespace, EntityName("attachment_unique"), exec)
 
@@ -60,9 +60,8 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
 
   it should "put and read same attachment" in {
     implicit val tid: TransactionId = transid()
-    val size = 4000
-    val bytes = randomBytes(size)
-    val base64 = Base64.getEncoder.encodeToString(bytes)
+    val size = nonInlinedAttachmentSize(entityStore)
+    val base64 = encodedRandomBytes(size)
 
     val exec = javaDefault(base64, Some("hello"))
     val javaAction =
@@ -87,15 +86,47 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
     inlined(action3).value shouldBe base64
   }
 
+  it should "inline small attachments" in {
+    implicit val tid: TransactionId = transid()
+    val attachmentSize = inlinedAttachmentSize(entityStore) - 1 // NOTE(review): helper already returns maxInlineSize - 1, so this extra - 1 is redundant and goes negative if maxInlineSize <= 1
+    val base64 = encodedRandomBytes(attachmentSize)
+
+    val exec = javaDefault(base64, Some("hello"))
+    val javaAction = WhiskAction(namespace, EntityName("attachment_inline"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue // raw view: code is an Attached reference
+    val action3 = WhiskAction.get(entityStore, i1.id, i1.rev).futureValue // resolved view: code read back as inline Base64
+
+    docsToDelete += ((entityStore, i1))
+
+    action3.exec shouldBe exec
+    inlined(action3).value shouldBe base64
+
+    val a = attached(action2)
+
+    val attachmentUri = Uri(a.attachmentName)
+    attachmentUri.scheme shouldBe AttachmentInliner.MemScheme // small content lives in the attachment name under the inliner's scheme
+    a.length shouldBe Some(attachmentSize) // length is the raw byte count, not the Base64 length
+    a.digest should not be empty
+  }
+
+  it should "throw NoDocumentException for non existing attachment" in {
+    implicit val tid: TransactionId = transid()
+
+    val sink = StreamConverters.fromOutputStream(() => new ByteArrayOutputStream()) // placeholder sink; the read is expected to fail
+    entityStore
+      .readAttachment[IOResult](
+        DocInfo ! ("non-existing-doc", "42"), // id/revision pair assumed absent from the store
+        Attached("foo", ContentTypes.`application/octet-stream`),
+        sink)
+      .failed // invert the future so the expected failure becomes the value under test
+      .futureValue shouldBe a[NoDocumentException]
+  }
+
   private def attached(a: WhiskAction): Attached =
     a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]
 
   private def inlined(a: WhiskAction): Inline[String] =
     a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
-
-  private def randomBytes(size: Int): Array[Byte] = {
-    val arr = new Array[Byte](size)
-    Random.nextBytes(arr)
-    arr
-  }
 }

-- 
To stop receiving notification emails like this one, please contact
chetanm@apache.org.

Mime
View raw message