openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rab...@apache.org
Subject [incubator-openwhisk] branch master updated: ArtifactStore implementation for CosmosDB (#3562)
Date Thu, 12 Jul 2018 03:27:15 GMT
This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f97079a  ArtifactStore implementation for CosmosDB (#3562)
f97079a is described below

commit f97079a5d935db1ecd6289d3ff26c555c325323f
Author: Chetan Mehrotra <chetanm@apache.org>
AuthorDate: Thu Jul 12 08:57:12 2018 +0530

    ArtifactStore implementation for CosmosDB (#3562)
    
    This commit provides a CosmosDB based implementation for ArtifactStore SPI. Given the complexity involved in performing various operations against CosmosDB this commit uses the Java SDK to simplify and speed up the implementation - because compared to CouchDB, performing queries with CosmosDB requires client side computation, which involves sending queries to each partition, then collecting and merging the result set. The Async Java SDK takes care of all these interactions and provide [...]
---
 common/scala/build.gradle                          |   3 +
 common/scala/src/main/resources/application.conf   |   8 +
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   1 +
 .../database/cosmosdb/CosmosDBArtifactStore.scala  | 495 +++++++++++++++++++++
 .../cosmosdb/CosmosDBArtifactStoreProvider.scala   | 114 +++++
 .../core/database/cosmosdb/CosmosDBSupport.scala   |  88 ++++
 .../core/database/cosmosdb/CosmosDBUtil.scala      | 108 +++++
 .../database/cosmosdb/CosmosDBViewMapper.scala     | 319 +++++++++++++
 .../core/database/cosmosdb/IndexingPolicy.scala    | 123 +++++
 .../core/database/cosmosdb/ReferenceCounted.scala  |  50 +++
 .../database/cosmosdb/RxObservableImplicits.scala  |  55 +++
 tests/src/test/resources/application.conf.j2       |   6 +
 tests/src/test/scala/common/TestUtils.java         |  18 +-
 .../cosmosdb/CosmosDBArtifactStoreTests.scala      |  29 ++
 .../cosmosdb/CosmosDBAttachmentStoreTests.scala    |  36 ++
 .../cosmosdb/CosmosDBStoreBehaviorBase.scala       |  65 +++
 .../database/cosmosdb/CosmosDBSupportTests.scala   |  69 +++
 .../database/cosmosdb/CosmosDBTestSupport.scala    |  60 +++
 .../core/database/cosmosdb/CosmosDBUtilTest.scala  |  83 ++++
 .../database/cosmosdb/IndexingPolicyTests.scala    |  83 ++++
 .../database/cosmosdb/ReferenceCountedTests.scala  |  91 ++++
 .../ArtifactStoreAttachmentBehaviors.scala         |  11 +-
 .../test/behavior/ArtifactStoreBehaviorBase.scala  |  22 +-
 .../test/behavior/ArtifactStoreTestUtil.scala      |  36 ++
 tools/db/cosmosDbUtil.py                           | 194 ++++++++
 tools/travis/setup.sh                              |   3 +
 26 files changed, 2162 insertions(+), 8 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 0983fd8..9669b58 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -72,6 +72,9 @@ dependencies {
     compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1'
     compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1'
 
+    compile 'io.reactivex:rxscala_2.11:0.26.5'
+    compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
+    compile 'com.microsoft.azure:azure-cosmosdb:2.0.0'
     scoverage gradle.scoverage.deps
 }
 
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index bbac5f4..86e5213 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -136,6 +136,14 @@ whisk {
     #     }
     #}
 
+    # CosmosDB related configuration
+    # For example:
+    # cosmosdb {
+    #     endpoint =               # Endpoint URL like https://<account>.documents.azure.com:443/
+    #     key      =               # Access key
+    #     db       =               # Database name
+    #}
+
     # transaction ID related configuration
     transactions {
         header = "X-Request-ID"
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index be9f7b0..fd6eeec 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -200,6 +200,7 @@ object ConfigKeys {
   val buildInformation = "whisk.info"
 
   val couchdb = "whisk.couchdb"
+  val cosmosdb = "whisk.cosmosdb"
   val kafka = "whisk.kafka"
   val kafkaCommon = s"$kafka.common"
   val kafkaProducer = s"$kafka.producer"
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
new file mode 100644
index 0000000..fac08f3
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import java.io.ByteArrayInputStream
+
+import _root_.rx.RxReactiveStreams
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri}
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Sink, Source, StreamConverters}
+import akka.util.{ByteString, ByteStringBuilder}
+import com.microsoft.azure.cosmosdb._
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat, _}
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, reportFailure}
+import whisk.core.database._
+import whisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef
+import whisk.core.database.cosmosdb.CosmosDBConstants._
+import whisk.core.entity.Attachments.Attached
+import whisk.core.entity._
+import whisk.http.Messages
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val collName: String,
+                                                                       protected val config: CosmosDBConfig,
+                                                                       clientRef: DocumentClientRef,
+                                                                       documentHandler: DocumentHandler,
+                                                                       protected val viewMapper: CosmosDBViewMapper,
+                                                                       val inliningConfig: InliningConfig,
+                                                                       val attachmentStore: Option[AttachmentStore])(
+  implicit system: ActorSystem,
+  val logging: Logging,
+  jsonFormat: RootJsonFormat[DocumentAbstraction],
+  val materializer: ActorMaterializer,
+  docReader: DocumentReader)
+    extends ArtifactStore[DocumentAbstraction]
+    with DefaultJsonProtocol
+    with DocumentProvider
+    with CosmosDBSupport
+    with AttachmentSupport[DocumentAbstraction] {
+
+  private val cosmosScheme = "cosmos"
+  val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(cosmosScheme)
+
+  protected val client: AsyncDocumentClient = clientRef.get.client
+  private val (database, collection) = initialize()
+
+  private val _id = "_id"
+  private val _rev = "_rev"
+
+  override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
+
+  override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
+    val asJson = d.toDocumentRecord
+
+    val doc = toCosmosDoc(asJson)
+    val id = doc.getId
+    val docinfoStr = s"id: $id, rev: ${doc.getETag}"
+    val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'")
+
+    val o = if (doc.getETag == null) {
+      client.createDocument(collection.getSelfLink, doc, newRequestOption(id), true)
+    } else {
+      client.replaceDocument(doc, matchRevOption(id, doc.getETag))
+    }
+
+    val f = o
+      .head()
+      .transform(
+        { r =>
+          transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'")
+          toDocInfo(r.getResource)
+        }, {
+          case e: DocumentClientException if isConflict(e) =>
+            transid.finished(this, start, s"[PUT] '$collName', document: '$docinfoStr'; conflict.")
+            DocumentConflictException("conflict on 'put'")
+          case e => e
+        })
+
+    reportFailure(f, start, failure => s"[PUT] '$collName' internal error, failure: '${failure.getMessage}'")
+  }
+
+  override protected[database] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = {
+    checkDocHasRevision(doc)
+    val start = transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] '$collName' deleting document: '$doc'")
+    val f = client
+      .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
+      .head()
+      .transform(
+        { _ =>
+          transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'")
+          true
+        }, {
+          case e: DocumentClientException if isNotFound(e) =>
+            transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; not found.")
+            NoDocumentException("not found on 'delete'")
+          case e: DocumentClientException if isConflict(e) =>
+            transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; conflict.")
+            DocumentConflictException("conflict on 'delete'")
+          case e => e
+        })
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[DEL] '$collName' internal error, doc: '$doc', failure: '${failure.getMessage}'")
+  }
+
+  override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
+                                                                 attachmentHandler: Option[(A, Attached) => A] = None)(
+    implicit transid: TransactionId,
+    ma: Manifest[A]): Future[A] = {
+    val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$collName' finding document: '$doc'")
+
+    require(doc != null, "doc undefined")
+    val f = client
+      .readDocument(selfLinkOf(doc.id), newRequestOption(doc.id))
+      .head()
+      .transform(
+        { rr =>
+          val js = getResultToWhiskJsonDoc(rr.getResource)
+          transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'")
+          deserialize[A, DocumentAbstraction](doc, js)
+        }, {
+          case e: DocumentClientException if isNotFound(e) =>
+            transid.finished(this, start, s"[GET] '$collName', document: '$doc'; not found.")
+            // for compatibility
+            throw NoDocumentException("not found on 'get'")
+          case e => e
+        })
+      .recoverWith {
+        case _: DeserializationException => throw DocumentUnreadable(Messages.corruptedEntity)
+      }
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[GET] '$collName' internal error, doc: '$doc', failure: '${failure.getMessage}'")
+
+  }
+
+  override protected[database] def get(id: DocId)(implicit transid: TransactionId): Future[Option[JsObject]] = {
+    val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET_BY_ID] '$collName' finding document: '$id'")
+
+    val f = client
+      .readDocument(selfLinkOf(id), newRequestOption(id))
+      .head()
+      .map { rr =>
+        val js = getResultToWhiskJsonDoc(rr.getResource)
+        transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'")
+        Some(js)
+      }
+      .recoverWith {
+        case e: DocumentClientException if isNotFound(e) => Future.successful(None)
+      }
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[GET_BY_ID] '$collName' internal error, doc: '$id', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def query(table: String,
+                                     startKey: List[Any],
+                                     endKey: List[Any],
+                                     skip: Int,
+                                     limit: Int,
+                                     includeDocs: Boolean,
+                                     descending: Boolean,
+                                     reduce: Boolean,
+                                     stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] = {
+    require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true")
+    require(!reduce, "Reduce scenario not supported") //TODO Investigate reduce
+    require(skip >= 0, "skip should be non negative")
+    require(limit >= 0, "limit should be non negative")
+    documentHandler.checkIfTableSupported(table)
+
+    val Array(ddoc, viewName) = table.split("/")
+
+    val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] '$collName' searching '$table'")
+    val realIncludeDocs = includeDocs | documentHandler.shouldAlwaysIncludeDocs(ddoc, viewName)
+    val realLimit = if (limit > 0) skip + limit else limit
+
+    val querySpec = viewMapper.prepareQuery(ddoc, viewName, startKey, endKey, realLimit, realIncludeDocs, descending)
+
+    val publisher =
+      RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, newFeedOptions()))
+    val f = Source
+      .fromPublisher(publisher)
+      .mapConcat(asSeq)
+      .drop(skip)
+      .map(queryResultToWhiskJsonDoc)
+      .map(js =>
+        documentHandler
+          .transformViewResult(ddoc, viewName, startKey, endKey, realIncludeDocs, js, CosmosDBArtifactStore.this))
+      .mapAsync(1)(identity)
+      .mapConcat(identity)
+      .runWith(Sink.seq)
+      .map(_.toList)
+
+    f.onSuccess({
+      case out => transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}")
+    })
+    reportFailure(f, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def count(table: String,
+                                     startKey: List[Any],
+                                     endKey: List[Any],
+                                     skip: Int,
+                                     stale: StaleParameter)(implicit transid: TransactionId): Future[Long] = {
+    require(skip >= 0, "skip should be non negative")
+    val Array(ddoc, viewName) = table.split("/")
+
+    val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT] '$collName' searching '$table")
+    val querySpec = viewMapper.prepareCountQuery(ddoc, viewName, startKey, endKey)
+
+    //For aggregates the value is in _aggregates fields
+    val f = client
+      .queryDocuments(collection.getSelfLink, querySpec, newFeedOptions())
+      .head()
+      .map { r =>
+        val count = r.getResults.asScala.head.getLong(aggregate).longValue()
+        transid.finished(this, start, s"[COUNT] '$collName' completed: count $count")
+        if (count > skip) count - skip else 0L
+      }
+
+    reportFailure(f, start, failure => s"[COUNT] '$collName' internal error, failure: '${failure.getMessage}'")
+  }
+
+  override protected[database] def putAndAttach[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
+
+    val asJson = doc.toDocumentRecord
+    val id = asJson.fields("_id").convertTo[String].trim
+
+    attachmentStore match {
+      case Some(as) =>
+        attachToExternalStore(doc, update, contentType, docStream, oldAttachment, as)
+      case None =>
+        attachToCosmos(id, doc, update, contentType, docStream, oldAttachment)
+    }
+  }
+
+  private def attachToCosmos[A <: DocumentAbstraction](
+    id: String,
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
+    //Convert Source to ByteString as Cosmos API works with InputStream only
+    for {
+      allBytes <- toByteString(docStream)
+      bytesOrSource <- inlineOrAttach(Source.single(allBytes))
+      uri = uriOf(bytesOrSource, UUID().asString)
+      attached <- {
+        val a = bytesOrSource match {
+          case Left(bytes) => Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+          case Right(_)    => Attached(uri.toString, contentType, Some(allBytes.size), Some(digest(allBytes)))
+        }
+        Future.successful(a)
+      }
+      i1 <- put(update(doc, attached))
+      i2 <- bytesOrSource match {
+        case Left(_)  => Future.successful(i1)
+        case Right(s) => attach(i1, uri.path.toString, attached.attachmentType, allBytes)
+      }
+      //Remove old attachment if it was part of attachmentStore
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == cosmosScheme) {
+            val name = oldUri.path.toString
+            val docId = DocId(id)
+            client.deleteAttachment(s"${selfLinkOf(docId)}/attachments/$name", newRequestOption(docId)).head()
+          } else {
+            Future.successful(true)
+          }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (i2, attached)
+  }
+
+  private def attach(doc: DocInfo, name: String, contentType: ContentType, allBytes: ByteString)(
+    implicit transid: TransactionId): Future[DocInfo] = {
+    val start = transid.started(
+      this,
+      LoggingMarkers.DATABASE_ATT_SAVE,
+      s"[ATT_PUT] '$collName' uploading attachment '$name' of document '$doc'")
+
+    checkDocHasRevision(doc)
+    val options = new MediaOptions
+    options.setContentType(contentType.toString())
+    options.setSlug(name)
+    val s = new ByteArrayInputStream(allBytes.toArray)
+    val f = client
+      .upsertAttachment(selfLinkOf(doc.id), s, options, matchRevOption(doc))
+      .head()
+      .transform(
+        { _ =>
+          transid
+            .finished(this, start, s"[ATT_PUT] '$collName' completed uploading attachment '$name' of document '$doc'")
+          doc //Adding attachment does not change the revision of document. So retain the doc info
+        }, {
+          case e: DocumentClientException if isConflict(e) =>
+            transid
+              .finished(this, start, s"[ATT_PUT] '$collName' uploading attachment '$name' of document '$doc'; conflict")
+            DocumentConflictException("conflict on 'attachment put'")
+          case e => e
+        })
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATT_PUT] '$collName' internal error, name: '$name', doc: '$doc', failure: '${failure.getMessage}'")
+  }
+
+  private def toByteString(docStream: Source[ByteString, _]) =
+    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
+
+  override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+    val name = attached.attachmentName
+    val attachmentUri = Uri(name)
+    attachmentUri.scheme match {
+      case AttachmentSupport.MemScheme =>
+        memorySource(attachmentUri).runWith(sink)
+      case s if s == cosmosScheme || attachmentUri.isRelative =>
+        //relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile'
+        //Compared to current approach of '<scheme>:<name>'
+        readAttachmentFromCosmos(doc, attachmentUri, sink)
+      case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
+        attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink)
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri")
+    }
+  }
+
+  private def readAttachmentFromCosmos[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+    val name = attachmentUri.path
+    val start = transid.started(
+      this,
+      LoggingMarkers.DATABASE_ATT_GET,
+      s"[ATT_GET] '$collName' finding attachment '$name' of document '$doc'")
+    checkDocHasRevision(doc)
+
+    val f = client
+      .readAttachment(s"${selfLinkOf(doc.id)}/attachments/$name", matchRevOption(doc))
+      .head()
+      .flatMap(a => client.readMedia(a.getResource.getMediaLink).head())
+      .transform(
+        { r =>
+          //Here stream can only be fetched once
+          StreamConverters
+            .fromInputStream(() => r.getMedia)
+            .runWith(sink)
+        }, {
+          case e: DocumentClientException if isNotFound(e) =>
+            transid.finished(
+              this,
+              start,
+              s"[ATT_GET] '$collName', retrieving attachment '$name' of document '$doc'; not found.")
+            NoDocumentException("not found on 'delete'")
+          case e => e
+        })
+      .flatMap(identity)
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATT_GET] '$collName' internal error, name: '$name', doc: '$doc', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] =
+    attachmentStore
+      .map(as => as.deleteAttachments(doc.id))
+      .getOrElse(Future.successful(true)) // For CosmosDB it is expected that the entire document is deleted.
+
+  override def shutdown(): Unit = clientRef.close()
+
+  private def isNotFound[A <: DocumentAbstraction](e: DocumentClientException) =
+    e.getStatusCode == StatusCodes.NotFound.intValue
+
+  private def isConflict(e: DocumentClientException) = {
+    e.getStatusCode == StatusCodes.Conflict.intValue || e.getStatusCode == StatusCodes.PreconditionFailed.intValue
+  }
+
+  private def toCosmosDoc(json: JsObject): Document = {
+    val computedJs = documentHandler.computedFields(json)
+    val computedOpt = if (computedJs.fields.nonEmpty) Some(computedJs) else None
+    val fieldsToAdd =
+      Seq(
+        (cid, Some(JsString(escapeId(json.fields(_id).convertTo[String])))),
+        (etag, json.fields.get(_rev)),
+        (computed, computedOpt))
+    val fieldsToRemove = Seq(_id, _rev)
+    val mapped = transform(json, fieldsToAdd, fieldsToRemove)
+    val doc = new Document(mapped.compactPrint)
+    doc.set(selfLink, createSelfLink(doc.getId))
+    doc
+  }
+
+  private def queryResultToWhiskJsonDoc(doc: Document): JsObject = {
+    val docJson = doc.toJson.parseJson.asJsObject
+    //If includeDocs is true then document json is to be used
+    val js = if (doc.has(alias)) docJson.fields(alias).asJsObject else docJson
+    val id = js.fields(cid).convertTo[String]
+    toWhiskJsonDoc(js, id, None)
+  }
+
+  private def getResultToWhiskJsonDoc(doc: Document): JsObject = {
+    checkDoc(doc)
+    val js = doc.toJson.parseJson.asJsObject
+    toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
+  }
+
+  private def toWhiskJsonDoc(js: JsObject, id: String, etag: Option[JsString]): JsObject = {
+    val fieldsToAdd = Seq((_id, Some(JsString(unescapeId(id)))), (_rev, etag))
+    transform(stripInternalFields(js), fieldsToAdd, Seq.empty)
+  }
+
+  private def transform(json: JsObject, fieldsToAdd: Seq[(String, Option[JsValue])], fieldsToRemove: Seq[String]) = {
+    val fields = json.fields ++ fieldsToAdd.flatMap(f => f._2.map((f._1, _))) -- fieldsToRemove
+    JsObject(fields)
+  }
+
+  private def stripInternalFields(js: JsObject) = {
+    //Strip out all field name starting with '_' which are considered as db specific internal fields
+    JsObject(js.fields.filter { case (k, _) => !k.startsWith("_") && k != cid })
+  }
+
+  private def toDocInfo[T <: Resource](doc: T) = {
+    checkDoc(doc)
+    DocInfo(DocId(unescapeId(doc.getId)), DocRevision(doc.getETag))
+  }
+
+  private def selfLinkOf(id: DocId) = createSelfLink(escapeId(id.id))
+
+  private def createSelfLink(id: String) = s"dbs/${database.getId}/colls/${collection.getId}/docs/$id"
+
+  private def matchRevOption(info: DocInfo): RequestOptions = matchRevOption(escapeId(info.id.id), info.rev.rev)
+
+  private def matchRevOption(id: String, etag: String): RequestOptions = {
+    val options = newRequestOption(id)
+    val condition = new AccessCondition
+    condition.setCondition(etag)
+    options.setAccessCondition(condition)
+    options
+  }
+
+  //Using DummyImplicit to allow overloading work with type erasure of DocId AnyVal
+  private def newRequestOption(id: DocId)(implicit i: DummyImplicit): RequestOptions = newRequestOption(escapeId(id.id))
+
+  private def newRequestOption(id: String) = {
+    val options = new RequestOptions
+    options.setPartitionKey(new PartitionKey(id))
+    options
+  }
+
+  private def newFeedOptions() = {
+    val options = new FeedOptions()
+    options.setEnableCrossPartitionQuery(true)
+    options
+  }
+
+  private def checkDoc[T <: Resource](doc: T): Unit = {
+    require(doc.getId != null, s"$doc does not have id field set")
+    require(doc.getETag != null, s"$doc does not have etag field set")
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
new file mode 100644
index 0000000..d3943bf
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import java.io.Closeable
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import spray.json.RootJsonFormat
+import whisk.common.Logging
+import whisk.core.database._
+import pureconfig._
+import whisk.core.entity.size._
+import whisk.core.ConfigKeys
+import whisk.core.database.cosmosdb.CosmosDBUtil.createClient
+import whisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity}
+
+import scala.reflect.ClassTag
+
+case class CosmosDBConfig(endpoint: String, key: String, db: String)
+
+case class ClientHolder(client: AsyncDocumentClient) extends Closeable {
+  override def close(): Unit = client.close()
+}
+
+object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
+  type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
+  private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
+  private var clientRef: ReferenceCounted[ClientHolder] = _
+
+  override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
+    makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore())
+  }
+
+  def makeArtifactStore[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
+                                                           attachmentStore: Option[AttachmentStore])(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
+
+    makeStoreForClient(config, createReference(config).reference(), attachmentStore)
+  }
+
+  private def makeStoreForClient[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
+                                                                    clientRef: DocumentClientRef,
+                                                                    attachmentStore: Option[AttachmentStore])(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
+
+    val classTag = implicitly[ClassTag[D]]
+    val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
+
+    new CosmosDBArtifactStore(
+      dbName,
+      config,
+      clientRef,
+      handler,
+      viewMapper,
+      loadConfigOrThrow[InliningConfig](ConfigKeys.db),
+      attachmentStore)
+  }
+
+  private def handlerAndMapper[D](entityType: ClassTag[D])(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): (String, DocumentHandler, CosmosDBViewMapper) = {
+    val entityClass = entityType.runtimeClass
+    if (entityClass == classOf[WhiskEntity]) ("whisks", WhisksHandler, WhisksViewMapper)
+    else if (entityClass == classOf[WhiskActivation]) ("activations", ActivationHandler, ActivationViewMapper)
+    else if (entityClass == classOf[WhiskAuth]) ("subjects", SubjectHandler, SubjectViewMapper)
+    else throw new IllegalArgumentException(s"Unsupported entity type $entityType")
+  }
+
+  /*
+   * This method ensures that all store instances share same client instance and thus the underlying connection pool.
+   * Synchronization is required to ensure concurrent init of various store instances share same ref instance
+   */
+  private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
+    if (clientRef == null || clientRef.isClosed) {
+      clientRef = createReference(config)
+    }
+    clientRef.reference()
+  }
+
+  private def createReference(config: CosmosDBConfig) =
+    new ReferenceCounted[ClientHolder](ClientHolder(createClient(config)))
+
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBSupport.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBSupport.scala
new file mode 100644
index 0000000..d963094
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBSupport.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import com.microsoft.azure.cosmosdb._
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with CosmosDBUtil {
+  protected def config: CosmosDBConfig
+  protected def collName: String
+  protected def client: AsyncDocumentClient
+  protected def viewMapper: CosmosDBViewMapper
+
+  def initialize(): (Database, DocumentCollection) = {
+    val db = getOrCreateDatabase()
+    (db, getOrCreateCollection(db))
+  }
+
+  private def getOrCreateDatabase(): Database = {
+    client
+      .queryDatabases(querySpec(config.db), null)
+      .blockingOnlyResult()
+      .getOrElse {
+        client.createDatabase(newDatabase, null).blockingResult()
+      }
+  }
+
+  private def getOrCreateCollection(database: Database) = {
+    client
+      .queryCollections(database.getSelfLink, querySpec(collName), null)
+      .blockingOnlyResult()
+      .map { coll =>
+        if (matchingIndexingPolicy(coll)) {
+          coll
+        } else {
+          //Modify the found collection with latest policy as its selfLink is set
+          coll.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
+          client.replaceCollection(coll, null).blockingResult()
+        }
+      }
+      .getOrElse {
+        client.createCollection(database.getSelfLink, newDatabaseCollection, null).blockingResult()
+      }
+  }
+
+  private def matchingIndexingPolicy(coll: DocumentCollection): Boolean =
+    IndexingPolicy.isSame(viewMapper.indexingPolicy, IndexingPolicy(coll.getIndexingPolicy))
+
+  private def newDatabaseCollection = {
+    val defn = new DocumentCollection
+    defn.setId(collName)
+    defn.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
+    defn.setPartitionKey(viewMapper.partitionKeyDefn)
+    defn
+  }
+
+  private def newDatabase = {
+    val databaseDefinition = new Database
+    databaseDefinition.setId(config.db)
+    databaseDefinition
+  }
+
+  /**
+   * Prepares a query for fetching any resource by id
+   */
+  protected def querySpec(id: String) =
+    new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new SqlParameterCollection(new SqlParameter("@id", id)))
+
+  protected def asSeq[T <: Resource](r: FeedResponse[T]): immutable.Seq[T] = r.getResults.asScala.to[immutable.Seq]
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBUtil.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBUtil.scala
new file mode 100644
index 0000000..26fc967
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBUtil.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import com.microsoft.azure.cosmosdb._
+import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, E_TAG, ID, SELF_LINK}
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import whisk.core.database.cosmosdb.CosmosDBConstants._
+
+import scala.collection.immutable.Iterable
+
+private[cosmosdb] object CosmosDBConstants {
+  val computed: String = "_c"
+
+  val alias: String = "view"
+
+  val cid: String = ID
+
+  val etag: String = E_TAG
+
+  val aggregate: String = AGGREGATE
+
+  val selfLink: String = SELF_LINK
+}
+
+private[cosmosdb] trait CosmosDBUtil {
+
+  /**
+   * Prepares the json like select clause
+   * {{{
+   *   Seq("a", "b", "c.d.e") =>
+   *   { "a" : r['a'], "b" : r['b'], "c" : { "d" : { "e" : r['c']['d']['e']}}, "id" : r['id']} AS view
+   * }}}
+   * Here it uses {{{r['keyName']}}} notation to avoid issues around using reserved words as field name
+   */
+  def prepareFieldClause(fields: Iterable[String]): String = {
+    val m = fields.foldLeft(Map.empty[String, Any]) { (map, name) =>
+      addToMap(name, map)
+    }
+    val withId = addToMap(cid, m)
+    val json = asJsonLikeString(withId)
+    s"$json AS $alias"
+  }
+
+  private def addToMap(name: String, map: Map[String, _]): Map[String, Any] = name.split('.').toList match {
+    case Nil     => throw new IllegalStateException(s"'$name' split on '.' should not result in empty list")
+    case x :: xs => addToMap(x, xs, Nil, map)
+  }
+
+  private def addToMap(key: String,
+                       children: List[String],
+                       keyPath: List[String],
+                       map: Map[String, Any]): Map[String, Any] = children match {
+    case Nil => map + (key -> s"r${makeKeyPath(key :: keyPath)}")
+    case x :: xs =>
+      map + (key -> addToMap(x, xs, key :: keyPath, map.getOrElse(key, Map.empty).asInstanceOf[Map[String, Any]]))
+  }
+
+  private def makeKeyPath(keyPath: List[String]) = keyPath.reverse.map(f => s"['$f']").mkString
+
+  private def asJsonLikeString(m: Map[_, _]) =
+    m.map { case (k, v) => s""" "$k" : ${asString(v)}""" }.mkString("{", ",", "}")
+
+  private def asString(v: Any): String = v match {
+    case m: Map[_, _] => asJsonLikeString(m)
+    case x            => x.toString
+  }
+
+  def createClient(config: CosmosDBConfig): AsyncDocumentClient =
+    new AsyncDocumentClient.Builder()
+      .withServiceEndpoint(config.endpoint)
+      .withMasterKey(config.key)
+      .withConnectionPolicy(ConnectionPolicy.GetDefault)
+      .withConsistencyLevel(ConsistencyLevel.Session)
+      .build
+
+  /**
+   * CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
+   * that need to be escaped. For that we use '|' as the replacement char
+   */
+  def escapeId(id: String): String = {
+    require(!id.contains("|"), s"Id [$id] should not contain '|'")
+    id.replace("/", "|")
+  }
+
+  def unescapeId(id: String): String = {
+    require(!id.contains("/"), s"Escaped Id [$id] should not contain '/'")
+    id.replace("|", "/")
+  }
+
+}
+
+private[cosmosdb] object CosmosDBUtil extends CosmosDBUtil
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
new file mode 100644
index 0000000..f3d7353
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import java.util.Collections
+
+import com.microsoft.azure.cosmosdb.DataType.{Number, String}
+import com.microsoft.azure.cosmosdb.IndexKind.{Hash, Range}
+import com.microsoft.azure.cosmosdb.IndexingMode.Lazy
+import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec}
+import whisk.core.database.ActivationHandler.NS_PATH
+import whisk.core.database.WhisksHandler.ROOT_NS
+import whisk.core.database.cosmosdb.CosmosDBConstants.{alias, computed}
+import whisk.core.database.{
+  ActivationHandler,
+  DocumentHandler,
+  SubjectHandler,
+  UnsupportedQueryKeys,
+  UnsupportedView,
+  WhisksHandler
+}
+import whisk.core.entity.WhiskEntityQueries.TOP
+
+private[cosmosdb] trait CosmosDBViewMapper {
+  protected val NOTHING = ""
+  protected val ALL_FIELDS = "*"
+  protected def handler: DocumentHandler
+
+  def prepareQuery(ddoc: String,
+                   viewName: String,
+                   startKey: List[Any],
+                   endKey: List[Any],
+                   limit: Int,
+                   includeDocs: Boolean,
+                   descending: Boolean): SqlQuerySpec
+
+  def prepareCountQuery(ddoc: String, viewName: String, startKey: List[Any], endKey: List[Any]): SqlQuerySpec
+
+  def indexingPolicy: IndexingPolicy
+
+  val partitionKeyDefn: PartitionKeyDefinition = {
+    val defn = new PartitionKeyDefinition
+    defn.setPaths(Collections.singletonList("/id"))
+    defn
+  }
+
+  protected def checkKeys(startKey: List[Any], endKey: List[Any]): Unit = {
+    require(startKey.nonEmpty)
+    require(endKey.nonEmpty)
+    require(startKey.head == endKey.head, s"First key should be same => ($startKey) - ($endKey)")
+  }
+
+  protected def prepareSpec(query: String, params: List[(String, Any)]): SqlQuerySpec = {
+    val paramColl = new SqlParameterCollection
+    params.foreach { case (k, v) => paramColl.add(new SqlParameter(k, v)) }
+
+    new SqlQuerySpec(query, paramColl)
+  }
+}
+
+private[cosmosdb] abstract class SimpleMapper extends CosmosDBViewMapper {
+
+  def prepareQuery(ddoc: String,
+                   viewName: String,
+                   startKey: List[Any],
+                   endKey: List[Any],
+                   limit: Int,
+                   includeDocs: Boolean,
+                   descending: Boolean): SqlQuerySpec = {
+    checkKeys(startKey, endKey)
+
+    val selectClause = select(ddoc, viewName, limit, includeDocs)
+    val whereClause = where(ddoc, viewName, startKey, endKey)
+    val orderField = orderByField(ddoc, viewName)
+    val order = if (descending) "DESC" else NOTHING
+
+    val query = s"SELECT $selectClause FROM root r WHERE ${whereClause._1} ORDER BY $orderField $order"
+
+    prepareSpec(query, whereClause._2)
+  }
+
+  def prepareCountQuery(ddoc: String, viewName: String, startKey: List[Any], endKey: List[Any]): SqlQuerySpec = {
+    checkKeys(startKey, endKey)
+
+    val whereClause = where(ddoc, viewName, startKey, endKey)
+    val query = s"SELECT TOP 1 VALUE COUNT(r) FROM root r WHERE ${whereClause._1}"
+
+    prepareSpec(query, whereClause._2)
+  }
+
+  private def select(ddoc: String, viewName: String, limit: Int, includeDocs: Boolean): String = {
+    val fieldClause = if (includeDocs) ALL_FIELDS else prepareFieldClause(ddoc, viewName)
+    s"${top(limit)} $fieldClause"
+  }
+
+  private def top(limit: Int): String = {
+    if (limit > 0) s"TOP $limit" else NOTHING
+  }
+
+  private def prepareFieldClause(ddoc: String, viewName: String) =
+    CosmosDBUtil.prepareFieldClause(handler.fieldsRequiredForView(ddoc, viewName))
+
+  protected def where(ddoc: String,
+                      viewName: String,
+                      startKey: List[Any],
+                      endKey: List[Any]): (String, List[(String, Any)])
+
+  protected def orderByField(ddoc: String, viewName: String): String
+}
+
+private[cosmosdb] object WhisksViewMapper extends SimpleMapper {
+  private val NS = "namespace"
+  private val ROOT_NS_C = s"$computed.$ROOT_NS"
+  private val TYPE = "entityType"
+  private val UPDATED = "updated"
+  private val PUBLISH = "publish"
+  private val BINDING = "binding"
+
+  val handler = WhisksHandler
+
+  override def indexingPolicy: IndexingPolicy =
+    IndexingPolicy(
+      includedPaths = Set(
+        IncludedPath(s"/$TYPE/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$NS/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$computed/$ROOT_NS/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$UPDATED/?", Index(Range, Number, -1))))
+
+  override protected def where(ddoc: String,
+                               view: String,
+                               startKey: List[Any],
+                               endKey: List[Any]): (String, List[(String, Any)]) = {
+    val entityType = WhisksHandler.getEntityTypeForDesignDoc(ddoc, view)
+    val namespace = startKey.head
+
+    val (vc, vcParams) =
+      viewConditions(ddoc, view).map(q => (s"${q._1} AND", q._2)).getOrElse((NOTHING, Nil))
+
+    val params = ("@entityType", entityType) :: ("@namespace", namespace) :: vcParams
+    val baseCondition = s"$vc r.$TYPE = @entityType AND (r.$NS = @namespace OR r.$ROOT_NS_C = @namespace)"
+
+    (startKey, endKey) match {
+      case (_ :: Nil, _ :: `TOP` :: Nil) =>
+        (baseCondition, params)
+
+      case (_ :: (since: Number) :: Nil, _ :: `TOP` :: `TOP` :: Nil) =>
+        (s"$baseCondition AND r.$UPDATED >= @since", ("@since", since) :: params)
+
+      case (_ :: (since: Number) :: Nil, _ :: (upto: Number) :: `TOP` :: Nil) =>
+        (s"$baseCondition AND (r.$UPDATED BETWEEN @since AND @upto)", ("@upto", upto) :: ("@since", since) :: params)
+
+      case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)")
+    }
+  }
+
+  private def viewConditions(ddoc: String, view: String): Option[(String, List[(String, Any)])] = {
+    view match {
+      case "packages-public" if ddoc.startsWith("whisks") =>
+        Some(s"r.$PUBLISH = true AND (NOT IS_OBJECT(r.$BINDING) OR r.$BINDING = {})", Nil)
+      case _ => None
+    }
+  }
+
+  override protected def orderByField(ddoc: String, view: String): String = view match {
+    case "actions" | "rules" | "triggers" | "packages" | "packages-public" if ddoc.startsWith("whisks") =>
+      s"r.$UPDATED"
+    case _ => throw UnsupportedView(s"$ddoc/$view")
+  }
+
+}
+private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
+  private val NS = "namespace"
+  private val NS_WITH_PATH = s"$computed.$NS_PATH"
+  private val START = "start"
+
+  val handler = ActivationHandler
+
+  override def indexingPolicy: IndexingPolicy =
+    IndexingPolicy(
+      mode = Lazy,
+      includedPaths = Set(
+        IncludedPath(s"/$NS/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$computed/$NS_PATH/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$START/?", Index(Range, Number, -1))))
+
+  override protected def where(ddoc: String,
+                               view: String,
+                               startKey: List[Any],
+                               endKey: List[Any]): (String, List[(String, Any)]) = {
+    val nsValue = startKey.head.asInstanceOf[String]
+    view match {
+      //whisks-filters ddoc uses namespace + invoking action path as first key
+      case "activations" if ddoc.startsWith("whisks-filters") =>
+        filterActivation(NS_WITH_PATH, nsValue, startKey, endKey)
+      //whisks ddoc uses namespace as first key
+      case "activations" if ddoc.startsWith("whisks") => filterActivation(NS, nsValue, startKey, endKey)
+      case _                                          => throw UnsupportedView(s"$ddoc/$view")
+    }
+  }
+
+  private def filterActivation(nsKey: String,
+                               nsValue: String,
+                               startKey: List[Any],
+                               endKey: List[Any]): (String, List[(String, Any)]) = {
+    val params = ("@nsvalue", nsValue) :: Nil
+    val filter = (startKey, endKey) match {
+      case (_ :: Nil, _ :: `TOP` :: Nil) =>
+        (s"r.$nsKey = @nsvalue", params)
+      case (_ :: (since: Number) :: Nil, _ :: `TOP` :: `TOP` :: Nil) =>
+        (s"r.$nsKey = @nsvalue AND r.$START >= @start", ("@start", since) :: params)
+      case (_ :: (since: Number) :: Nil, _ :: (upto: Number) :: `TOP` :: Nil) =>
+        (s"r.$nsKey = @nsvalue AND (r.$START BETWEEN @start AND @upto)", ("@upto", upto) :: ("@start", since) :: params)
+      case _ => throw UnsupportedQueryKeys(s"$startKey, $endKey")
+    }
+    filter
+  }
+
+  override protected def orderByField(ddoc: String, view: String): String = view match {
+    case "activations" if ddoc.startsWith("whisks") => s"r.$START"
+    case _                                          => throw UnsupportedView(s"$ddoc/$view")
+  }
+}
+private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
+  private val UUID = "uuid"
+  private val KEY = "key"
+  private val NSS = "namespaces"
+  private val CONCURRENT_INVOCATIONS = "concurrentInvocations"
+  private val INVOCATIONS_PER_MIN = "invocationsPerMinute"
+  private val BLOCKED = "blocked"
+  private val SUBJECT = "subject"
+  private val NAME = "name"
+
+  val handler = SubjectHandler
+
+  override def indexingPolicy: IndexingPolicy =
+    //Booleans are indexed by default
+    //Specifying less precision for key as match on uuid should be sufficient
+    //and keys are bigger
+    IndexingPolicy(
+      includedPaths = Set(
+        IncludedPath(s"/$UUID/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$NSS/[]/$NAME/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$SUBJECT/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$NSS/[]/$UUID/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$CONCURRENT_INVOCATIONS/?", Index(Hash, Number, -1)),
+        IncludedPath(s"/$INVOCATIONS_PER_MIN/?", Index(Hash, Number, -1))))
+
+  override def prepareQuery(ddoc: String,
+                            view: String,
+                            startKey: List[Any],
+                            endKey: List[Any],
+                            limit: Int,
+                            includeDocs: Boolean,
+                            descending: Boolean): SqlQuerySpec =
+    prepareQuery(ddoc, view, startKey, endKey, count = false)
+
+  override def prepareCountQuery(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): SqlQuerySpec =
+    prepareQuery(ddoc, view, startKey, endKey, count = true)
+
+  private def prepareQuery(ddoc: String,
+                           view: String,
+                           startKey: List[Any],
+                           endKey: List[Any],
+                           count: Boolean): SqlQuerySpec = {
+    require(startKey == endKey, s"startKey: $startKey and endKey: $endKey must be same for $ddoc/$view")
+    (ddoc, view) match {
+      case ("subjects", "identities") =>
+        queryForMatchingSubjectOrNamespace(ddoc, view, startKey, endKey, count)
+      case ("namespaceThrottlings", "blockedNamespaces") =>
+        queryForBlacklistedNamespace(count)
+      case _ =>
+        throw UnsupportedView(s"$ddoc/$view")
+    }
+  }
+
+  private def queryForMatchingSubjectOrNamespace(ddoc: String,
+                                                 view: String,
+                                                 startKey: List[Any],
+                                                 endKey: List[Any],
+                                                 count: Boolean): SqlQuerySpec = {
+    val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED = false)"
+    val (where, params) = startKey match {
+      case (ns: String) :: Nil =>
+        (s"$notBlocked AND (r.$SUBJECT = @name OR n.$NAME = @name)", ("@name", ns) :: Nil)
+      case (uuid: String) :: (key: String) :: Nil =>
+        (
+          s"$notBlocked AND ((r.$UUID = @uuid AND r.$KEY = @key) OR (n.$UUID = @uuid AND n.$KEY = @key))",
+          ("@uuid", uuid) :: ("@key", key) :: Nil)
+      case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)")
+    }
+    prepareSpec(s"SELECT ${selectClause(count)} AS $alias FROM root r JOIN n in r.namespaces WHERE $where", params)
+  }
+
+  private def queryForBlacklistedNamespace(count: Boolean): SqlQuerySpec =
+    prepareSpec(
+      s"""SELECT ${selectClause(count)} AS $alias
+                  FROM   root r
+                  WHERE  r.$BLOCKED = true
+                          OR r.$CONCURRENT_INVOCATIONS = 0
+                          OR r.$INVOCATIONS_PER_MIN = 0 """,
+      Nil)
+
+  private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)" else "r"
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/IndexingPolicy.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/IndexingPolicy.scala
new file mode 100644
index 0000000..83c182e
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/IndexingPolicy.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import com.microsoft.azure.cosmosdb.{
+  DataType,
+  HashIndex,
+  IndexKind,
+  IndexingMode,
+  RangeIndex,
+  ExcludedPath => JExcludedPath,
+  IncludedPath => JIncludedPath,
+  Index => JIndex,
+  IndexingPolicy => JIndexingPolicy
+}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Scala based IndexingPolicy type which maps to java based IndexingPolicy. This is done for 2 reasons
+ *
+ *  - Simplify constructing policy instance
+ *  - Enable custom comparison between existing policy and desired policy as policy instances
+ *    obtained from CosmosDB have extra index type configured per included path. Hence the comparison
+ *    needs to be customized
+ *
+ */
+case class IndexingPolicy(mode: IndexingMode = IndexingMode.Consistent,
+                          includedPaths: Set[IncludedPath],
+                          excludedPaths: Set[ExcludedPath] = Set(ExcludedPath("/"))) {
+
+  def asJava(): JIndexingPolicy = {
+    val policy = new JIndexingPolicy()
+    policy.setIndexingMode(mode)
+    policy.setIncludedPaths(includedPaths.map(_.asJava()).asJava)
+    policy.setExcludedPaths(excludedPaths.map(_.asJava()).asJava)
+    policy
+  }
+}
+
+object IndexingPolicy {
+  def apply(policy: JIndexingPolicy): IndexingPolicy =
+    IndexingPolicy(
+      policy.getIndexingMode,
+      policy.getIncludedPaths.asScala.map(IncludedPath(_)).toSet,
+      policy.getExcludedPaths.asScala.map(ExcludedPath(_)).toSet)
+
+  /**
+   * IndexingPolicy fetched from CosmosDB contains extra entries. So need to check
+   * that at least what we expect is present
+   */
+  def isSame(expected: IndexingPolicy, current: IndexingPolicy): Boolean = {
+    expected.mode == current.mode && expected.excludedPaths == current.excludedPaths &&
+    matchIncludes(expected.includedPaths, current.includedPaths)
+  }
+
+  private def matchIncludes(expected: Set[IncludedPath], current: Set[IncludedPath]): Boolean = {
+    expected.size == current.size && expected.forall { i =>
+      current.find(_.path == i.path) match {
+        case Some(x) => i.indexes.subsetOf(x.indexes)
+        case None    => false
+      }
+    }
+  }
+}
+
+case class IncludedPath(path: String, indexes: Set[Index]) {
+  def asJava(): JIncludedPath = {
+    val includedPath = new JIncludedPath()
+    includedPath.setIndexes(indexes.map(_.asJava()).asJava)
+    includedPath.setPath(path)
+    includedPath
+  }
+}
+
+object IncludedPath {
+  def apply(ip: JIncludedPath): IncludedPath = IncludedPath(ip.getPath, ip.getIndexes.asScala.map(Index(_)).toSet)
+
+  def apply(path: String, index: Index): IncludedPath = IncludedPath(path, Set(index))
+}
+
+case class ExcludedPath(path: String) {
+  def asJava(): JExcludedPath = {
+    val excludedPath = new JExcludedPath()
+    excludedPath.setPath(path)
+    excludedPath
+  }
+}
+
+object ExcludedPath {
+  def apply(ep: JExcludedPath): ExcludedPath = ExcludedPath(ep.getPath)
+}
+
+case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
+  def asJava(): JIndex = kind match {
+    case IndexKind.Hash  => JIndex.Hash(dataType, precision)
+    case IndexKind.Range => JIndex.Range(dataType, precision)
+    case _               => throw new RuntimeException(s"Unsupported kind $kind")
+  }
+}
+
+object Index {
+  def apply(index: JIndex): Index = index match {
+    case i: HashIndex  => Index(i.getKind, i.getDataType, i.getPrecision)
+    case i: RangeIndex => Index(i.getKind, i.getDataType, i.getPrecision)
+    case _             => throw new RuntimeException(s"Unsupported kind $index")
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/ReferenceCounted.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/ReferenceCounted.scala
new file mode 100644
index 0000000..693cad3
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/ReferenceCounted.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+private[cosmosdb] case class ReferenceCounted[T <: AutoCloseable](private val inner: T) {
+  private val count = new AtomicInteger(0)
+
+  private def inc(): Unit = count.incrementAndGet()
+
+  private def dec(): Unit = {
+    val newCount = count.decrementAndGet()
+    if (newCount <= 0) {
+      inner.close()
+      //Turn count to negative to ensure future reference call fail
+      count.decrementAndGet()
+    }
+  }
+
+  def isClosed: Boolean = count.get() < 0
+
+  def reference(): CountedReference = {
+    require(count.get >= 0, "Reference is already closed")
+    new CountedReference
+  }
+
+  class CountedReference extends AutoCloseable {
+    private val closed = new AtomicBoolean()
+    inc()
+    override def close(): Unit = if (closed.compareAndSet(false, true)) dec()
+
+    def get: T = inner
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/RxObservableImplicits.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/RxObservableImplicits.scala
new file mode 100644
index 0000000..f981276
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/RxObservableImplicits.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import com.microsoft.azure.cosmosdb.{FeedResponse, Resource, ResourceResponse}
+import rx.lang.scala.JavaConverters._
+import rx.Observable
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Future, Promise}
+
+private[cosmosdb] trait RxObservableImplicits {
+
+  implicit class RxScalaObservable[T](observable: Observable[T]) {
+
+    /**
+     * Returns the head of the [[Observable]] in a [[scala.concurrent.Future]].
+     *
+     * @return the head result of the [[Observable]].
+     */
+    def head(): Future[T] = {
+      val promise = Promise[T]()
+      observable.asScala.single.subscribe(x => promise.success(x), e => promise.failure(e))
+      promise.future
+    }
+  }
+
+  implicit class RxScalaResourceObservable[T <: Resource](observable: Observable[ResourceResponse[T]]) {
+    def blockingResult(): T = observable.toBlocking.single.getResource
+  }
+
+  implicit class RxScalaFeedObservable[T <: Resource](observable: Observable[FeedResponse[T]]) {
+    def blockingOnlyResult(): Option[T] = {
+      val value = observable.asScala.toList.toBlocking.single
+      val results = value.head.getResults.asScala
+      require(results.isEmpty || results.size == 1, s"More than one result found $results")
+      results.headOption
+    }
+  }
+}
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 62fa019..59e33f0 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -46,6 +46,12 @@ whisk {
         }
     }
 
+    cosmosdb {
+        endpoint = ${?COSMOSDB_ENDPOINT}
+        key      = ${?COSMOSDB_KEY}
+        db       = ${?COSMOSDB_NAME}
+    }
+
     controller {
       protocol = {{ controller.protocol }}
       https {
diff --git a/tests/src/test/scala/common/TestUtils.java b/tests/src/test/scala/common/TestUtils.java
index a4e7a4b..9fa1082 100644
--- a/tests/src/test/scala/common/TestUtils.java
+++ b/tests/src/test/scala/common/TestUtils.java
@@ -27,10 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -201,6 +198,19 @@ public class TestUtils {
     }
 
     /**
+     * Determines if the test build is running for main repo and not on any fork or PR
+     */
+    public static boolean isBuildingOnMainRepo(){
+        //Based on https://docs.travis-ci.com/user/environment-variables/#Default-Environment-Variables
+        String repoName = System.getenv("TRAVIS_REPO_SLUG");
+        if (repoName == null) {
+            return false; //Not a travis build
+        } else {
+            return repoName.startsWith("apache/") && "false".equals(System.getenv("TRAVIS_PULL_REQUEST"));
+        }
+    }
+
+    /**
      * Encapsulates the result of running a native command, providing:
      *   exitCode the exit code of the process
      *   stdout the messages printed to standard out
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
new file mode 100644
index 0000000..bb45c72
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import whisk.core.entity.size._
+import whisk.core.database.test.behavior.ArtifactStoreBehavior
+
+@RunWith(classOf[JUnitRunner])
+class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase with ArtifactStoreBehavior {
+  override protected def maxAttachmentSizeWithoutAttachmentStore = 1.MB
+}
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBAttachmentStoreTests.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBAttachmentStoreTests.scala
new file mode 100644
index 0000000..5e5da0f
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBAttachmentStoreTests.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import whisk.core.database.DocumentSerializer
+import whisk.core.database.memory.MemoryAttachmentStoreProvider
+import whisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
+
+import scala.reflect.ClassTag
+
+@RunWith(classOf[JUnitRunner])
+class CosmosDBAttachmentStoreTests
+    extends FlatSpec
+    with CosmosDBStoreBehaviorBase
+    with ArtifactStoreAttachmentBehaviors {
+  override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() =
+    Some(MemoryAttachmentStoreProvider.makeStore[D]())
+}
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
new file mode 100644
index 0000000..2c12a9b
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import org.scalatest.FlatSpec
+import whisk.core.database.test.behavior.ArtifactStoreBehaviorBase
+import whisk.core.database.{ArtifactStore, AttachmentStore, DocumentSerializer}
+import whisk.core.entity.{
+  DocumentReader,
+  WhiskActivation,
+  WhiskAuth,
+  WhiskDocumentReader,
+  WhiskEntity,
+  WhiskEntityJsonFormat
+}
+
+import scala.reflect.{classTag, ClassTag}
+import scala.util.Try
+
+trait CosmosDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase with CosmosDBTestSupport {
+  override def storeType = "CosmosDB"
+
+  override lazy val storeAvailableCheck: Try[Any] = storeConfigTry
+
+  private lazy val config: CosmosDBConfig = storeConfig.copy(db = createTestDB().getId)
+
+  override lazy val authStore = {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskAuth](config, getAttachmentStore[WhiskAuth]())
+  }
+
+  override lazy val entityStore =
+    CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](config, getAttachmentStore[WhiskEntity]())(
+      classTag[WhiskEntity],
+      WhiskEntityJsonFormat,
+      WhiskDocumentReader,
+      actorSystem,
+      logging,
+      materializer)
+
+  override lazy val activationStore = {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskActivation](config, getAttachmentStore[WhiskActivation]())
+  }
+
+  override protected def getAttachmentStore(store: ArtifactStore[_]) =
+    store.asInstanceOf[CosmosDBArtifactStore[_]].attachmentStore
+
+  protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): Option[AttachmentStore] = None
+}
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBSupportTests.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBSupportTests.scala
new file mode 100644
index 0000000..8ffc807
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBSupportTests.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import com.microsoft.azure.cosmosdb.IndexKind.Hash
+import com.microsoft.azure.cosmosdb.DataType.String
+import com.microsoft.azure.cosmosdb.DocumentCollection
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[JUnitRunner])
+class CosmosDBSupportTests extends FlatSpec with CosmosDBTestSupport with MockFactory with Matchers {
+
+  behavior of "index"
+
+  it should "be created and updated on init" in {
+    val testDb = createTestDB()
+    val config: CosmosDBConfig = storeConfig.copy(db = testDb.getId)
+
+    val indexedPaths1 = Set("/foo/?", "/bar/?")
+    val (_, coll) = new CosmosTest(config, client, newMapper(indexedPaths1)).initialize()
+    indexedPaths(coll) should contain theSameElementsAs indexedPaths1
+
+    //Test if index definition is updated in code it gets updated in db also
+    val indexedPaths2 = Set("/foo/?", "/bar2/?")
+    val (_, coll2) = new CosmosTest(config, client, newMapper(indexedPaths2)).initialize()
+    indexedPaths(coll2) should contain theSameElementsAs indexedPaths2
+  }
+
+  private def newMapper(paths: Set[String]) = {
+    val mapper = stub[CosmosDBViewMapper]
+    mapper.indexingPolicy _ when () returns newTestIndexingPolicy(paths)
+    mapper
+  }
+
+  private def indexedPaths(coll: DocumentCollection) =
+    coll.getIndexingPolicy.getIncludedPaths.asScala.map(_.getPath).toList
+
+  protected def newTestIndexingPolicy(paths: Set[String]): IndexingPolicy =
+    IndexingPolicy(includedPaths = paths.map(p => IncludedPath(p, Index(Hash, String, -1))))
+
+  private class CosmosTest(override val config: CosmosDBConfig,
+                           override val client: AsyncDocumentClient,
+                           mapper: CosmosDBViewMapper)
+      extends CosmosDBSupport {
+    override protected def collName = "test"
+    override protected def viewMapper = mapper
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
new file mode 100644
index 0000000..2743538
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import com.microsoft.azure.cosmosdb.Database
+import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import pureconfig.loadConfigOrThrow
+import whisk.core.ConfigKeys
+import whisk.core.database.test.behavior.ArtifactStoreTestUtil.storeAvailable
+
+import scala.collection.mutable.ListBuffer
+import scala.util.{Random, Try}
+
+trait CosmosDBTestSupport extends FlatSpec with BeforeAndAfterAll with RxObservableImplicits {
+  private val dbsToDelete = ListBuffer[Database]()
+
+  lazy val storeConfigTry = Try { loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) }
+  lazy val client = CosmosDBUtil.createClient(storeConfig)
+
+  def storeConfig = storeConfigTry.get
+
+  override protected def withFixture(test: NoArgTest) = {
+    assume(storeAvailable(storeConfigTry), "CosmosDB not configured or available")
+    super.withFixture(test)
+  }
+
+  protected def generateDBName() = {
+    s"travis-${getClass.getSimpleName}-${Random.alphanumeric.take(5).mkString}"
+  }
+
+  protected def createTestDB() = {
+    val databaseDefinition = new Database
+    databaseDefinition.setId(generateDBName())
+    val db = client.createDatabase(databaseDefinition, null).blockingResult()
+    dbsToDelete += db
+    println(s"Credted database ${db.getId}")
+    db
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    dbsToDelete.foreach(db => client.deleteDatabase(db.getSelfLink, null).blockingResult())
+    client.close()
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBUtilTest.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBUtilTest.scala
new file mode 100644
index 0000000..50ec7f2
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBUtilTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers, OptionValues}
+import spray.json._
+import whisk.utils.JsHelpers
+
+@RunWith(classOf[JUnitRunner])
+class CosmosDBUtilTest extends FlatSpec with Matchers with OptionValues {
+
+  behavior of "prepare field"
+
+  it should "always have id" in {
+    val result = fieldsAsJson()
+    val expected = """{"id" : "r['id']"}""".parseJson
+    result shouldBe expected
+    result.fields("id") shouldBe JsString("r['id']")
+  }
+
+  it should "build a json like string" in {
+    val result = fieldsAsJson("a")
+    val expected = """{ "a" : "r['a']", "id" : "r['id']"}""".parseJson
+    result shouldBe expected
+    result.fields("a") shouldBe JsString("r['a']")
+  }
+
+  it should "support nested fields" in {
+    val result = fieldsAsJson("a", "b.c")
+    val expected = """{
+                     |  "a": "r['a']",
+                     |  "b": {
+                     |    "c": "r['b']['c']"
+                     |  },
+                     |  "id": "r['id']"
+                     |}""".stripMargin.parseJson
+    result shouldBe expected
+    JsHelpers.getFieldPath(result, "b", "c").value shouldBe JsString("r['b']['c']")
+  }
+
+  private def fieldsAsJson(fields: String*) = toJson(CosmosDBUtil.prepareFieldClause(fields.toList))
+
+  private def toJson(s: String): JsObject = {
+    //Strip of last `As VIEW`
+    s.replace(" AS view", "")
+      .replaceAll(raw"(r[\w'\[\]]+)", "\"$1\"")
+      .parseJson
+      .asJsObject
+  }
+
+  behavior of "escaping"
+
+  it should "escape /" in {
+    CosmosDBUtil.escapeId("foo/bar") shouldBe "foo|bar"
+  }
+
+  it should "throw exception when input contains replacement char |" in {
+    an[IllegalArgumentException] should be thrownBy CosmosDBUtil.escapeId("foo/bar|baz")
+  }
+
+  it should "support unescaping" in {
+    val s = "foo/bar"
+    CosmosDBUtil.unescapeId(CosmosDBUtil.escapeId(s)) shouldBe s
+  }
+
+}
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/IndexingPolicyTests.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/IndexingPolicyTests.scala
new file mode 100644
index 0000000..9f11ce9
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/IndexingPolicyTests.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import com.microsoft.azure.cosmosdb.DataType.String
+import com.microsoft.azure.cosmosdb.IndexKind.{Hash, Range}
+import com.microsoft.azure.cosmosdb.IndexingMode
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class IndexingPolicyTests extends FlatSpec with Matchers {
+  behavior of "IndexingPolicy"
+
+  it should "match same instance" in {
+    val policy =
+      IndexingPolicy(mode = IndexingMode.Lazy, includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1))))
+    IndexingPolicy.isSame(policy, policy) shouldBe true
+  }
+
+  it should "match when same path and subset of indexes" in {
+    val policy =
+      IndexingPolicy(
+        mode = IndexingMode.Lazy,
+        includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1)), IncludedPath("bar", Index(Hash, String, -1))))
+
+    val policy2 =
+      IndexingPolicy(
+        mode = IndexingMode.Lazy,
+        includedPaths = Set(
+          IncludedPath("foo", Index(Hash, String, -1)),
+          IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
+
+    IndexingPolicy.isSame(policy, policy2) shouldBe true
+    IndexingPolicy.isSame(policy2, policy) shouldBe false
+  }
+
+  it should "not match when same path are different" in {
+    val policy =
+      IndexingPolicy(
+        mode = IndexingMode.Lazy,
+        includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1)), IncludedPath("bar", Index(Hash, String, -1))))
+
+    val policy2 =
+      IndexingPolicy(
+        mode = IndexingMode.Lazy,
+        includedPaths = Set(
+          IncludedPath("foo2", Index(Hash, String, -1)),
+          IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
+
+    IndexingPolicy.isSame(policy, policy2) shouldBe false
+  }
+
+  it should "convert and match java IndexingPolicy" in {
+    val policy =
+      IndexingPolicy(
+        mode = IndexingMode.Lazy,
+        includedPaths = Set(
+          IncludedPath("foo", Index(Hash, String, -1)),
+          IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
+
+    val jpolicy = policy.asJava()
+    val policy2 = IndexingPolicy(jpolicy)
+
+    policy shouldBe policy2
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/ReferenceCountedTests.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/ReferenceCountedTests.scala
new file mode 100644
index 0000000..e9b0d85
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/cosmosdb/ReferenceCountedTests.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.cosmosdb
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class ReferenceCountedTests extends FlatSpec with Matchers {
+
+  class CloseProbe extends AutoCloseable {
+    var closed: Boolean = _
+    var closedCount: Int = _
+    override def close(): Unit = {
+      closed = true
+      closedCount += 1
+    }
+  }
+
+  behavior of "ReferenceCounted"
+
+  it should "close only once" in {
+    val probe = new CloseProbe
+    val refCounted = ReferenceCounted(probe)
+
+    val ref1 = refCounted.reference()
+
+    ref1.get should be theSameInstanceAs probe
+    ref1.close()
+    ref1.close()
+
+    probe.closed shouldBe true
+    probe.closedCount shouldBe 1
+    refCounted.isClosed shouldBe true
+  }
+
+  it should "not close with one reference active" in {
+    val probe = new CloseProbe
+    val refCounted = ReferenceCounted(probe)
+
+    val ref1 = refCounted.reference()
+    val ref2 = refCounted.reference()
+
+    ref1.close()
+    ref1.close()
+
+    probe.closed shouldBe false
+  }
+
+  it should "be closed when all reference close" in {
+    val probe = new CloseProbe
+    val refCounted = ReferenceCounted(probe)
+
+    val ref1 = refCounted.reference()
+    val ref2 = refCounted.reference()
+
+    ref1.close()
+    ref2.close()
+
+    probe.closed shouldBe true
+    probe.closedCount shouldBe 1
+  }
+
+  it should "throw exception if closed" in {
+    val probe = new CloseProbe
+    val refCounted = ReferenceCounted(probe)
+
+    val ref1 = refCounted.reference()
+    ref1.close()
+
+    intercept[IllegalArgumentException] {
+      refCounted.reference()
+    }
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index 82b6b80..1438e79 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -22,10 +22,10 @@ import java.util.Base64
 
 import akka.http.scaladsl.model.{ContentTypes, Uri}
 import akka.stream.IOResult
+import scala.concurrent.duration.DurationInt
 import akka.stream.scaladsl.{Sink, StreamConverters}
 import akka.util.{ByteString, ByteStringBuilder}
 import whisk.common.TransactionId
-import whisk.core.entity.size._
 import whisk.core.database.{AttachmentSupport, CacheChangeNotification, NoDocumentException}
 import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
 import whisk.core.entity.test.ExecHelpers
@@ -113,17 +113,22 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
     getAttachmentBytes(i2, attached(action2)).futureValue.result() shouldBe decode(code1)
   }
 
-  it should "put and read 5 MB attachment" in {
+  it should "put and read large attachment" in {
     implicit val tid: TransactionId = transid()
-    val size = Math.max(nonInlinedAttachmentSize(entityStore), 5.MB.toBytes.toInt)
+    val size = Math.max(nonInlinedAttachmentSize(entityStore), getAttachmentSizeForTest(entityStore))
     val base64 = encodedRandomBytes(size)
 
     val exec = javaDefault(base64, Some("hello"))
     val javaAction =
       WhiskAction(namespace, EntityName("attachment_large"), exec)
 
+    //Have more patience as reading large attachments take time specially for remote
+    //storage like Cosmos
+    implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 1.minute)
+
     val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
     val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+
     val action3 = WhiskAction.get(entityStore, i1.id, i1.rev).futureValue
 
     docsToDelete += ((entityStore, i1))
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
index 38ef12e..d4bcd5c 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
@@ -27,11 +27,13 @@ import spray.json.{JsObject, JsValue}
 import whisk.common.TransactionId
 import whisk.core.database.memory.MemoryAttachmentStore
 import whisk.core.database.test.DbUtils
+import whisk.core.database.test.behavior.ArtifactStoreTestUtil.storeAvailable
 import whisk.core.database.{ArtifactStore, AttachmentStore, StaleParameter}
 import whisk.core.entity._
+import whisk.core.entity.size._
 import whisk.utils.JsHelpers
 
-import scala.util.Random
+import scala.util.{Random, Try}
 
 trait ArtifactStoreBehaviorBase
     extends FlatSpec
@@ -59,6 +61,7 @@ trait ArtifactStoreBehaviorBase
 
   override def afterEach(): Unit = {
     cleanup()
+    stream.reset()
   }
 
   override def afterAll(): Unit = {
@@ -71,6 +74,16 @@ trait ArtifactStoreBehaviorBase
     assertAttachmentStoresAreClosed()
   }
 
+  override protected def withFixture(test: NoArgTest) = {
+    assume(storeAvailable(storeAvailableCheck), s"$storeType not configured or available")
+    val outcome = super.withFixture(test)
+    if (outcome.isFailed) {
+      println(logLines.mkString("\n"))
+    }
+    outcome
+  }
+
+  protected def storeAvailableCheck: Try[Any] = Try(true)
   //~----------------------------------------< utility methods >
 
   protected def query[A <: WhiskEntity](
@@ -149,6 +162,13 @@ trait ArtifactStoreBehaviorBase
     case _                        => None
   }
 
+  protected def getAttachmentSizeForTest(store: ArtifactStore[_]): Int = {
+    val mb = getAttachmentStore(store).map(_ => 5.MB).getOrElse(maxAttachmentSizeWithoutAttachmentStore)
+    mb.toBytes.toInt
+  }
+
+  protected def maxAttachmentSizeWithoutAttachmentStore: ByteSize = 5.MB
+
   private def assertAttachmentStoreIsEmpty(): Unit = {
     Seq(authStore, entityStore, activationStore).foreach { s =>
       for {
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreTestUtil.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreTestUtil.scala
new file mode 100644
index 0000000..417a0d0
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreTestUtil.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test.behavior
+
+import common.TestUtils
+
+import scala.util.{Failure, Success, Try}
+
+object ArtifactStoreTestUtil {
+
+  def storeAvailable(storeAvailableCheck: Try[Any]): Boolean = {
+    storeAvailableCheck match {
+      case Success(_) => true
+      case Failure(x) =>
+        //If running on master on main repo build tests MUST be run
+        //For non main repo runs like in fork or for PR its fine for test
+        //to be cancelled
+        if (TestUtils.isBuildingOnMainRepo) throw x else false
+    }
+  }
+}
diff --git a/tools/db/cosmosDbUtil.py b/tools/db/cosmosDbUtil.py
new file mode 100755
index 0000000..87c3b23
--- /dev/null
+++ b/tools/db/cosmosDbUtil.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from collections import namedtuple
+import glob
+import sys
+import os
+import argparse
+import traceback
+import pydocumentdb.documents as documents
+import pydocumentdb.errors as document_errors
+import pydocumentdb.document_client as document_client
+
+try:
+    import argcomplete
+except ImportError:
+    argcomplete = False
+
+CLI_DIR = os.path.dirname(os.path.realpath(sys.argv[0]))
+# ROOT_DIR is the OpenWhisk repository root
+ROOT_DIR = os.path.join(os.path.join(CLI_DIR, os.pardir), os.pardir)
+
+DbContext = namedtuple('DbContext', ['client', 'db', 'whisks', 'subjects', 'activations'])
+verbose = False
+
+
+def main():
+    global verbose
+    exit_code = 0
+    try:
+        args = parse_args()
+        verbose = args.verbose
+        client = init_client(args)
+        exit_code = {
+            'init': init_cmd,
+            'prune': prune_cmd,
+            'drop': drop_cmd
+        }[args.cmd](args, client)
+    except Exception as e:
+        print('Exception: ', e)
+        traceback.print_exc()
+        exit_code = 1
+
+    sys.exit(exit_code)
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description='OpenWhisk CosmosDB bootstrap tool')
+    parser.add_argument('--endpoint', help='DB Endpoint url like https://example.documents.azure.com:443/',
+                        required=True)
+    parser.add_argument('--key', help='DB access key', required=True)
+    parser.add_argument('-v', '--verbose', help='Verbose mode', action="store_true")
+
+    subparsers = parser.add_subparsers(title='available commands', dest='cmd')
+
+    propmenu = subparsers.add_parser('init', help='initialize database')
+    propmenu.add_argument('db', help='Database name under which the collections would be created')
+    propmenu.add_argument('--dir', help='Directory under which auth files are stored')
+
+    propmenu = subparsers.add_parser('prune', help='remove stale databases created by test')
+    propmenu.add_argument('--prefix', help='Database name prefix which are matched for removal', default="travis-")
+
+    propmenu = subparsers.add_parser('drop', help='drop database')
+    propmenu.add_argument('db', help='Database name to be removed')
+
+    if argcomplete:
+        argcomplete.autocomplete(parser)
+    return parser.parse_args()
+
+
+def init_cmd(args, client):
+    db = get_or_create_db(client, args.db)
+
+    whisks = init_coll(client, db, "whisks")
+    subjects = init_coll(client, db, "subjects")
+    activations = init_coll(client, db, "activations")
+
+    db_ctx = DbContext(client, db, whisks, subjects, activations)
+    init_auth(db_ctx)
+    return 0
+
+
+def prune_cmd(args, client):
+    # Remove database which are one day old
+    pass
+
+
+def drop_cmd(args, client):
+    db = get_db(client, args.db)
+    if db is not None:
+        client.DeleteDatabase(db['_self'])
+        log("Removed database : %s" % args.db)
+    else:
+        log("Database %s not found" % args.db)
+
+
+def init_auth(ctx):
+    for subject in find_default_subjects():
+        link = create_link(ctx.db, ctx.subjects, subject['id'])
+        options = {'partitionKey': subject.get('id')}
+        try:
+            ctx.client.ReadDocument(link, options)
+            log('Subject already exists : ' + subject['id'])
+        except document_errors.HTTPFailure as e:
+            if e.status_code == 404:
+                ctx.client.CreateDocument(ctx.subjects['_self'], subject, options)
+                log('Created subject : ' + subject['id'])
+            else:
+                raise e
+
+
+def create_link(db, coll, doc_id):
+    return 'dbs/' + db['id'] + '/colls/' + coll['id'] + '/docs/' + doc_id
+
+
+def find_default_subjects():
+    files_dir = os.path.join(ROOT_DIR, "ansible/files")
+    for name in glob.glob1(files_dir, "auth.*"):
+        auth_file = open(os.path.join(files_dir, name), 'r')
+        uuid, key = auth_file.read().strip().split(":")
+        subject = name[name.index('.') + 1:]
+        doc = {
+            'id': subject,
+            'subject': subject,
+            'namespaces': [
+                {
+                    'name': subject,
+                    'uuid': uuid,
+                    'key': key
+                }
+            ]
+        }
+        auth_file.close()
+        yield doc
+
+
+def init_client(args):
+    return document_client.DocumentClient(args.endpoint, {'masterKey': args.key})
+
+
+def get_db(client, db_name):
+    query = client.QueryDatabases('SELECT * FROM root r WHERE r.id=\'' + db_name + '\'')
+    return next(iter(query), None)
+
+
+def get_or_create_db(client, db_name):
+    db = get_db(client, db_name)
+    if db is None:
+        db = client.CreateDatabase({'id': db_name})
+        log('Created database "%s"' % db_name)
+    return db
+
+
+def init_coll(client, db, coll_name):
+    query = client.QueryCollections(db['_self'], 'SELECT * FROM root r WHERE r.id=\'' + coll_name + '\'')
+    it = iter(query)
+    coll = next(it, None)
+    if coll is None:
+        collection_definition = {'id': coll_name,
+                                 'partitionKey':
+                                     {
+                                         'paths': ['/id'],
+                                         'kind': documents.PartitionKind.Hash
+                                     }
+                                 }
+        collection_options = {}  # {'offerThroughput': 10100}
+        coll = client.CreateCollection(db['_self'], collection_definition, collection_options)
+        log('Created collection "%s"' % coll_name)
+    return coll
+
+
+def log(msg):
+    if verbose:
+        print(msg)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tools/travis/setup.sh b/tools/travis/setup.sh
index 89cc76b..bc16774 100755
--- a/tools/travis/setup.sh
+++ b/tools/travis/setup.sh
@@ -33,3 +33,6 @@ pip install --user couchdb
 
 # Ansible
 pip install --user ansible==2.5.2
+
+# Azure CosmosDB
+pip install --user pydocumentdb


Mime
View raw message