openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cbic...@apache.org
Subject [incubator-openwhisk] branch master updated: Ensure ResultMessage is processed. (#4135)
Date Thu, 29 Nov 2018 07:09:01 GMT
This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f571c3  Ensure ResultMessage is processed. (#4135)
7f571c3 is described below

commit 7f571c32bb8f3155c89f1d96fda4320909e097fd
Author: jiangpch <jiangpengcheng@navercorp.com>
AuthorDate: Thu Nov 29 15:08:48 2018 +0800

    Ensure ResultMessage is processed. (#4135)
---
 .../ShardingContainerPoolBalancer.scala            | 29 ++++++++++------------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 35a4547..4010cc1 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -175,6 +175,8 @@ class ShardingContainerPoolBalancer(
 
   /** State related to invocations and throttling */
   protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]()
+  protected[loadBalancer] val blockingPromises =
+    TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
   private val totalActivationMemory = new LongAdder()
@@ -262,9 +264,13 @@ class ShardingContainerPoolBalancer(
 
     chosen
       .map { invoker =>
-        val entry = setupActivation(msg, action, invoker)
+        setupActivation(msg, action, invoker)
         sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
-          entry.promise.future
+          if (msg.blocking) {
+            blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
+          } else {
+            Future.successful(Left(msg.activationId))
+          }
         }
       }
       .getOrElse {
@@ -313,8 +319,7 @@ class ShardingContainerPoolBalancer(
           action.limits.memory.megabytes.MB,
           action.limits.concurrency.maxConcurrent,
           action.fullyQualifiedName(true),
-          timeoutHandler,
-          Promise[Either[ActivationId, WhiskActivation]]())
+          timeoutHandler)
       })
   }
 
@@ -387,9 +392,7 @@ class ShardingContainerPoolBalancer(
     // Resolve the promise to send the result back to the user
     // The activation will be removed from `activations`-map later, when we receive the completion message, because the
     // slot of the invoker is not yet free for new activations.
-    activations.get(aid).map { entry =>
-      entry.promise.trySuccess(response)
-    }
+    blockingPromises.remove(aid).map(_.trySuccess(response))
     logging.info(this, s"received result ack for '$aid'")(tid)
   }
 
@@ -422,13 +425,9 @@ class ShardingContainerPoolBalancer(
           .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt))
         if (!forced) {
           entry.timeoutHandler.cancel()
-          // If the action was blocking and the Resultmessage has been received before nothing will happen here.
-          // If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id,
-          // the controller will get the result out of the database.
-          // If the action was non-blocking, we will close the promise here.
-          entry.promise.trySuccess(Left(aid))
         } else {
-          entry.promise.tryFailure(new Throwable("no completion ack received"))
+          // remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing
+          blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no completion ack received")))
         }
 
         logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
@@ -717,7 +716,6 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeout
  * @param namespaceId namespace that invoked the action
  * @param invokerName invoker the action is scheduled to
  * @param timeoutHandler times out completion of this activation, should be canceled on good paths
- * @param promise the promise to be completed by the activation
  */
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
@@ -725,5 +723,4 @@ case class ActivationEntry(id: ActivationId,
                            memory: ByteSize,
                            maxConcurrent: Int,
                            fullyQualifiedEntityName: FullyQualifiedEntityName,
-                           timeoutHandler: Cancellable,
-                           promise: Promise[Either[ActivationId, WhiskActivation]])
+                           timeoutHandler: Cancellable)


Mime
View raw message