openwhisk-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [openwhisk] rabbah commented on a change in pull request #4624: Combines active ack and slot release when both are available.
Date Thu, 19 Sep 2019 14:26:15 GMT
rabbah commented on a change in pull request #4624: Combines active ack and slot release when
both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326204785
 
 

 ##########
 File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##########
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is completed or after
slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-    AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the message bus as is
or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of an invoker
that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when the resource
slot and the action
+ * result are available at the same time, and so the split-phase notification is not necessary.
Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be an `ActivationId`
to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
-                             activationId: ActivationId,
-                             isSystemError: Boolean,
-                             invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: TransactionId,
+                                              response: Either[ActivationId, WhiskActivation],
+                                              override val isSystemError: Option[Boolean],
+                                              invoker: InvokerInstanceId)
     extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-    activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource slot in the
invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The `CompletionMessage`
is part of a split
+ * phase notification to the load balancer where an invoker first sends a `ResultMessage`
and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+                             override val activationId: ActivationId,
+                             override val isSystemError: Option[Boolean],
+                             invoker: InvokerInstanceId)
+    extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action completion if
the user wants to have
- * the result immediately (blocking activation).
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * This message is sent from an invoker to the load balancer once an action result is available
for blocking actions.
+ * This is part of a split phase notification, and does not indicate that the slot is available,
which is indicated with
+ * a `CompletionMessage`. Note that activation record will not contain any logs from the
action execution, only the result.
  */
 case class ResultMessage(override val transid: TransactionId, response: Either[ActivationId,
WhiskActivation])
 
 Review comment:
   Note that I pushed a change to tighten the constructors - you cannot cons a result with
a Left - only a Right. To convert a Right to a Left you must use `shrink`. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message