openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rab...@apache.org
Subject [incubator-openwhisk] branch master updated: Update sequence impl to tune controller memory consumption (#2387)
Date Tue, 20 Jun 2017 16:16:39 GMT
This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f399a4  Update sequence impl to tune controller memory consumption (#2387)
7f399a4 is described below

commit 7f399a43e561765a23fb377e7747ca4c8c397ea6
Author: Nick Mitchell <starpit@users.noreply.github.com>
AuthorDate: Tue Jun 20 12:16:36 2017 -0400

    Update sequence impl to tune controller memory consumption (#2387)
    
    - switch to scheduleOnce+weakrefs for timeout handling in SequenceActions
    - switch SequenceAccounting to store array of ActivationId rather than array of String -- cheaper in memory
    -  use better (non-dragging) impl of withTimeout
    - use a getAndSet(null) pattern to avoid two copies of responses being alive simultaneously
    - refactor top level sequence scheduler to eliminate promises
---
 .../scala/whisk/core/entity/ActivationId.scala     |   2 +-
 .../src/main/scala/whisk/http/ErrorResponse.scala  |   3 +-
 .../whisk/utils/ExecutionContextFactory.scala      |  18 +-
 .../core/controller/actions/SequenceActions.scala  | 498 ++++++++++++---------
 .../test/scala/system/basic/WskSequenceTests.scala |  93 ++--
 .../actions/test/SequenceAccountingTests.scala     | 141 ++++++
 .../utils/test/ExecutionContextFactoryTests.scala  |  43 ++
 7 files changed, 531 insertions(+), 267 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
index 374320c..4a71681 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala
@@ -39,7 +39,7 @@ import whisk.http.Messages
  *
  * @param id the activation id, required not null
  */
-protected[core] class ActivationId private (private val id: java.util.UUID) extends AnyVal {
+protected[whisk] class ActivationId private (private val id: java.util.UUID) extends AnyVal {
     def asString = toString
     override def toString = id.toString.replaceAll("-", "")
     def toJsObject = JsObject("activationId" -> toString.toJson)
diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index d89d749..547618d 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -35,6 +35,7 @@ import whisk.common.TransactionId
 import whisk.core.entity.SizeError
 import whisk.core.entity.ByteSize
 import whisk.core.entity.Exec
+import whisk.core.entity.ActivationId
 
 object Messages {
     /** Standard message for reporting resource conflicts. */
@@ -95,7 +96,7 @@ object Messages {
     val notAllowedOnBinding = "Operation not permitted on package binding."
 
     /** Error messages for sequence activations. */
-    val sequenceRetrieveActivationTimeout = "Timeout reached when retrieving activation for sequence component."
+    def sequenceRetrieveActivationTimeout(id: ActivationId) = s"Timeout reached when retrieving activation $id for sequence component."
     val sequenceActivationFailure = "Sequence failed."
 
     /** Error messages for bad requests where parameters do not conform. */
diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
index bdbd00a..5aa465c 100644
--- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
+++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
@@ -22,16 +22,32 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration.FiniteDuration
+import scala.util.Try
 
 import akka.actor.ActorSystem
 import akka.pattern.{ after => expire }
 
 object ExecutionContextFactory {
 
+    // Future.firstCompletedOf has a memory drag bug
+    // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
+    def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+        val p = Promise[T]()
+        val pref = new java.util.concurrent.atomic.AtomicReference(p)
+        val completeFirst: Try[T] => Unit = { result: Try[T] =>
+            val promise = pref.getAndSet(null)
+            if (promise != null) {
+                promise.tryComplete(result)
+            }
+        }
+        futures foreach { _ onComplete completeFirst }
+        p.future
+    }
+
     implicit class FutureExtensions[T](f: Future[T]) {
         def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = {
             implicit val ec = system.dispatcher
-            Future firstCompletedOf Seq(f, expire(timeout, system.scheduler)(Future.failed(msg)))
+            firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg))))
         }
     }
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index 4c4ccbe..27dac99 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -18,16 +18,15 @@ package whisk.core.controller.actions
 
 import java.time.Clock
 import java.time.Instant
+import java.util.concurrent.atomic.AtomicReference
 
-import scala.Left
-import scala.Right
+import scala.collection._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-import scala.concurrent.Promise
 import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.util.Failure
 import scala.util.Success
-import scala.util.Try
 
 import akka.actor.ActorSystem
 import spray.json._
@@ -98,71 +97,59 @@ protected[actions] trait SequenceActions {
         // create new activation id that corresponds to the sequence
         val seqActivationId = activationIdFactory.make()
         logging.info(this, s"invoking sequence $action topmost $topmost activationid '$seqActivationId'")
+
         val start = Instant.now(Clock.systemUTC())
-        val seqActivationPromise = Promise[Option[WhiskActivation]]
-        // the cause for the component activations is the current sequence
-        val futureWskActivations = invokeSequenceComponents(user, action, seqActivationId, payload, components, cause = Some(seqActivationId), atomicActionsCount)
-        val futureSeqResult = Future.sequence(futureWskActivations)
-        val response: Future[(ActivationId, Option[WhiskActivation], Int)] =
-            if (topmost) { // need to deal with blocking and closing connection
-                if (blocking) {
-                    val timeout = maxWaitForBlockingActivation + blockingInvokeGrace
-                    val futureSeqResultTimeout = futureSeqResult withTimeout (timeout, new BlockingInvokeTimeout(seqActivationId))
-                    // if the future fails with a timeout, the failure is dealt with at the caller level
-                    futureSeqResultTimeout map { wskActivationTuples =>
-                        val wskActivationEithers = wskActivationTuples.map(_._1)
-                        // the execution of the sequence was successful, return the result
-                        val end = Instant.now(Clock.systemUTC())
-                        val seqActivation = Some(makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end))
-                        val atomicActionCnt = wskActivationTuples.last._2
-                        (seqActivationId, seqActivation, atomicActionCnt)
-                    } andThen {
-                        case Success((_, seqActivation, _)) => seqActivationPromise.success(seqActivation)
-                        case Failure(t)                     => seqActivationPromise.success(None)
-                    }
-                } else {
-                    // non-blocking sequence execution, return activation id
-                    Future.successful((seqActivationId, None, 0)) andThen {
-                        case _ => seqActivationPromise.success(None)
-                    }
-                }
+        val futureSeqResult = {
+            completeSequenceActivation(
+                seqActivationId,
+                // the cause for the component activations is the current sequence
+                invokeSequenceComponents(user, action, seqActivationId, payload, components, cause = Some(seqActivationId), atomicActionsCount),
+                user, action, topmost, start, cause)
+        }
+
+        if (topmost) { // need to deal with blocking and closing connection
+            if (blocking) {
+                logging.info(this, s"invoke sequence blocking topmost!")
+                val timeout = maxWaitForBlockingActivation + blockingInvokeGrace
+                // if the future fails with a timeout, the failure is dealt with at the caller level
+                futureSeqResult.withTimeout(timeout, new BlockingInvokeTimeout(seqActivationId))
             } else {
-                // not topmost, no need to worry about terminating incoming request
-                futureSeqResult map { wskActivationTuples =>
-                    val wskActivationEithers = wskActivationTuples.map(_._1)
-                    // all activations are successful, the result of the sequence is the result of the last activation
-                    val end = Instant.now(Clock.systemUTC())
-                    val seqActivation = Some(makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end))
-                    val atomicActionCnt = wskActivationTuples.last._2
-                    (seqActivationId, seqActivation, atomicActionCnt)
-                } andThen {
-                    case Success((_, seqActivation, _)) => seqActivationPromise.success(seqActivation)
-                    case Failure(t)                     => seqActivationPromise.success(None)
-                }
+                // non-blocking sequence execution, return activation id
+                Future.successful((seqActivationId, None, 0))
             }
+        } else {
+            // not topmost, no need to worry about terminating incoming request
+            // Note: the future for the sequence result recovers from all throwable failures
+            futureSeqResult
+        }
+    }
 
-        // store result of sequence execution
-        // if seqActivation is defined, use it; otherwise create it (e.g., for non-blocking activations)
-        // the execution can reach here without a seqActivation due to non-blocking activations OR blocking activations that reach the blocking invoke timeout
-        // futureSeqResult should always be successful, if failed, there is an error
-        futureSeqResult flatMap { tuples => seqActivationPromise.future map { (tuples, _) } } onComplete {
-            case Success((wskActivationTuples, seqActivation)) =>
-                // all activations were successful
-                val activation = seqActivation getOrElse {
-                    val wskActivationEithers = wskActivationTuples.map(_._1)
-                    val end = Instant.now(Clock.systemUTC())
-                    // the response of the sequence is the response of the very last activation
-                    makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end)
-                }
-                storeSequenceActivation(activation)
-            case Failure(t: Throwable) =>
-                // consider this whisk error
-                // TODO shall we attempt storing the activation if it exists or even inspect the futures?
-                // this should be a pretty serious whisk errror if it gets here
+    /**
+     * Creates an activation for the sequence and writes it back to the datastore.
+     */
+    private def completeSequenceActivation(
+        seqActivationId: ActivationId,
+        futureSeqResult: Future[SequenceAccounting],
+        user: Identity,
+        action: WhiskAction,
+        topmost: Boolean,
+        start: Instant,
+        cause: Option[ActivationId])(
+            implicit transid: TransactionId): Future[(ActivationId, Some[WhiskActivation], Int)] = {
+        // not topmost, no need to worry about terminating incoming request
+        // Note: the future for the sequence result recovers from all throwable failures
+        futureSeqResult.map { accounting =>
+            // sequence terminated, the result of the sequence is the result of the last completed activation
+            val end = Instant.now(Clock.systemUTC())
+            val seqActivation = makeSequenceActivation(user, action, seqActivationId, accounting, topmost, cause, start, end)
+            (seqActivationId, Some(seqActivation), accounting.atomicActionCnt)
+        }.andThen {
+            case Success((_, Some(seqActivation), _)) => storeSequenceActivation(seqActivation)
+            case Failure(t) =>
+                // This should never happen; in this case, there is no activation record created or stored:
+                // should there be?
                 logging.error(this, s"Sequence activation failed: ${t.getMessage}")
         }
-
-        response
     }
 
     /**
@@ -172,7 +159,7 @@ protected[actions] trait SequenceActions {
         logging.info(this, s"recording activation '${activation.activationId}'")
         WhiskActivation.put(activationStore, activation) onComplete {
             case Success(id) => logging.info(this, s"recorded activation")
-            case Failure(t)  => logging.error(this, s"failed to record activation")
+            case Failure(t)  => logging.error(this, s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}")
         }
     }
 
@@ -183,49 +170,20 @@ protected[actions] trait SequenceActions {
         user: Identity,
         action: WhiskAction,
         activationId: ActivationId,
-        wskActivationEithers: Vector[Either[ActivationResponse, WhiskActivation]],
+        accounting: SequenceAccounting,
         topmost: Boolean,
         cause: Option[ActivationId],
         start: Instant,
         end: Instant): WhiskActivation = {
 
-        // extract all successful activations from the vector of activation eithers
-        // the vector is either all rights, all lefts, or some rights followed by some lefts (no interleaving)
-        val (right, left) = wskActivationEithers.span(_.isRight)
-        val wskActivations = right.map(_.right.get)
-
-        // the activation response is either the first left if it exists or the response of the last successful activation
-        val activationResponse = if (left.length == 0) {
-            wskActivations.last.response
-        } else {
-            left.head.left.get
-        }
-
-        // compose logs
-        val logs = ActivationLogs(wskActivations map {
-            activation => activation.activationId.toString
-        })
-
-        // compute duration
-        val duration = (wskActivations map { activation =>
-            activation.duration getOrElse {
-                logging.error(this, s"duration for $activation is not defined")
-                activation.end.toEpochMilli - activation.start.toEpochMilli
-            }
-        }).sum
-
         // compute max memory
-        val maxMemory = Try {
-            val memoryLimits = wskActivations map { activation =>
-                val limits = ActionLimits.serdes.read(activation.annotations.get("limits").get)
-                limits.memory.megabytes
-            }
-            memoryLimits.max.MB
-        }
-
-        val sequenceLimits = maxMemory map {
-            mb => ActionLimits(action.limits.timeout, MemoryLimit(mb), action.limits.logs)
-        }
+        val sequenceLimits = accounting.maxMemory map {
+            maxMemoryAcrossActionsInSequence =>
+                Parameters("limits", ActionLimits(
+                    action.limits.timeout,
+                    MemoryLimit(maxMemoryAcrossActionsInSequence MB),
+                    action.limits.logs).toJson)
+        } getOrElse (Parameters())
 
         // set causedBy if not topmost sequence
         val causedBy = if (!topmost) {
@@ -243,16 +201,16 @@ protected[actions] trait SequenceActions {
             start = start,
             end = end,
             cause = if (topmost) None else cause, // propagate the cause for inner sequences, but undefined for topmost
-            response = activationResponse,
-            logs = logs,
+            response = accounting.previousResponse.getAndSet(null), // getAndSet(null) drops reference to the activation result
+            logs = accounting.finalLogs,
             version = action.version,
             publish = false,
             annotations = Parameters("topmost", JsBoolean(topmost)) ++
                 Parameters("path", action.fullyQualifiedName(false).toString) ++
                 Parameters("kind", "sequence") ++
                 causedBy ++
-                sequenceLimits.map(l => Parameters("limits", l.toJson)).getOrElse(Parameters()),
-            duration = Some(duration))
+                sequenceLimits,
+            duration = Some(accounting.duration))
     }
 
     /**
@@ -264,96 +222,62 @@ protected[actions] trait SequenceActions {
      * @param user the user invoking the sequence
      * @param seqAction the sequence invoked
      * @param seqActivationId the id of the sequence
-     * @param payload the payload passed to the first component in the sequence
+     * @param inputPayload the payload passed to the first component in the sequence
      * @param components the components in the sequence
      * @param cause the activation id of the sequence that lead to invoking this sequence or None if this sequence is topmost
      * @param atomicActionCnt the dynamic atomic action count observed so far since the start of the execution of the topmost sequence
-     * @return a vector of successful futures; each element contains a tuple with
-     *         1. an either with activation(right) or activation response in case of error (left)
-     *         2. the dynamic atomic action count after executing the components
+     * @return a future which resolves with the accounting for a sequence, including the last result, duration, and activation ids
      */
     private def invokeSequenceComponents(
         user: Identity,
         seqAction: WhiskAction,
         seqActivationId: ActivationId,
-        payload: Option[JsObject],
+        inputPayload: Option[JsObject],
         components: Vector[FullyQualifiedEntityName],
         cause: Option[ActivationId],
         atomicActionCnt: Int)(
-            implicit transid: TransactionId): Vector[Future[(Either[ActivationResponse, WhiskActivation], Int)]] = {
-        logging.info(this, s"invoke sequence $seqAction ($seqActivationId) with components $components")
-
-        // first retrieve the information/entities on all actions
-        // do not wait to successfully retrieve all the actions before starting the execution
-        // start execution of the first action while potentially still retrieving entities
-        // Note: the execution starts even if one of the futures retrieving an entity may fail
-        // first components need to be resolved given any package bindings and the params need to be merged
-        // NOTE: OLD-STYLE sequences may have default namespace in the names of the components, resolve default namespace first
-        val resolvedFutureActions = resolveDefaultNamespace(components, user) map { c => WhiskAction.resolveActionAndMergeParameters(entityStore, c) }
-
-        // "scan" the wskActions to execute them in blocking fashion
-        // use scanLeft instead of foldLeft as we need the intermediate results
-        // TODO: double-check the package param policy
-        // env are the parameters for the package that the sequence is in; throw them away, not used in the sequence execution
-        // create a "fake" WhiskActivation to hold the payload of the sequence to init the scanLeft
-        val fakeStart = Instant.now()
-        val fakeEnd = Instant.now()
-        val fakeResponse = ActivationResponse.payloadPlaceholder(payload)
-
-        // NOTE: the init value is a fake (unused) activation to bootstrap the invocations of actions
-        val initFakeWhiskActivation: Future[(Either[ActivationResponse, WhiskActivation], Int, Boolean)] = Future successful {
-            // use boolean in tuple to indicate first/incoming payload
-            (Right(WhiskActivation(seqAction.namespace, seqAction.name, user.subject, seqActivationId, fakeStart, fakeEnd, response = fakeResponse, duration = None)), atomicActionCnt, true)
+            implicit transid: TransactionId): Future[SequenceAccounting] = {
+
+        // For each action in the sequence, fetch any of its associated parameters (including package or binding).
+        // We do this for all of the actions in the sequence even though it may be short circuited. This is to
+        // hide the latency of the fetches from the datastore and the parameter merging that has to occur. It
+        // may be desirable in the future to selectively speculate over a smaller number of components rather than
+        // the entire sequence.
+        //
+        // This action/parameter resolution is done in futures; the execution starts as soon as the first component
+        // is resolved.
+        val resolvedFutureActions = resolveDefaultNamespace(components, user) map {
+            c => WhiskAction.resolveActionAndMergeParameters(entityStore, c)
+        }
+
+        // this holds the initial value of the accounting structure, including the input boxed as an ActivationResponse
+        val initialAccounting = Future.successful {
+            SequenceAccounting(atomicActionCnt, ActivationResponse.payloadPlaceholder(inputPayload))
         }
 
-        // seqComponentWskActivationFutures contains a fake activation on the first position in the vector; the rest of the vector is the result of each component execution/activation
-        val seqComponentWskActivationFutures = resolvedFutureActions.scanLeft(initFakeWhiskActivation) {
-            (futureActivationAtomicCntTuple, futureAction) =>
-                futureAction flatMap {
-                    action =>
-                        futureActivationAtomicCntTuple flatMap {
-                            case (activationEither, atomicActionCount, first) =>
-                                activationEither match {
-                                    case Right(activation) =>
-                                        val payload = activation.response.result.map(_.asJsObject)
-                                        // first check conditions on payload that may lead to interrupting the execution of the sequence
-                                        val payloadContent = payload getOrElse JsObject.empty
-                                        val errorFields = payloadContent.getFields(ActivationResponse.ERROR_FIELD)
-                                        // short-circuit the execution of the sequence iff the payload contains an error field and is the result of an action return, not the initial payload
-                                        val errorShortcircuit = !errorFields.isEmpty && !first
-                                        if (!errorShortcircuit) {
-                                            // second check the atomic action count for sequence action limit)
-                                            if (atomicActionCount >= actionSequenceLimit) {
-                                                val activationResponse = ActivationResponse.applicationError(s"$sequenceIsTooLong")
-                                                Future.successful(Left(activationResponse), atomicActionCount, false) // dynamic action count and first don't really matter anymore
-                                            } else {
-                                                val compResultFuture : Future[(Either[ActivationResponse, WhiskActivation], Int)] = invokeSeqOneComponent(user, action, payload, cause, atomicActionCount)
-                                                compResultFuture map {
-                                                    activationDynamicCountPair => (activationDynamicCountPair._1, activationDynamicCountPair._2, false) // it's not first payload anymore
-                                                }
-                                            }
-                                        } else {
-                                            // there is an error field, terminate sequence early
-                                            // propagate the activation response
-                                            Future.successful(Left(activation.response), atomicActionCount, false) // dynamic action count and first don't really matter anymore
-                                        }
-                                    case Left(activationResponse) =>
-                                        // the sequence is interrupted, no more processing
-                                        Future.successful(Left(activationResponse), 0, false) // dynamic action count and first do not matter from now on
-                                }
+        // execute the actions in sequential blocking fashion
+        resolvedFutureActions.foldLeft(initialAccounting) {
+            (accountingFuture, futureAction) =>
+                accountingFuture.flatMap { accounting =>
+                    if (accounting.atomicActionCnt < actionSequenceLimit) {
+                        invokeNextAction(user, futureAction, accounting, cause).flatMap { accounting =>
+                            if (!accounting.shortcircuit) {
+                                Future.successful(accounting)
+                            } else {
+                                // this is to short circuit the fold
+                                Future.failed(FailedSequenceActivation(accounting)) // terminates the fold
+                            }
                         }
-                } recover {
-                    // check any failure here and generate an activation response such that this method always returns a vector of successful futures
-                    case t: Throwable =>
-                        // consider this failure a whisk error
-                        val activationResponse = ActivationResponse.whiskError(sequenceActivationFailure)
-                        (Left(activationResponse), 0, false)
+                    } else {
+                        val updatedAccount = accounting.fail(ActivationResponse.applicationError(sequenceIsTooLong), None)
+                        Future.failed(FailedSequenceActivation(updatedAccount)) // terminates the fold
+                    }
                 }
-        }
-        // drop the first future which contains the init value from scanLeft and project the first two fields from the tuples
-        // the third one was used to treat error property differently for first action vs the rest of the actions in the sequence (not useful past this point)
-        seqComponentWskActivationFutures.drop(1) map {
-            tupleFuture => tupleFuture map { tuple => (tuple._1, tuple._2) }
+        }.recoverWith {
+            // turn the failed accounting back to success; this is the only possible failure
+            // since all throwables are recovered with a failed accounting instance and this is
+            // in turned boxed to FailedSequenceActivation
+            case FailedSequenceActivation(accounting) => Future.successful(accounting)
         }
     }
 
@@ -365,55 +289,191 @@ protected[actions] trait SequenceActions {
      *
      * The method distinguishes between invoking a sequence or an atomic action.
      * @param user the user executing the sequence
-     * @param action the action to be invoked
-     * @param payload the payload for the action
-     * @param cause the activation id of the first sequence containing this action
-     * @param atomicActionCount the number of activations
-     * @return future with the result of the invocation and the dynamic atomic action count so far
+     * @param futureAction the future which fetches the action to be invoked from the db
+     * @param accounting the state of the sequence activation, contains the dynamic activation count, logs and payload for the next action
+     * @param cause the activation id of the first sequence containing this activations
+     * @return a future which resolves with updated accounting for a sequence, including the last result, duration, and activation ids
      */
-    private def invokeSeqOneComponent(user: Identity, action: WhiskAction, payload: Option[JsObject], cause: Option[ActivationId], atomicActionCount: Int)(
-        implicit transid: TransactionId): Future[(Either[ActivationResponse, WhiskActivation], Int)] = {
-        // invoke the action by calling the right method depending on whether it's an atomic action or a sequence
-        // the tuple contains activationId, wskActivation, atomicActionCount (up till this point in execution)
-        val futureWhiskActivationTuple = action.exec match {
-            case SequenceExec(components) =>
-                // invoke a sequence
-                logging.info(this, s"sequence invoking an enclosed sequence $action")
-                // call invokeSequence to invoke the inner sequence
-                // true for blocking; false for topmost
-                invokeSequence(user, action, payload, blocking = true, topmost = false, components, cause, atomicActionCount) map {
-                    case (activationId, wskActivation, seqAtomicActionCnt) =>
-                        (activationId, wskActivation, seqAtomicActionCnt + atomicActionCount)
-                }
-            case _ =>
-                // this is an invoke for an atomic action
-                logging.info(this, s"sequence invoking an enclosed atomic action $action")
-                val timeout = action.limits.timeout.duration + blockingInvokeGrace
-                invokeSingleAction(user, action, payload, timeout, blocking = true, cause) map {
-                    case (activationId, wskActivation) => (activationId, wskActivation, atomicActionCount + 1)
-                }
-        }
+    private def invokeNextAction(
+        user: Identity,
+        futureAction: Future[WhiskAction],
+        accounting: SequenceAccounting,
+        cause: Option[ActivationId])(
+            implicit transid: TransactionId): Future[SequenceAccounting] = {
+        futureAction.flatMap { action =>
+            // the previous response becomes input for the next action in the sequence;
+            // the accounting no longer needs to hold a reference to it once the action is
+            // invoked, so previousResponse.getAndSet(null) drops the reference at this point
+            // which prevents dragging the previous response for the lifetime of the next activation
+            val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject)
+
+            // invoke the action by calling the right method depending on whether it's an atomic action or a sequence
+            val futureWhiskActivationTuple = action.exec match {
+                case SequenceExec(components) =>
+                    logging.info(this, s"sequence invoking an enclosed sequence $action")
+                    // call invokeSequence to invoke the inner sequence
+                    invokeSequence(user, action, inputPayload, blocking = true, topmost = false, components, cause, accounting.atomicActionCnt)
+                case _ =>
+                    // this is an invoke for an atomic action
+                    logging.info(this, s"sequence invoking an enclosed atomic action $action")
+                    val timeout = action.limits.timeout.duration + blockingInvokeGrace
+                    invokeSingleAction(user, action, inputPayload, timeout, blocking = true, cause) map {
+                        case (activationId, wskActivation) => (activationId, wskActivation, accounting.atomicActionCnt + 1)
+                    }
+            }
 
-        futureWhiskActivationTuple map {
-            case (activationId, wskActivation, atomicActionCountSoFar) =>
-                // the activation is None only if the activation could not be retrieved either from active ack or from db
-                wskActivation match {
-                    case Some(activation) => (Right(activation), atomicActionCountSoFar)
-                    case None => {
-                        val activationResponse = ActivationResponse.whiskError(s"$sequenceRetrieveActivationTimeout Activation id '$activationId'.")
-                        (Left(activationResponse), atomicActionCountSoFar) // dynamic count doesn't matter, sequence will be interrupted
+            futureWhiskActivationTuple.map {
+                case (activationId, wskActivation, atomicActionCountSoFar) =>
+                    wskActivation.map {
+                        activation => accounting.maybe(activation, atomicActionCountSoFar, actionSequenceLimit)
+                    }.getOrElse {
+                        // the wskActivation is None only if the result could not be retrieved in time either from active ack or from db
+                        logging.error(this, s"component activation timedout for $activationId")
+                        val activationResponse = ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId))
+                        accounting.fail(activationResponse, Some(activationId))
                     }
-                }
+            }.recover {
+                // check any failure here and generate an activation response to encapsulate
+                // the failure mode; consider this failure a whisk error
+                case t: Throwable =>
+                    logging.error(this, s"component activation failed: $t")
+                    accounting.fail(ActivationResponse.whiskError(sequenceActivationFailure), None)
+            }
         }
     }
 
     /** Replaces default namespaces in a vector of components from a sequence with appropriate namespace. */
     private def resolveDefaultNamespace(components: Vector[FullyQualifiedEntityName], user: Identity): Vector[FullyQualifiedEntityName] = {
-        // if components are part of the default namespace, they contain `_`; replace it!
-        val resolvedComponents = components map { c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name) }
-        resolvedComponents
+        // resolve any namespaces that may appears as "_" (the default namespace)
+        components.map(c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name))
     }
 
     /** Max atomic action count allowed for sequences */
     private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
 }
+
+/**
+ * Cumulative accounting of what happened during the execution of a sequence.
+ *
+ * @param atomicActionCnt the current count of non-sequence (c.f. atomic) actions already invoked
+ * @param previousResponse a reference to the previous activation result which will be nulled out
+ *        when no longer needed (see previousResponse.getAndSet(null) below)
+ * @param logs a mutable buffer that is appended with new activation ids as the sequence unfolds
+ * @param duration the "user" time so far executing the sequence (sum of durations for
+ *        all actions invoked so far which is different from the total time spent executing the sequence)
+ * @param maxMemory the maximum memory annotation observed so far for the
+ *        components (needed to annotate the sequence with GB-s)
+ * @param shortcircuit when true, stops the execution of the next component in the sequence
+ */
+protected[actions] case class SequenceAccounting(
+    atomicActionCnt: Int,
+    previousResponse: AtomicReference[ActivationResponse],
+    logs: mutable.Buffer[ActivationId],
+    duration: Long = 0,
+    maxMemory: Option[Int] = None,
+    shortcircuit: Boolean = false) {
+
+    /** @return the ActivationLogs data structure for this sequence invocation */
+    def finalLogs = ActivationLogs(logs.map(id => id.asString).toVector)
+
+    /** The previous activation was successful. */
+    private def success(activation: WhiskActivation, newCnt: Int, shortcircuit: Boolean = false) = {
+        previousResponse.set(null)
+        SequenceAccounting(
+            prev = this,
+            newCnt = newCnt,
+            shortcircuit = shortcircuit,
+            incrDuration = activation.duration,
+            newResponse = activation.response,
+            newActivationId = activation.activationId,
+            newMemoryLimit = activation.annotations.get("limits") map {
+                limitsAnnotation => // we have a limits annotation
+                    limitsAnnotation.asJsObject.getFields("memory") match {
+                        case Seq(JsNumber(memory)) => Some(memory.toInt) // we have a numerical "memory" field in the "limits" annotation
+                    }
+            } getOrElse { None })
+    }
+
+    /** The previous activation failed (this is used when there is no activation record or an internal error. */
+    def fail(failureResponse: ActivationResponse, activationId: Option[ActivationId]) = {
+        require(!failureResponse.isSuccess)
+        logs.appendAll(activationId)
+        copy(previousResponse = new AtomicReference(failureResponse), shortcircuit = true)
+    }
+
+    /** Determines whether the previous activation succeeded or failed. */
+    def maybe(activation: WhiskActivation, newCnt: Int, maxSequenceCnt: Int) = {
+        // check conditions on payload that may lead to interrupting the execution of the sequence
+        //     short-circuit the execution of the sequence iff the payload contains an error field
+        //     and is the result of an action return, not the initial payload
+        val outputPayload = activation.response.result.map(_.asJsObject)
+        val payloadContent = outputPayload getOrElse JsObject.empty
+        val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD)
+        val withinSeqLimit = newCnt <= maxSequenceCnt
+
+        if (withinSeqLimit && errorField.isEmpty) {
+            // all good with this action invocation
+            success(activation, newCnt)
+        } else {
+            val nextActivation = if (!withinSeqLimit) {
+                // no error in the activation but the dynamic count of actions exceeds the threshold
+                // this is here as defensive code; the activation should not occur if its takes the
+                // count above its limit
+                val newResponse = ActivationResponse.applicationError(sequenceIsTooLong)
+                activation.copy(response = newResponse)
+            } else {
+                assert(errorField.isDefined)
+                activation
+            }
+
+            // there is an error field in the activation response. here, we treat this like success,
+            // in the sense of tallying up the accounting fields, but terminate the sequence early
+            success(nextActivation, newCnt, shortcircuit = true)
+        }
+    }
+}
+
+/**
+ *  Three constructors for SequenceAccounting:
+ *     - one for successful invocation of an action in the sequence,
+ *     - one for failed invocation, and
+ *     - one to initialize things
+ */
+protected[actions] object SequenceAccounting {
+
+    def maxMemory(prevMemoryLimit: Option[Int], newMemoryLimit: Option[Int]): Option[Int] = {
+        (prevMemoryLimit ++ newMemoryLimit).reduceOption(Math.max)
+    }
+
+    // constructor for successful invocations, or error'ing ones (where shortcircuit = true)
+    def apply(
+        prev: SequenceAccounting,
+        newCnt: Int,
+        incrDuration: Option[Long],
+        newResponse: ActivationResponse,
+        newActivationId: ActivationId,
+        newMemoryLimit: Option[Int],
+        shortcircuit: Boolean): SequenceAccounting = {
+
+        // compute the new max memory
+        val newMaxMemory = maxMemory(prev.maxMemory, newMemoryLimit)
+
+        // append log entry
+        prev.logs += newActivationId
+
+        SequenceAccounting(
+            atomicActionCnt = newCnt,
+            previousResponse = new AtomicReference(newResponse),
+            logs = prev.logs,
+            duration = incrDuration map { prev.duration + _ } getOrElse { prev.duration },
+            maxMemory = newMaxMemory,
+            shortcircuit = shortcircuit)
+    }
+
+    // constructor for initial payload
+    def apply(atomicActionCnt: Int, initialPayload: ActivationResponse): SequenceAccounting = {
+        SequenceAccounting(atomicActionCnt, new AtomicReference(initialPayload), mutable.Buffer.empty)
+    }
+}
+
+protected[actions] case class FailedSequenceActivation(accounting: SequenceAccounting) extends Throwable
diff --git a/tests/src/test/scala/system/basic/WskSequenceTests.scala b/tests/src/test/scala/system/basic/WskSequenceTests.scala
index e0dfdef..06f5dea 100644
--- a/tests/src/test/scala/system/basic/WskSequenceTests.scala
+++ b/tests/src/test/scala/system/basic/WskSequenceTests.scala
@@ -60,7 +60,7 @@ class WskSequenceTests
 
     behavior of "Wsk Sequence"
 
-    it should "invoke a blocking sequence action and invoke the updated sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
+    it should "invoke a sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
         (wp, assetHelper) =>
             val name = "sequence"
             val actions = Seq("split", "sort", "head", "cat")
@@ -109,7 +109,7 @@ class WskSequenceTests
             // result of sequence should be identical to previous invocation above
             val payload = Map("error" -> JsString("irrelevant error string"), "payload" -> args.mkString("\n").toJson)
             val thirdrun = wsk.action.invoke(name, payload)
-            withActivation(wsk.activation, thirdrun, totalWait = 2 *allowedActionDuration) {
+            withActivation(wsk.activation, thirdrun, totalWait = 2 * allowedActionDuration) {
                 activation =>
                     checkSequenceLogsAndAnnotations(activation, 2) // 2 activations in this sequence
                     val result = activation.response.result.get
@@ -118,14 +118,57 @@ class WskSequenceTests
             }
     }
 
+    it should "invoke a sequence with an enclosing sequence action" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            val inner_name = "inner_sequence"
+            val outer_name = "outer_sequence"
+            val inner_actions = Seq("sort", "head")
+            val actions = Seq("split") ++ inner_actions ++ Seq("cat")
+            // create atomic actions
+            for (actionName <- actions) {
+                val file = TestUtils.getTestActionFilename(s"$actionName.js")
+                assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+                    action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
+                }
+            }
+
+            // create inner sequence
+            assetHelper.withCleaner(wsk.action, inner_name) {
+                val inner_sequence = inner_actions.mkString(",")
+                (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
+            }
+
+            // create outer sequence
+            assetHelper.withCleaner(wsk.action, outer_name) {
+                val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
+                (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
+            }
+
+            val now = "it is now " + new Date()
+            val args = Array("what time is it?", now)
+            val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
+            withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
+                activation =>
+                    checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
+                    activation.cause shouldBe None // topmost sequence
+                    val result = activation.response.result.get
+                    result.fields.get("payload") shouldBe defined
+                    result.fields.get("length") should not be defined
+                    result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
+            }
+    }
+
     /**
      * s -> echo, x, echo
      * x -> echo
      *
      * update x -> <limit-1> echo -- should work
      * run s -> should stop after <limit> echo
+     *
+     * This confirms that a dynamic check on the sequence length holds within the system limit.
+     * This is different from creating a long sequence up front which will report a length error at create time.
      */
-    it should "create a sequence, run it, update one of the atomic actions to a sequence and stop executing the outer sequence when limit reached" in withAssetCleaner(wskprops) {
+    it should "replace atomic component in a sequence that is too long and report invoke error" in withAssetCleaner(wskprops) {
         (wp, assetHelper) =>
             val xName = "xSequence"
             val sName = "sSequence"
@@ -176,52 +219,11 @@ class WskSequenceTests
                     withActivation(wsk.activation, getInnerSeq, totalWait = allowedActionDuration) {
                         innerSeqActivation =>
                             innerSeqActivation.logs.get.size shouldBe (limit - 1)
-                            innerSeqActivation.cause shouldBe defined
-                            innerSeqActivation.cause.get shouldBe (activation.activationId)
+                            innerSeqActivation.cause shouldBe Some(activation.activationId)
                     }
             }
     }
 
-    it should "invoke a blocking sequence action with an enclosing sequence action" in withAssetCleaner(wskprops) {
-        (wp, assetHelper) =>
-            val inner_name = "inner_sequence"
-            val outer_name = "outer_sequence"
-            val inner_actions = Seq("sort", "head")
-            val actions = Seq("split") ++ inner_actions ++ Seq("cat")
-            // create atomic actions
-            for (actionName <- actions) {
-                val file = TestUtils.getTestActionFilename(s"$actionName.js")
-                assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
-                    action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
-                }
-            }
-
-            // create inner sequence
-            assetHelper.withCleaner(wsk.action, inner_name) {
-                val inner_sequence = inner_actions.mkString(",")
-                (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
-            }
-
-            // create outer sequence
-            assetHelper.withCleaner(wsk.action, outer_name) {
-                val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
-                (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
-            }
-
-            val now = "it is now " + new Date()
-            val args = Array("what time is it?", now)
-            val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
-            withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
-                activation =>
-                    checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
-                    activation.cause shouldBe None // topmost sequence
-                    val result = activation.response.result.get
-                    result.fields.get("payload") shouldBe defined
-                    result.fields.get("length") should not be defined
-                    result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
-            }
-    }
-
     it should "create and run a sequence in a package with parameters" in withAssetCleaner(wskprops) {
         (wp, assetHelper) =>
             val sName = "sSequence"
@@ -294,6 +296,7 @@ class WskSequenceTests
             // action params trump package params
             checkLogsAtomicAction(0, run, new Regex(String.format(".*key0: value0.*key1a: value1a.*key1b: value2b.*key2a: value2a.*payload: %s", now)))
     }
+
     /**
      * s -> apperror, echo
      * only apperror should run
diff --git a/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala b/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala
new file mode 100644
index 0000000..d3e5918
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package whisk.core.controller.actions.test
+
+import java.time.Instant
+
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.WskActorSystem
+import spray.json._
+import whisk.core.controller.actions.SequenceAccounting
+import whisk.core.entity._
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.size.SizeInt
+import whisk.http.Messages
+
+@RunWith(classOf[JUnitRunner])
+class SequenceAccountingTests extends FlatSpec with Matchers with WskActorSystem {
+
+    behavior of "sequence accounting"
+
+    val okRes1 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1))))
+    val okRes2 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2))))
+    val failedRes = ActivationResponse.applicationError(JsNumber(3))
+
+    val okActivation = WhiskActivation(
+        namespace = EntityPath("ns"),
+        name = EntityName("a"),
+        Subject(),
+        activationId = ActivationId(),
+        start = Instant.now(),
+        end = Instant.now(),
+        response = okRes2,
+        annotations = Parameters("limits", ActionLimits(
+            TimeLimit(1.second),
+            MemoryLimit(128.MB),
+            LogLimit(1.MB)).toJson),
+        duration = Some(123))
+
+    val notOkActivation = WhiskActivation(
+        namespace = EntityPath("ns"),
+        name = EntityName("a"),
+        Subject(),
+        activationId = ActivationId(),
+        start = Instant.now(),
+        end = Instant.now(),
+        response = failedRes,
+        annotations = Parameters("limits", ActionLimits(
+            TimeLimit(11.second),
+            MemoryLimit(256.MB),
+            LogLimit(2.MB)).toJson),
+        duration = Some(234))
+
+    it should "create initial accounting object" in {
+        val s = SequenceAccounting(2, okRes1)
+        s.atomicActionCnt shouldBe 2
+        s.previousResponse.get shouldBe okRes1
+        s.logs shouldBe empty
+        s.duration shouldBe 0
+        s.maxMemory shouldBe None
+        s.shortcircuit shouldBe false
+    }
+
+    it should "resolve maybe to success and update accounting object" in {
+        val p = SequenceAccounting(2, okRes1)
+        val n1 = p.maybe(okActivation, 3, 5)
+        n1.atomicActionCnt shouldBe 3
+        n1.previousResponse.get shouldBe okRes2
+        n1.logs.length shouldBe 1
+        n1.logs(0) shouldBe okActivation.activationId
+        n1.duration shouldBe 123
+        n1.maxMemory shouldBe Some(128)
+        n1.shortcircuit shouldBe false
+    }
+
+    it should "resolve maybe and enable short circuit" in {
+        val p = SequenceAccounting(2, okRes1)
+        val n1 = p.maybe(okActivation, 3, 5)
+        val n2 = n1.maybe(notOkActivation, 4, 5)
+        n2.atomicActionCnt shouldBe 4
+        n2.previousResponse.get shouldBe failedRes
+        n2.logs.length shouldBe 2
+        n2.logs(0) shouldBe okActivation.activationId
+        n2.logs(1) shouldBe notOkActivation.activationId
+        n2.duration shouldBe (123 + 234)
+        n2.maxMemory shouldBe Some(256)
+        n2.shortcircuit shouldBe true
+    }
+
+    it should "record an activation that exceeds allowed limit but also short circuit" in {
+        val p = SequenceAccounting(2, okRes1)
+        val n = p.maybe(okActivation, 3, 2)
+        n.atomicActionCnt shouldBe 3
+        n.previousResponse.get shouldBe ActivationResponse.applicationError(Messages.sequenceIsTooLong)
+        n.logs.length shouldBe 1
+        n.logs(0) shouldBe okActivation.activationId
+        n.duration shouldBe 123
+        n.maxMemory shouldBe Some(128)
+        n.shortcircuit shouldBe true
+    }
+
+    it should "set failed response and short circuit on failure" in {
+        val p = SequenceAccounting(2, okRes1)
+        val n = p.maybe(okActivation, 3, 3)
+        val f = n.fail(failedRes, None)
+        f.atomicActionCnt shouldBe 3
+        f.previousResponse.get shouldBe failedRes
+        f.logs.length shouldBe 1
+        f.logs(0) shouldBe okActivation.activationId
+        f.duration shouldBe 123
+        f.maxMemory shouldBe Some(128)
+        f.shortcircuit shouldBe true
+    }
+
+    it should "resolve max memory" in {
+        SequenceAccounting.maxMemory(None, None) shouldBe None
+        SequenceAccounting.maxMemory(None, Some(1)) shouldBe Some(1)
+        SequenceAccounting.maxMemory(Some(1), None) shouldBe Some(1)
+        SequenceAccounting.maxMemory(Some(1), Some(2)) shouldBe Some(2)
+        SequenceAccounting.maxMemory(Some(2), Some(1)) shouldBe Some(2)
+        SequenceAccounting.maxMemory(Some(2), Some(2)) shouldBe Some(2)
+    }
+}
diff --git a/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala b/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala
new file mode 100644
index 0000000..b530d3a
--- /dev/null
+++ b/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package whisk.utils.test
+
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.WskActorSystem
+import whisk.utils.ExecutionContextFactory.FutureExtensions
+
+@RunWith(classOf[JUnitRunner])
+class ExecutionContextFactoryTests extends FlatSpec with Matchers with WskActorSystem {
+
+    behavior of "future extensions"
+
+    it should "take first to complete" in {
+        val f1 = Future.successful({}).withTimeout(500.millis, new Throwable("error"))
+        Await.result(f1, 1.second) shouldBe ({})
+
+        val failure = new Throwable("error")
+        val f2 = Future { Thread.sleep(1.second.toMillis) }.withTimeout(500.millis, failure)
+        a[Throwable] shouldBe thrownBy { Await.result(f2, 1.seconds) }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Mime
View raw message