openwhisk-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [openwhisk] rabbah commented on a change in pull request #4624: Combines active ack and slot release when both are available.
Date Thu, 19 Sep 2019 14:24:25 GMT
rabbah commented on a change in pull request #4624: Combines active ack and slot release when
both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326203452
 
 

 ##########
 File path: core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
 ##########
 @@ -145,43 +146,38 @@ class InvokerReactive(
     new MessageFeed("activation", logging, consumer, maxPeek, 1.second, processActivationMessage)
   })
 
-  /** Sends an active-ack. */
   private val ack: InvokerReactive.ActiveAck = (tid: TransactionId,
                                                 activationResult: WhiskActivation,
                                                 blockingInvoke: Boolean,
                                                 controllerInstance: ControllerInstanceId,
                                                 userId: UUID,
-                                                isSlotFree: Boolean) => {
+                                                acknowledegment: AcknowledegmentMessage)
=> {
     implicit val transid: TransactionId = tid
 
-    def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
-      val msg = if (isSlotFree) {
-        val aid = res.fold(identity, _.activationId)
-        val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
-        CompletionMessage(transid, aid, isWhiskSystemError, instance)
-      } else {
-        ResultMessage(transid, res)
-      }
-
+    def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
       producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
         case Success(_) =>
-          logging.info(
-            this,
-            s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
+          val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType
+          logging.info(this, s"posted $info of activation ${activationResult.activationId}")
       }
     }
 
     // UserMetrics are sent, when the slot is free again. This ensures, that all metrics
are sent.
-    if (UserEvents.enabled && isSlotFree) {
-      EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match {
-        case Success(msg) => UserEvents.send(producer, msg)
-        case Failure(t)   => logging.error(this, s"activation event was not sent: $t")
+    if (UserEvents.enabled && acknowledegment.isSlotFree.nonEmpty) {
+      acknowledegment.result.foreach {
+        case Right(activationResult) =>
+          EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match
{
+            case Success(msg) => UserEvents.send(producer, msg)
+            case Failure(t)   => logging.error(this, s"activation event was not sent:
$t")
+          }
+        case _ => // all acknowledegment messages should have a result
       }
 
 Review comment:
   @sven-lange-last this is an alternate implementation, but it does not remove the `activation`
parameter, which turns out to require more extensive refactoring. We can leave this change in place
and remove the parameter as part of addressing @chetanmeh's comment about turning this function
into a trait implementation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message