openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
Subject [incubator-openwhisk] branch master updated: Wait for logs based on intervals not based on total processing time. (#3273)
Date Mon, 12 Feb 2018 09:59:31 GMT
This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository incubator-openwhisk.

The following commit(s) were added to refs/heads/master by this push:
     new ebe7788  Wait for logs based on intervals not based on total processing time. (#3273)
ebe7788 is described below

commit ebe7788b5261bb845e43b74525fa2ad2cacbf80b
Author: Markus Thömmes <>
AuthorDate: Mon Feb 12 10:59:28 2018 +0100

    Wait for logs based on intervals not based on total processing time. (#3273)
    Writing a large chunk of logs can take quite some time to process. The standard timeout
for this process is currently 2 seconds. It is bounded because an action developer might break
the action proxy so that sentinels never appear at all, which would cause us to wait for them
indefinitely.
    Because we process logs after an activation has finished, however, we can safely rely on the
time **between** two log lines not exceeding a certain threshold. That way, the complete processing
is not bounded by some arbitrary timeout (which can even be too short for large volumes), yet it
is still tight enough to exit early if sentinels really are missing.
    Furthermore, an error line is inserted if this timeout is hit, to inform the user that something
might have gone wrong.
 .../core/containerpool/docker/DockerContainer.scala      | 16 ++++++++++------
 .../containerpool/docker/test/DockerContainerTests.scala |  7 +++----
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 265a450..5d3083c 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -18,6 +18,7 @@
 package whisk.core.containerpool.docker
 import java.time.Instant
+import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicLong
@@ -80,7 +81,7 @@ object DockerContainer {
                                                             as: ActorSystem,
                                                             ec: ExecutionContext,
                                                             log: Logging): Future[DockerContainer]
= {
-    implicit val tid = transid
+    implicit val tid: TransactionId = transid
     val environmentArgs = environment.flatMap {
       case (key, value) => Seq("-e", s"$key=$value")
@@ -246,18 +247,21 @@ class DockerContainer(protected val id: ContainerId,
       .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel),
2, waitForSentinel))
+      // As we're reading the logs after the activation has finished the invariant is that
all loglines are already
+      // written and we mostly await them being flushed by the docker daemon. Therefore we
can timeout based on the time
+      // between two loglines appear without relying on the log frequency in the action itself.
+      .idleTimeout(waitForLogs)
       .recover {
         case _: StreamLimitReachedException =>
           // While the stream has already ended by failing the limitWeighted stage above,
we inject a truncation
           // notice downstream, which will be processed as usual. This will be the last element
of the stream.
           ByteString(LogLine(, "stderr", Messages.truncateLogs(limit)).toJson.compactPrint)
-        case _: OccurrencesNotFoundException | _: FramingException =>
+        case _: OccurrencesNotFoundException | _: FramingException | _: TimeoutException
           // Stream has already ended and we insert a notice that data might be missing from
the logs. While a
           // FramingException can also mean exceeding the limits, we cannot decide which
case happened so we resort
           // to the general error message. This will be the last element of the stream.
           ByteString(LogLine(, "stderr", Messages.logFailure).toJson.compactPrint)
-      .takeWithin(waitForLogs)
   /** Delimiter used to split log-lines as written by the json-log-driver. */
@@ -279,9 +283,9 @@ class DockerContainer(protected val id: ContainerId,
 class CompleteAfterOccurrences[T](isInEvent: T => Boolean, neededOccurrences: Int, errorOnNotEnough:
     extends GraphStage[FlowShape[T, T]] {
-  val in = Inlet[T]("")
-  val out = Outlet[T]("WaitForOccurances.out")
-  override val shape = FlowShape.of(in, out)
+  val in: Inlet[T] = Inlet[T]("")
+  val out: Outlet[T] = Outlet[T]("WaitForOccurrences.out")
+  override val shape: FlowShape[T, T] = FlowShape.of(in, out)
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
     new GraphStageLogic(shape) with InHandler with OutHandler {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 5f9898e..ea77810 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -34,7 +34,6 @@ import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
 import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine}
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
 import common.{StreamLogging, WskActorSystem}
@@ -49,7 +48,6 @@ import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.core.entity.size._
@@ -583,8 +581,9 @@ class DockerContainerTests
     docker.rawContainerLogsInvocations should have size 1
-    processedLog should have size expectedLog.length
-    processedLog shouldBe
+    processedLog should have size expectedLog.length + 1 //error log should be appended
+    processedLog.head shouldBe expectedLog.head.toFormattedString
+    processedLog(1) should include(Messages.logFailure)
   it should "truncate logs and advance reading position to end of current read" in {

To stop receiving notification emails like this one, please contact

View raw message