Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 86AAB200D0A for ; Wed, 4 Oct 2017 20:00:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 84C4F1609DD; Wed, 4 Oct 2017 18:00:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2A6531609D6 for ; Wed, 4 Oct 2017 20:00:30 +0200 (CEST) Received: (qmail 53851 invoked by uid 500); 4 Oct 2017 18:00:29 -0000 Mailing-List: contact commits-help@openwhisk.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@openwhisk.apache.org Delivered-To: mailing list commits@openwhisk.apache.org Received: (qmail 53842 invoked by uid 99); 4 Oct 2017 18:00:29 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Oct 2017 18:00:29 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 7540D819A6; Wed, 4 Oct 2017 18:00:26 +0000 (UTC) Date: Wed, 04 Oct 2017 18:00:26 +0000 To: "commits@openwhisk.apache.org" Subject: [incubator-openwhisk] branch master updated: Notify the user if a container is killed due to memory exhaustion. 
(#2827) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <150714002649.18551.10084060460966513616@gitbox.apache.org> From: rabbah@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-openwhisk X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 8bb248a65d92f3da5dd975ca9cf945a148bf7eed X-Git-Newrev: 1e27a81195c646239ab025c90d08684b2f7c7d74 X-Git-Rev: 1e27a81195c646239ab025c90d08684b2f7c7d74 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated archived-at: Wed, 04 Oct 2017 18:00:31 -0000 This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git The following commit(s) were added to refs/heads/master by this push: new 1e27a81 Notify the user if a container is killed due to memory exhaustion. (#2827) 1e27a81 is described below commit 1e27a81195c646239ab025c90d08684b2f7c7d74 Author: Markus Thömmes AuthorDate: Wed Oct 4 20:00:23 2017 +0200 Notify the user if a container is killed due to memory exhaustion. (#2827) If a user-container runs out of memory, the HTTP connection is abruptly aborted and the user has no chance to get further insight into why her action failed. Docker actually provides that information, so on an abrupt connection termination it can be checked whether the container was indeed killed by the OOM killer.
--- .../scala/whisk/core/containerpool/Container.scala | 8 +-- .../scala/whisk/core/containerpool/HttpUtils.scala | 6 +- .../scala/whisk/core/entity/ActivationResult.scala | 17 ++++-- .../src/main/scala/whisk/http/ErrorResponse.scala | 1 + .../core/containerpool/docker/DockerClient.scala | 14 +++++ .../docker/DockerClientWithFileAccess.scala | 5 ++ .../containerpool/docker/DockerContainer.scala | 66 +++++++++++++++++++--- .../test/DockerClientWithFileAccessTests.scala | 34 +++++++++++ .../docker/test/DockerContainerTests.scala | 15 +++-- .../whisk/core/limits/ActionLimitsTests.scala | 22 ++++++-- 10 files changed, 156 insertions(+), 32 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala index 0cecbe6..2e913ab 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala @@ -57,7 +57,7 @@ trait Container { protected implicit val ec: ExecutionContext /** HTTP connection to the container, will be lazily established by callContainer */ - private var httpConnection: Option[HttpUtils] = None + protected var httpConnection: Option[HttpUtils] = None /** Stops the container from consuming CPU cycles. 
*/ def suspend()(implicit transid: TransactionId): Future[Unit] @@ -147,10 +147,8 @@ trait Container { * @param timeout timeout of the request * @param retry whether or not to retry the request */ - protected def callContainer(path: String, - body: JsObject, - timeout: FiniteDuration, - retry: Boolean = false): Future[RunResult] = { + protected def callContainer(path: String, body: JsObject, timeout: FiniteDuration, retry: Boolean = false)( + implicit transid: TransactionId): Future[RunResult] = { val started = Instant.now() val http = httpConnection.getOrElse { val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB) diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala index 2815068..e0fd37f 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala @@ -49,7 +49,7 @@ import whisk.core.entity.size.SizeLong * determined why that is. 
* * @param hostname the host name - * @param timeoutMsec the timeout in msecs to wait for a response + * @param timeout the timeout in msecs to wait for a response * @param maxResponse the maximum size in bytes the connection will accept */ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize) { @@ -68,7 +68,7 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe * @param retry whether or not to retry on connection failure * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String) */ - def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerConnectionError, ContainerResponse] = { + def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerHttpError, ContainerResponse] = { val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8) entity.setContentType("application/json") @@ -81,7 +81,7 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe private def execute(request: HttpRequestBase, timeoutMsec: Integer, - retry: Boolean): Either[ContainerConnectionError, ContainerResponse] = { + retry: Boolean): Either[ContainerHttpError, ContainerResponse] = { Try(connection.execute(request)).map { response => val containerResponse = Option(response.getEntity) .map { entity => diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala index 51d195d..85a9f36 100644 --- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala +++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala @@ -95,10 +95,13 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { /** * Class of errors for invoker-container communication. 
*/ - protected[core] sealed abstract class ContainerConnectionError - protected[core] case class ConnectionError(t: Throwable) extends ContainerConnectionError - protected[core] case class NoResponseReceived() extends ContainerConnectionError - protected[core] case class Timeout() extends ContainerConnectionError + protected[core] sealed trait ContainerConnectionError + protected[core] sealed trait ContainerHttpError extends ContainerConnectionError + protected[core] case class ConnectionError(t: Throwable) extends ContainerHttpError + protected[core] case class NoResponseReceived() extends ContainerHttpError + protected[core] case class Timeout() extends ContainerHttpError + + protected[core] case class MemoryExhausted() extends ContainerConnectionError /** * @param statusCode the container HTTP response code (e.g., 200 OK) @@ -154,6 +157,9 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { containerError(truncatedResponse(str, length, maxlength)) } + case Left(_: MemoryExhausted) => + containerError(memoryExhausted) + case Left(e) => // This indicates a terminal failure in the container (it exited prematurely). containerError(abnormalInitialization) @@ -205,6 +211,9 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { containerError(truncatedResponse(str, length, maxlength)) } + case Left(_: MemoryExhausted) => + containerError(memoryExhausted) + case Left(e) => // This indicates a terminal failure in the container (it exited prematurely). containerError(abnormalRun) diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala index 760b9f9..09c3bfe 100644 --- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala @@ -112,6 +112,7 @@ object Messages { /** Error messages for activations. */ val abnormalInitialization = "The action did not initialize and exited unexpectedly." 
val abnormalRun = "The action did not produce a valid response and exited unexpectedly." + val memoryExhausted = "The action exhausted its memory and was aborted." def badEntityName(value: String) = s"Parameter is not a valid value for a entity name: $value" def badNamespace(value: String) = s"Parameter is not a valid value for a namespace: $value" def badEpoch(value: String) = s"Parameter is not a valid value for epoch seconds: $value" diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala index 28b5b7a..d714a9f 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala @@ -20,6 +20,7 @@ package whisk.core.containerpool.docker import java.io.FileNotFoundException import java.nio.file.Files import java.nio.file.Paths + import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure @@ -29,6 +30,7 @@ import akka.event.Logging.ErrorLevel import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId + import scala.collection.concurrent.TrieMap import whisk.core.containerpool.ContainerId import whisk.core.containerpool.ContainerAddress @@ -100,6 +102,9 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio runCmd("pull", image).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) } }) + def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] = + runCmd("inspect", id.asString, "--format", "{{.State.OOMKilled}}").map(_.toBoolean) + private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = { val cmd = dockerCmd ++ args val start = transid.started(this, LoggingMarkers.INVOKER_DOCKER_CMD(args.head), s"running ${cmd.mkString(" ")}") @@ -175,4 +180,13 @@ trait DockerApi { * @return a 
Future completing once the pull is complete */ def pull(image: String)(implicit transid: TransactionId): Future[Unit] + + /** + * Determines whether the given container was killed due to + * memory constraints. + * + * @param id the id of the container to check + * @return a Future containing whether the container was killed or not + */ + def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala index 444c365..72e1eb9 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala @@ -130,6 +130,11 @@ class DockerClientWithFileAccess( } } + override def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] = + configFileContents(containerConfigFile(id)) + .map(_.fields("State").asJsObject.fields("OOMKilled").convertTo[Boolean]) + .recover { case _ => false } + // See extended trait for description def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = Future { blocking { // Needed due to synchronous file operations diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala index a63c304..89960e3 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala @@ -18,8 +18,10 @@ package whisk.core.containerpool.docker import java.nio.charset.StandardCharsets +import java.time.Instant import akka.actor.ActorSystem +import spray.json._ import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -27,11 
+29,8 @@ import scala.concurrent.duration._ import scala.util.Failure import whisk.common.Logging import whisk.common.TransactionId -import whisk.core.containerpool.BlackboxStartupError -import whisk.core.containerpool.Container -import whisk.core.containerpool.ContainerId -import whisk.core.containerpool.ContainerAddress -import whisk.core.containerpool.WhiskContainerStartupError +import whisk.core.containerpool._ +import whisk.core.entity.ActivationResponse.{ConnectionError, MemoryExhausted} import whisk.core.entity.ByteSize import whisk.core.entity.size._ @@ -132,8 +131,9 @@ class DockerContainer(protected val id: ContainerId, protected val addr: Contain /** The last read-position in the log file */ private var logFileOffset = 0L - protected val logsRetryCount = 15 - protected val logsRetryWait = 100.millis + protected val waitForLogs: FiniteDuration = 2.seconds + protected val waitForOomState: FiniteDuration = 2.seconds + protected val filePollInterval: FiniteDuration = 100.milliseconds def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id) def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id) @@ -143,6 +143,54 @@ class DockerContainer(protected val id: ContainerId, protected val addr: Contain } /** + * Was the container killed due to memory exhaustion? + * + * Retries because as all docker state-relevant operations, they won't + * be reflected by the respective commands immediately but will take + * some time to be propagated. 
+ * + * @param retries number of retries to make + * @return a Future indicating a memory exhaustion situation + */ + private def isOomKilled(retries: Int = (waitForOomState / filePollInterval).toInt)( + implicit transid: TransactionId): Future[Boolean] = { + docker.isOomKilled(id)(TransactionId.invoker).flatMap { killed => + if (killed) Future.successful(true) + else if (retries > 0) akka.pattern.after(filePollInterval, as.scheduler)(isOomKilled(retries - 1)) + else Future.successful(false) + } + } + + override protected def callContainer(path: String, body: JsObject, timeout: FiniteDuration, retry: Boolean = false)( + implicit transid: TransactionId): Future[RunResult] = { + val started = Instant.now() + val http = httpConnection.getOrElse { + val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB) + httpConnection = Some(conn) + conn + } + Future { + http.post(path, body, retry) + }.flatMap { response => + val finished = Instant.now() + + response.left + .map { + // Only check for memory exhaustion if there was a + // terminal connection error. + case error: ConnectionError => + isOomKilled().map { + case true => MemoryExhausted() + case false => error + } + case other => Future.successful(other) + } + .fold(_.map(Left(_)), right => Future.successful(Right(right))) + .map(res => RunResult(Interval(started, finished), res)) + } + } + + /** * Obtains the container's stdout and stderr output and converts it to our own JSON format. * At the moment, this is done by reading the internal Docker log file for the container. * Said file is written by Docker's JSON log driver and has a "well-known" location and name. 
@@ -176,7 +224,7 @@ class DockerContainer(protected val id: ContainerId, protected val addr: Contain if (retries > 0 && !isComplete && !isTruncated) { logging.info(this, s"log cursor advanced but missing sentinel, trying $retries more times") - akka.pattern.after(logsRetryWait, as.scheduler)(readLogs(retries - 1)) + akka.pattern.after(filePollInterval, as.scheduler)(readLogs(retries - 1)) } else { logFileOffset += rawLogBytes.position - rawLogBytes.arrayOffset Future.successful(formattedLogs) @@ -188,7 +236,7 @@ class DockerContainer(protected val id: ContainerId, protected val addr: Contain } } - readLogs(logsRetryCount) + readLogs((waitForLogs / filePollInterval).toInt) } } diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala index 7c6acbf..8b3a060 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala @@ -39,6 +39,7 @@ import org.scalatest.fixture.{FlatSpec => FixtureFlatSpec} import common.StreamLogging import spray.json._ +import spray.json.DefaultJsonProtocol._ import whisk.common.TransactionId import whisk.core.containerpool.ContainerId import whisk.core.containerpool.ContainerAddress @@ -110,6 +111,39 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with Stre } } +@RunWith(classOf[JUnitRunner]) +class DockerClientWithFileAccessTestsOom extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach { + override def beforeEach = stream.reset() + + implicit val transid = TransactionId.testing + val id = ContainerId("Id") + + def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result(f, timeout) + + def dockerClient(readResult: Future[JsObject]) = + new DockerClientWithFileAccess()(global) { + 
override val dockerCmd = Seq("docker") + override def configFileContents(configFile: File) = readResult + } + + def stateObject(oom: Boolean) = JsObject("State" -> JsObject("OOMKilled" -> oom.toJson)) + + behavior of "DockerClientWithFileAccess - isOomKilled" + + it should "return the state of the container respectively" in { + val dcTrue = dockerClient(Future.successful(stateObject(true))) + await(dcTrue.isOomKilled(id)) shouldBe true + + val dcFalse = dockerClient(Future.successful(stateObject(false))) + await(dcFalse.isOomKilled(id)) shouldBe false + } + + it should "default to 'false' if the json structure is unparseable" in { + val dc = dockerClient(Future.successful(JsObject())) + await(dc.isOomKilled(id)) shouldBe false + } +} + /** * The file access tests use fixtures (org.scalatest.fixture.FlatSpec) in contrast to * the IP address related tests. For this reason, the file access tests are in a separate diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala index 47e67c2..960ab92 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -76,14 +76,15 @@ class DockerContainerTests retryCount: Int = 0)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = { new DockerContainer(id, addr) { - override protected def callContainer(path: String, - body: JsObject, - timeout: FiniteDuration, - retry: Boolean = false): Future[RunResult] = { + override protected def callContainer( + path: String, + body: JsObject, + timeout: FiniteDuration, + retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = { ccRes } - override protected val logsRetryCount = retryCount - override protected val logsRetryWait = 0.milliseconds + override protected val waitForLogs = 
retryCount.milliseconds + override protected val filePollInterval = 1.millisecond } } @@ -680,6 +681,8 @@ class DockerContainerTests Future.successful(()) } + override def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] = ??? + def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = { rawContainerLogsInvocations += ((containerId, fromPos)) Future.successful(ByteBuffer.wrap(Array[Byte]())) diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala index e3e955c..4ac38f2 100644 --- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala +++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala @@ -231,7 +231,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers { error.fields("message") shouldBe { JsObject( "code" -> "EMFILE".toJson, - "errno" -> -24.toJson, + "errno" -> (-24).toJson, "path" -> "/dev/zero".toJson, "syscall" -> "open".toJson) } @@ -241,10 +241,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers { activation.logs .getOrElse(List()) - .filter { - _.contains("ERROR: opened files = ") - } - .length shouldBe 1 + .count(_.contains("ERROR: opened files = ")) shouldBe 1 } } @@ -266,4 +263,19 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers { } } } + + it should "be aborted when exceeding its memory limits" in withAssetCleaner(wskprops) { (wp, assetHelper) => + val name = "TestNodeJsMemoryExceeding" + assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { + val allowedMemory = 256.megabytes + val actionName = TestUtils.getTestActionFilename("memoryWithGC.js") + (action, _) => + action.create(name, Some(actionName), memory = Some(allowedMemory)) + } + + val run = wsk.action.invoke(name, Map("payload" -> 512.toJson)) + withActivation(wsk.activation, run) { + _.response.result.get.fields("error") shouldBe Messages.memoryExhausted.toJson + } + } 
} -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" '].