openwhisk-commits mailing list archives

From jeremiaswer...@apache.org
Subject [incubator-openwhisk] branch master updated: add additional metrics and logs (#2968)
Date Wed, 07 Feb 2018 14:47:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jeremiaswerner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fc5a60  add additional metrics and logs (#2968)
2fc5a60 is described below

commit 2fc5a6001797ca78b974d4bb56f7b860b54699e7
Author: Martin Henke <martin.henke@web.de>
AuthorDate: Wed Feb 7 15:47:15 2018 +0100

    add additional metrics and logs (#2968)
    
    * add additional metrics and logs
    
    * move logging out of container creation logic
    
    * consolidate log messages
---
 .../src/main/scala/whisk/common/Logging.scala      | 10 +++-
 .../whisk/core/database/CouchDbRestStore.scala     |  7 ++-
 .../core/loadBalancer/ContainerPoolBalancer.scala  | 11 ++++-
 .../whisk/core/containerpool/ContainerPool.scala   | 54 ++++++++++++++++------
 4 files changed, 59 insertions(+), 23 deletions(-)
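
For orientation (not part of the commit), the new metrics all flow through the existing Kamon-backed MetricEmitter touched in the first hunk below; a minimal sketch of emitting a counter and a histogram, assuming the whisk.common package of this repository is on the classpath:

    import whisk.common.{LoggingMarkers, MetricEmitter}

    // counter-style marker: incremented by 1 per emission
    MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_INVOKER_OFFLINE)
    // histogram-style marker: records a value distribution, here the DATABASE_BATCH_SIZE token added below
    MetricEmitter.emitHistogramMetric(LoggingMarkers.DATABASE_BATCH_SIZE, 42L)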

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 5a18ee3..e681eb5 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -191,17 +191,18 @@ object MetricEmitter {
 
   val metrics = Kamon.metrics
 
-  def emitCounterMetric(token: LogMarkerToken) = {
+  def emitCounterMetric(token: LogMarkerToken): Unit = {
     metrics
       .counter(token.toString)
       .increment(1)
   }
 
-  def emitHistogramMetric(token: LogMarkerToken, value: Long) = {
+  def emitHistogramMetric(token: LogMarkerToken, value: Long): Unit = {
     metrics
       .histogram(token.toString)
       .record(value)
   }
+
 }
 
 object LoggingMarkers {
@@ -241,6 +242,8 @@ object LoggingMarkers {
   // Check invoker healthy state from loadbalancer
   val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count)
   val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
+  def LOADBALANCER_ACTIVATION_START(namespaceId: String) =
+    LogMarkerToken(loadbalancer, s"activations_$namespaceId", count)
 
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
@@ -255,6 +258,8 @@ object LoggingMarkers {
   val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)
   def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, s"docker.$cmd", start)
   def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, s"runc.$cmd", start)
+  def INVOKER_CONTAINER_START(actionName: String, namespaceName: String, containerState: String) =
+    LogMarkerToken(invoker, s"container_start_${containerState}_${namespaceName}_$actionName", count)
 
   /*
    * General markers
@@ -268,4 +273,5 @@ object LoggingMarkers {
   val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)
   val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", start)
   val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", start)
+  val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)
 }
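
For illustration only (not part of the commit), the two parameterized markers added above produce one token per namespace and per action/namespace/start-state combination; a sketch of constructing them directly, with guestNamespace and myAction as hypothetical names:

    import whisk.common.LoggingMarkers

    // counter token whose name carries the namespace, e.g. "activations_guestNamespace"
    val activationMarker = LoggingMarkers.LOADBALANCER_ACTIVATION_START("guestNamespace")
    // counter token such as "container_start_cold_guestNamespace_myAction"
    val containerMarker = LoggingMarkers.INVOKER_CONTAINER_START("myAction", "guestNamespace", "cold")

In the hunks further down, these tokens are handed to TransactionId.mark at the load balancer and container pool call sites.
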
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 6832bc3..efc03cc 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -20,7 +20,6 @@ package whisk.core.database
 import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._
-
 import akka.actor.ActorSystem
 import akka.event.Logging.ErrorLevel
 import akka.http.scaladsl.model._
@@ -28,9 +27,7 @@ import akka.stream.ActorMaterializer
 import akka.stream.scaladsl._
 import akka.util.ByteString
 import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.entity.BulkEntityResult
 import whisk.core.entity.DocInfo
 import whisk.core.entity.DocRevision
@@ -141,6 +138,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     val count = ds.size
     val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents")
 
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.DATABASE_BATCH_SIZE, ds.size)
+
     val f = client.putDocs(ds).map {
       _ match {
         case Right(response) =>
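
As a hedged illustration (not part of the commit), the single line added in this hunk records how many documents each bulk save writes, using the histogram plumbing from the first hunk; a sketch with a placeholder batch:

    import whisk.common.{LoggingMarkers, MetricEmitter}

    // `docs` stands in for the sequence of documents handed to putDocs
    val docs = Seq("doc-1", "doc-2", "doc-3")
    MetricEmitter.emitHistogramMetric(LoggingMarkers.DATABASE_BATCH_SIZE, docs.size)
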
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 786a94a..5d8b22d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -34,6 +34,8 @@ import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
 import akka.event.Logging.InfoLevel
 
+import pureconfig._
+
 import scala.annotation.tailrec
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -153,6 +155,12 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
           processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
         }
 
+        transid.mark(
+          this,
+          LoggingMarkers.LOADBALANCER_ACTIVATION_START(namespaceId.asString),
+          s"loadbalancer: activation started for namespace $namespaceId and activation $activationId",
+          logLevel = InfoLevel)
+
         // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
         ActivationEntry(
           activationId,
@@ -176,8 +184,7 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
     val start = transid.started(
       this,
       LoggingMarkers.CONTROLLER_KAFKA,
-      s"posting topic '$topic' with activation id '${msg.activationId}'",
-      logLevel = InfoLevel)
+      s"posting topic '$topic' with activation id '${msg.activationId}'")
 
     producer.send(topic, msg).andThen {
       case Success(status) =>
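
For completeness (not part of the commit), the per-namespace marker from the Logging.scala hunk is emitted here via TransactionId.mark, which writes the log line at the requested level; a sketch of the call shape inside a controller component that has a TransactionId transid and an implicit Logging in scope (namespaceId and activationId are placeholders for the values available in the balancer):

    import akka.event.Logging.InfoLevel
    import whisk.common.LoggingMarkers

    // mirrors the call added above; `this` is the enclosing component used as the log source
    transid.mark(
      this,
      LoggingMarkers.LOADBALANCER_ACTIVATION_START(namespaceId.asString),
      s"loadbalancer: activation started for namespace $namespaceId and activation $activationId",
      logLevel = InfoLevel)
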
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 3e83fca..d0945ab 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -18,12 +18,11 @@
 package whisk.core.containerpool
 
 import scala.collection.immutable
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.ActorRefFactory
-import akka.actor.Props
-import whisk.common.AkkaLogging
-import whisk.common.TransactionId
+
+import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+
 import whisk.core.entity.ByteSize
 import whisk.core.entity.CodeExec
 import whisk.core.entity.EntityName
@@ -80,6 +79,18 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     }
   }
 
+  def logContainerStart(r: Run, containerState: String): Unit = {
+    val namespaceName = r.msg.user.namespace.name
+    val actionName = r.action.name.name
+    val activationId = r.msg.activationId.toString
+
+    r.msg.transid.mark(
+      this,
+      LoggingMarkers.INVOKER_CONTAINER_START(actionName, namespaceName, containerState),
+      s"containerStart containerState: $containerState action: $actionName namespace: $namespaceName
activationId: $activationId",
+      akka.event.Logging.InfoLevel)
+  }
+
   def receive: Receive = {
     // A job to run on a container
     //
@@ -87,33 +98,46 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
     // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
-      val container = if (busyPool.size < maxActiveContainers) {
+      val createdContainer = if (busyPool.size < maxActiveContainers) {
+
         // Schedule a job to a warm container
         ContainerPool
           .schedule(r.action, r.msg.user.namespace, freePool)
+          .map(container => {
+            (container, "warm")
+          })
           .orElse {
             if (busyPool.size + freePool.size < maxPoolSize) {
-              takePrewarmContainer(r.action).orElse {
-                Some(createContainer())
-              }
+              takePrewarmContainer(r.action)
+                .map(container => {
+                  (container, "prewarmed")
+                })
+                .orElse {
+                  Some(createContainer(), "cold")
+                }
             } else None
           }
           .orElse {
             // Remove a container and create a new one for the given job
             ContainerPool.remove(freePool).map { toDelete =>
               removeContainer(toDelete)
-              takePrewarmContainer(r.action).getOrElse {
-                createContainer()
-              }
+              takePrewarmContainer(r.action)
+                .map(container => {
+                  (container, "recreated")
+                })
+                .getOrElse {
+                  (createContainer(), "recreated")
+                }
             }
           }
       } else None
 
-      container match {
-        case Some((actor, data)) =>
+      createdContainer match {
+        case Some(((actor, data), containerState)) =>
           busyPool = busyPool + (actor -> data)
           freePool = freePool - actor
           actor ! r // forwards the run request to the container
+          logContainerStart(r, containerState)
         case None =>
           // this can also happen if createContainer fails to start a new container, or
          // if a job is rescheduled but the container it was allocated to has not yet destroyed itself
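
As an aside (not part of the commit), the containerState labels attached above classify how each run request was satisfied; a simplified, hypothetical distillation of that mapping:

    // hypothetical summary of the labels passed to logContainerStart above
    def containerStateLabel(reusedWarm: Boolean, hadRoomInPool: Boolean, tookPrewarm: Boolean): String =
      if (reusedWarm) "warm"                             // an idle container for the same action/namespace was reused
      else if (hadRoomInPool && tookPrewarm) "prewarmed" // a stem-cell container was claimed and initialized
      else if (hadRoomInPool) "cold"                     // a brand new container was created
      else "recreated"                                   // an idle container was evicted, then a prewarm or fresh container was started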

-- 
To stop receiving notification emails like this one, please contact
jeremiaswerner@apache.org.
