openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cbic...@apache.org
Subject [incubator-openwhisk] branch master updated: Bound docker/runc commands in their allowed runtime. (#3094)
Date Mon, 08 Jan 2018 11:54:12 GMT
This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ec8148f  Bound docker/runc commands in their allowed runtime. (#3094)
ec8148f is described below

commit ec8148fbba3bf8142853fa2aee0bcfb92fd7a5ae
Author: Markus Thömmes <markusthoemmes@me.com>
AuthorDate: Mon Jan 8 12:54:09 2018 +0100

    Bound docker/runc commands in their allowed runtime. (#3094)
    
    Docker can cause hanging commands which never finish. Essentially those break the invoker
and it needs to be restarted to recover. This adds a timeout to each of those commands to
detect this problem.
---
 core/invoker/src/main/resources/application.conf   | 22 ++++++++
 .../core/containerpool/docker/DockerClient.scala   | 61 ++++++++++++++--------
 .../docker/DockerClientWithFileAccess.scala        |  9 ++--
 .../docker/DockerContainerFactory.scala            |  2 +-
 .../core/containerpool/docker/ProcessRunner.scala  | 26 ++++++---
 .../core/containerpool/docker/RuncClient.scala     | 30 ++++++++---
 .../docker/test/DockerClientTests.scala            | 25 ++++++---
 .../test/DockerClientWithFileAccessTests.scala     | 28 ++++++----
 .../docker/test/ProcessRunnerTests.scala           | 24 ++++++---
 .../docker/test/RuncClientTests.scala              | 12 +++--
 10 files changed, 173 insertions(+), 66 deletions(-)

diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
new file mode 100644
index 0000000..e7ed0a6
--- /dev/null
+++ b/core/invoker/src/main/resources/application.conf
@@ -0,0 +1,22 @@
+# common logging configuration see common scala
+include "logging"
+include "akka-http-version"
+
+whisk {
+  # Timeouts for docker commands. Set to "Inf" to disable timeout.
+  docker.timeouts {
+    run: 1 minute
+    rm: 1 minute
+    pull: 10 minutes
+    ps: 1 minute
+    inspect: 1 minute
+    pause: 10 seconds
+    unpause: 10 seconds
+  }
+
+  # Timeouts for runc commands. Set to "Inf" to disable timeout.
+  runc.timeouts {
+    pause: 10 seconds
+    resume: 10 seconds
+  }
+}
\ No newline at end of file
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index e6b3dab..4ed631d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -22,6 +22,8 @@ import java.nio.file.Files
 import java.nio.file.Paths
 import java.util.concurrent.Semaphore
 
+import akka.actor.ActorSystem
+
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.blocking
 import scala.concurrent.ExecutionContext
@@ -30,13 +32,15 @@ import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
 import akka.event.Logging.ErrorLevel
-
+import pureconfig.loadConfigOrThrow
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
 
+import scala.concurrent.duration.Duration
+
 object DockerContainerId {
 
   val containerIdRegex = """^([0-9a-f]{64})$""".r
@@ -50,6 +54,17 @@ object DockerContainerId {
 }
 
 /**
+ * Configuration for docker client command timeouts.
+ */
+case class DockerClientTimeoutConfig(run: Duration,
+                                     rm: Duration,
+                                     pull: Duration,
+                                     ps: Duration,
+                                     pause: Duration,
+                                     unpause: Duration,
+                                     inspect: Duration)
+
+/**
  * Serves as interface to the docker CLI tool.
  *
  * Be cautious with the ExecutionContext passed to this, as the
@@ -57,7 +72,10 @@ object DockerContainerId {
  *
  * You only need one instance (and you shouldn't get more).
  */
-class DockerClient(dockerHost: Option[String] = None)(executionContext: ExecutionContext)(implicit
log: Logging)
+class DockerClient(dockerHost: Option[String] = None,
+                   timeouts: DockerClientTimeoutConfig =
+                     loadConfigOrThrow[DockerClientTimeoutConfig]("whisk.docker.timeouts"))(
+  executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
     extends DockerApi
     with ProcessRunner {
   implicit private val ec = executionContext
@@ -95,14 +113,12 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext:
Executio
       }
     }.flatMap { _ =>
       // Iff the semaphore was acquired successfully
-      runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
+      runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run)
         .andThen {
           // Release the semaphore as quick as possible regardless of the runCmd() result
           case _ => runSemaphore.release()
         }
-        .map {
-          ContainerId(_)
-        }
+        .map(ContainerId.apply)
         .recoverWith {
           // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
           // Exit code 125 means an error reported by the Docker daemon.
@@ -120,28 +136,28 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext:
Executio
   }
 
   def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId):
Future[ContainerAddress] =
-    runCmd("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}",
id.asString).flatMap {
-      _ match {
-        case "<no value>" => Future.failed(new NoSuchElementException)
-        case stdout       => Future.successful(ContainerAddress(stdout))
-      }
+    runCmd(
+      Seq("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString),
+      timeouts.inspect).flatMap {
+      case "<no value>" => Future.failed(new NoSuchElementException)
+      case stdout       => Future.successful(ContainerAddress(stdout))
     }
 
   def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd("pause", id.asString).map(_ => ())
+    runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
 
   def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd("unpause", id.asString).map(_ => ())
+    runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ())
 
   def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd("rm", "-f", id.asString).map(_ => ())
+    runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ())
 
   def ps(filters: Seq[(String, String)] = Seq(), all: Boolean = false)(
     implicit transid: TransactionId): Future[Seq[ContainerId]] = {
-    val filterArgs = filters.map { case (attr, value) => Seq("--filter", s"$attr=$value")
}.flatten
+    val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter", s"$attr=$value")
}
     val allArg = if (all) Seq("--all") else Seq.empty[String]
     val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs
-    runCmd(cmd: _*).map(_.lines.toSeq.map(ContainerId.apply))
+    runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
   }
 
   /**
@@ -152,16 +168,19 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext:
Executio
   private val pullsInFlight = TrieMap[String, Future[Unit]]()
   def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
     pullsInFlight.getOrElseUpdate(image, {
-      runCmd("pull", image).map(_ => ()).andThen { case _ => pullsInFlight.remove(image)
}
+      runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image)
}
     })
 
   def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] =
-    runCmd("inspect", id.asString, "--format", "{{.State.OOMKilled}}").map(_.toBoolean)
+    runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), timeouts.inspect).map(_.toBoolean)
 
-  private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = {
+  private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId):
Future[String] = {
     val cmd = dockerCmd ++ args
-    val start = transid.started(this, LoggingMarkers.INVOKER_DOCKER_CMD(args.head), s"running
${cmd.mkString(" ")}")
-    executeProcess(cmd: _*).andThen {
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_DOCKER_CMD(args.head),
+      s"running ${cmd.mkString(" ")} (timeout: $timeout)")
+    executeProcess(cmd, timeout).andThen {
       case Success(_) => transid.finished(this, start)
       case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
     }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
index 1644e2e..5487f5d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
@@ -20,6 +20,7 @@ package whisk.core.containerpool.docker
 import java.io.File
 import java.nio.file.Paths
 
+import akka.actor.ActorSystem
 import akka.stream.alpakka.file.scaladsl.FileTailSource
 import akka.stream.scaladsl.{FileIO, Source => AkkaSource}
 import akka.util.ByteString
@@ -37,10 +38,10 @@ import whisk.core.containerpool.ContainerAddress
 import scala.io.Source
 import scala.concurrent.duration.FiniteDuration
 
-class DockerClientWithFileAccess(
-  dockerHost: Option[String] = None,
-  containersDirectory: File = Paths.get("containers").toFile)(executionContext: ExecutionContext)(implicit
log: Logging)
-    extends DockerClient(dockerHost)(executionContext)(log)
+class DockerClientWithFileAccess(dockerHost: Option[String] = None,
+                                 containersDirectory: File = Paths.get("containers").toFile)(
+  executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+    extends DockerClient(dockerHost)(executionContext)
     with DockerApiWithFileAccess {
 
   implicit private val ec = executionContext
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index af486f2..ffd18c9 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -41,7 +41,7 @@ class DockerContainerFactory(config: WhiskConfig, instance: InstanceId,
paramete
 
   /** Initialize container clients */
   implicit val docker = new DockerClientWithFileAccess()(ec)
-  implicit val runc = new RuncClient(ec)
+  implicit val runc = new RuncClient()(ec)
 
   /** Create a container using docker cli */
   override def createContainer(tid: TransactionId,
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
index f8139b4..b27e62f 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
@@ -17,10 +17,13 @@
 
 package whisk.core.containerpool.docker
 
+import akka.actor.ActorSystem
+
 import scala.collection.mutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.blocking
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.sys.process._
 
 trait ProcessRunner {
@@ -29,26 +32,37 @@ trait ProcessRunner {
    * Runs the specified command with arguments asynchronously and
    * capture stdout as well as stderr.
    *
+   * If not set to infinite, after timeout is reached the process is killed.
+   *
    * Be cautious with the execution context you pass because the command
    * is blocking.
    *
    * @param args command to be run including arguments
+   * @param timeout maximum time the command is allowed to take
    * @return a future completing according to the command's exit code
    */
-  protected def executeProcess(args: String*)(implicit ec: ExecutionContext) =
+  protected def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
     Future(blocking {
       val out = new mutable.ListBuffer[String]
       val err = new mutable.ListBuffer[String]
-      val exitCode = args ! ProcessLogger(o => out += o, e => err += e)
+      val process = args.run(ProcessLogger(o => out += o, e => err += e))
 
-      (exitCode, out.mkString("\n"), err.mkString("\n"))
+      val scheduled = timeout match {
+        case t: FiniteDuration => Some(as.scheduler.scheduleOnce(t)(process.destroy()))
+        case _                 => None
+      }
+
+      (process.exitValue(), out.mkString("\n"), err.mkString("\n"), scheduled)
     }).flatMap {
-      case (0, stdout, _) =>
+      case (0, stdout, _, scheduled) =>
+        scheduled.foreach(_.cancel())
         Future.successful(stdout)
-      case (code, stdout, stderr) =>
+      case (code, stdout, stderr, scheduled) =>
+        scheduled.foreach(_.cancel())
         Future.failed(ProcessRunningException(code, stdout, stderr))
     }
+
 }
 
 case class ProcessRunningException(exitCode: Int, stdout: String, stderr: String)
-    extends Exception(s"code: $exitCode, stdout: $stdout, stderr: $stderr")
+    extends Exception(s"code: $exitCode ${if (exitCode == 143) "(killed)" else ""}, stdout:
$stdout, stderr: $stderr")
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
index c398765..526bfc4 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
@@ -17,16 +17,27 @@
 
 package whisk.core.containerpool.docker
 
+import akka.actor.ActorSystem
+
 import scala.concurrent.Future
 import scala.concurrent.ExecutionContext
 import scala.util.Failure
 import whisk.common.TransactionId
+
 import scala.util.Success
 import whisk.common.LoggingMarkers
 import whisk.common.Logging
 import akka.event.Logging.ErrorLevel
+import pureconfig.loadConfigOrThrow
 import whisk.core.containerpool.ContainerId
 
+import scala.concurrent.duration.Duration
+
+/**
+ * Configuration for runc client command timeouts.
+ */
+case class RuncClientTimeouts(pause: Duration, resume: Duration)
+
 /**
  * Serves as interface to the docker CLI tool.
  *
@@ -35,22 +46,29 @@ import whisk.core.containerpool.ContainerId
  *
  * You only need one instance (and you shouldn't get more).
  */
-class RuncClient(executionContext: ExecutionContext)(implicit log: Logging) extends RuncApi
with ProcessRunner {
+class RuncClient(timeouts: RuncClientTimeouts = loadConfigOrThrow[RuncClientTimeouts]("whisk.runc.timeouts"))(
+  executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+    extends RuncApi
+    with ProcessRunner {
   implicit private val ec = executionContext
 
   // Determines how to run docker. Failure to find a Docker binary implies
   // a failure to initialize this instance of DockerClient.
   protected val runcCmd: Seq[String] = Seq("/usr/bin/docker-runc")
 
-  def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = runCmd("pause",
id.asString).map(_ => ())
+  def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
+    runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
 
   def resume(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd("resume", id.asString).map(_ => ())
+    runCmd(Seq("resume", id.asString), timeouts.resume).map(_ => ())
 
-  private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = {
+  private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId):
Future[String] = {
     val cmd = runcCmd ++ args
-    val start = transid.started(this, LoggingMarkers.INVOKER_RUNC_CMD(args.head), s"running
${cmd.mkString(" ")}")
-    executeProcess(cmd: _*).andThen {
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_RUNC_CMD(args.head),
+      s"running ${cmd.mkString(" ")} (timeout: $timeout)")
+    executeProcess(cmd, timeout).andThen {
       case Success(_) => transid.finished(this, start)
       case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
     }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 9c2b5b3..267865d 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -17,14 +17,15 @@
 
 package whisk.core.containerpool.docker.test
 
+import akka.actor.ActorSystem
+
 import java.util.concurrent.Semaphore
 
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
 import scala.concurrent.Promise
 import scala.util.Success
 import org.junit.runner.RunWith
@@ -34,8 +35,7 @@ import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
 import org.scalatest.time.{Seconds, Span}
-import common.StreamLogging
-
+import common.{StreamLogging, WskActorSystem}
 import whisk.common.LogMarker
 import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
 import whisk.common.TransactionId
@@ -48,7 +48,13 @@ import whisk.core.containerpool.docker.ProcessRunningException
 import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
-class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach
with Eventually {
+class DockerClientTests
+    extends FlatSpec
+    with Matchers
+    with StreamLogging
+    with BeforeAndAfterEach
+    with Eventually
+    with WskActorSystem {
 
   override def beforeEach = stream.reset()
 
@@ -65,7 +71,8 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging
with B
   /** Returns a DockerClient with a mocked result for 'executeProcess' */
   def dockerClient(execResult: => Future[String]) = new DockerClient()(global) {
     override val dockerCmd = Seq(dockerCommand)
-    override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
+    override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
+      execResult
   }
 
   behavior of "DockerContainerId"
@@ -186,7 +193,8 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging
with B
     var runCmdCount = 0
     val dc = new DockerClient()(global) {
       override val dockerCmd = Seq(dockerCommand)
-      override def executeProcess(args: String*)(implicit ec: ExecutionContext) = {
+      override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
+                                                                        as: ActorSystem)
= {
         runCmdCount += 1
         runCmdCount match {
           case 1 => firstRunPromise.future
@@ -233,7 +241,8 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging
with B
     var runCmdCount = 0
     val dc = new DockerClient()(global) {
       override val dockerCmd = Seq(dockerCommand)
-      override def executeProcess(args: String*)(implicit ec: ExecutionContext) = {
+      override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
+                                                                        as: ActorSystem)
= {
         runCmdCount += 1
         println(s"runCmdCount=${runCmdCount}, args.last=${args.last}")
         runCmdCount match {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
index 895461d..2918c8e 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
@@ -19,21 +19,20 @@ package whisk.core.containerpool.docker.test
 
 import java.io.File
 
+import akka.actor.ActorSystem
+
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-import scala.language.reflectiveCalls // Needed to invoke publicIpAddressFromFile() method
of structural dockerClientForIp extension
-
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
+import scala.language.reflectiveCalls
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
-
-import common.StreamLogging
+import common.{StreamLogging, WskActorSystem}
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
@@ -42,7 +41,12 @@ import whisk.core.containerpool.ContainerAddress
 import whisk.core.containerpool.docker.DockerClientWithFileAccess
 
 @RunWith(classOf[JUnitRunner])
-class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with StreamLogging
with BeforeAndAfterEach {
+class DockerClientWithFileAccessTestsIp
+    extends FlatSpec
+    with Matchers
+    with StreamLogging
+    with BeforeAndAfterEach
+    with WskActorSystem {
 
   override def beforeEach = stream.reset()
 
@@ -69,7 +73,8 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with
Stre
                    readResult: Future[JsObject] = Future.successful(dockerConfig)) =
     new DockerClientWithFileAccess()(global) {
       override val dockerCmd = Seq(dockerCommand)
-      override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
+      override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
+                                                                        as: ActorSystem)
= execResult
       override def configFileContents(configFile: File) = readResult
       // Make protected ipAddressFromFile available for testing - requires reflectiveCalls
       def publicIpAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress]
=
@@ -108,7 +113,12 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers
with Stre
 }
 
 @RunWith(classOf[JUnitRunner])
-class DockerClientWithFileAccessTestsOom extends FlatSpec with Matchers with StreamLogging
with BeforeAndAfterEach {
+class DockerClientWithFileAccessTestsOom
+    extends FlatSpec
+    with Matchers
+    with StreamLogging
+    with BeforeAndAfterEach
+    with WskActorSystem {
   override def beforeEach = stream.reset()
 
   implicit val transid = TransactionId.testing
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
index b2ac40f..4abe61f 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
@@ -17,35 +17,41 @@
 
 package whisk.core.containerpool.docker.test
 
-import scala.concurrent.Future
+import akka.actor.ActorSystem
+import common.WskActorSystem
 
+import scala.concurrent.Future
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 
 import scala.concurrent.ExecutionContext.Implicits.global
 import whisk.core.containerpool.docker.ProcessRunner
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.concurrent.Await
 import org.scalatest.Matchers
 import whisk.core.containerpool.docker.ProcessRunningException
+
 import scala.language.reflectiveCalls // Needed to invoke run() method of structural ProcessRunner
extension
 
 @RunWith(classOf[JUnitRunner])
-class ProcessRunnerTests extends FlatSpec with Matchers {
+class ProcessRunnerTests extends FlatSpec with Matchers with WskActorSystem {
 
-  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result(f,
timeout)
+  def await[A](f: Future[A], timeout: FiniteDuration = 2.seconds) = Await.result(f, timeout)
 
   val processRunner = new ProcessRunner {
-    def run(args: String*)(implicit ec: ExecutionContext) = executeProcess(args: _*)
+    def run(args: Seq[String], timeout: FiniteDuration = 100.milliseconds)(implicit ec: ExecutionContext,
+                                                                           as: ActorSystem)
=
+      executeProcess(args, timeout)(ec, as)
   }
 
   behavior of "ProcessRunner"
 
   it should "run an external command successfully and capture its output" in {
     val stdout = "Output"
-    await(processRunner.run("echo", stdout)) shouldBe stdout
+    await(processRunner.run(Seq("echo", stdout))) shouldBe stdout
   }
 
   it should "run an external command unsuccessfully and capture its output" in {
@@ -53,8 +59,14 @@ class ProcessRunnerTests extends FlatSpec with Matchers {
     val stdout = "Output"
     val stderr = "Error"
 
-    val future = processRunner.run("/bin/sh", "-c", s"echo ${stdout}; echo ${stderr} 1>&2;
exit ${exitCode}")
+    val future = processRunner.run(Seq("/bin/sh", "-c", s"echo ${stdout}; echo ${stderr}
1>&2; exit ${exitCode}"))
 
     the[ProcessRunningException] thrownBy await(future) shouldBe ProcessRunningException(exitCode,
stdout, stderr)
   }
+
+  it should "terminate an external command after the specified timeout is reached" in {
+    val future = processRunner.run(Seq("sleep", "1"), 100.milliseconds)
+    val exception = the[ProcessRunningException] thrownBy await(future)
+    exception.exitCode shouldBe 143
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
index b4e2e47..cf5990b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
@@ -17,8 +17,9 @@
 
 package whisk.core.containerpool.docker.test
 
-import scala.concurrent.Future
+import akka.actor.ActorSystem
 
+import scala.concurrent.Future
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
@@ -29,7 +30,7 @@ import scala.concurrent.duration._
 import scala.concurrent.Await
 import org.scalatest.Matchers
 import whisk.core.containerpool.docker.RuncClient
-import common.StreamLogging
+import common.{StreamLogging, WskActorSystem}
 import whisk.core.containerpool.ContainerId
 import whisk.common.TransactionId
 import org.scalatest.BeforeAndAfterEach
@@ -37,7 +38,7 @@ import whisk.common.LogMarker
 import whisk.common.LoggingMarkers.INVOKER_RUNC_CMD
 
 @RunWith(classOf[JUnitRunner])
-class RuncClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach
{
+class RuncClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach
with WskActorSystem {
 
   override def beforeEach = stream.reset()
 
@@ -49,9 +50,10 @@ class RuncClientTests extends FlatSpec with Matchers with StreamLogging
with Bef
   val runcCommand = "docker-runc"
 
   /** Returns a RuncClient with a mocked result for 'executeProcess' */
-  def runcClient(result: Future[String]) = new RuncClient(global) {
+  def runcClient(result: Future[String]) = new RuncClient()(global) {
     override val runcCmd = Seq(runcCommand)
-    override def executeProcess(args: String*)(implicit ec: ExecutionContext) = result
+    override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
+      result
   }
 
   /** Calls a runc method based on the name of the method. */

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Mime
View raw message