This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ec8148f Bound docker/runc commands in their allowed runtime. (#3094)
ec8148f is described below
commit ec8148fbba3bf8142853fa2aee0bcfb92fd7a5ae
Author: Markus Thömmes <markusthoemmes@me.com>
AuthorDate: Mon Jan 8 12:54:09 2018 +0100
Bound docker/runc commands in their allowed runtime. (#3094)
Docker commands can hang and never finish. Such hanging commands effectively break the
invoker, which then needs to be restarted to recover. This adds a timeout to each of those
commands so that the problem is detected.
---
core/invoker/src/main/resources/application.conf | 22 ++++++++
.../core/containerpool/docker/DockerClient.scala | 61 ++++++++++++++--------
.../docker/DockerClientWithFileAccess.scala | 9 ++--
.../docker/DockerContainerFactory.scala | 2 +-
.../core/containerpool/docker/ProcessRunner.scala | 26 ++++++---
.../core/containerpool/docker/RuncClient.scala | 30 ++++++++---
.../docker/test/DockerClientTests.scala | 25 ++++++---
.../test/DockerClientWithFileAccessTests.scala | 28 ++++++----
.../docker/test/ProcessRunnerTests.scala | 24 ++++++---
.../docker/test/RuncClientTests.scala | 12 +++--
10 files changed, 173 insertions(+), 66 deletions(-)
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
new file mode 100644
index 0000000..e7ed0a6
--- /dev/null
+++ b/core/invoker/src/main/resources/application.conf
@@ -0,0 +1,22 @@
+# common logging configuration see common scala
+include "logging"
+include "akka-http-version"
+
+whisk {
+ # Timeouts for docker commands. Set to "Inf" to disable timeout.
+ docker.timeouts {
+ run: 1 minute
+ rm: 1 minute
+ pull: 10 minutes
+ ps: 1 minute
+ inspect: 1 minute
+ pause: 10 seconds
+ unpause: 10 seconds
+ }
+
+ # Timeouts for runc commands. Set to "Inf" to disable timeout.
+ runc.timeouts {
+ pause: 10 seconds
+ resume: 10 seconds
+ }
+}
\ No newline at end of file
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index e6b3dab..4ed631d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -22,6 +22,8 @@ import java.nio.file.Files
import java.nio.file.Paths
import java.util.concurrent.Semaphore
+import akka.actor.ActorSystem
+
import scala.collection.concurrent.TrieMap
import scala.concurrent.blocking
import scala.concurrent.ExecutionContext
@@ -30,13 +32,15 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.event.Logging.ErrorLevel
-
+import pureconfig.loadConfigOrThrow
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.ContainerAddress
+import scala.concurrent.duration.Duration
+
object DockerContainerId {
val containerIdRegex = """^([0-9a-f]{64})$""".r
@@ -50,6 +54,17 @@ object DockerContainerId {
}
/**
+ * Configuration for docker client command timeouts.
+ */
+case class DockerClientTimeoutConfig(run: Duration,
+ rm: Duration,
+ pull: Duration,
+ ps: Duration,
+ pause: Duration,
+ unpause: Duration,
+ inspect: Duration)
+
+/**
* Serves as interface to the docker CLI tool.
*
* Be cautious with the ExecutionContext passed to this, as the
@@ -57,7 +72,10 @@ object DockerContainerId {
*
* You only need one instance (and you shouldn't get more).
*/
-class DockerClient(dockerHost: Option[String] = None)(executionContext: ExecutionContext)(implicit
log: Logging)
+class DockerClient(dockerHost: Option[String] = None,
+ timeouts: DockerClientTimeoutConfig =
+ loadConfigOrThrow[DockerClientTimeoutConfig]("whisk.docker.timeouts"))(
+ executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
extends DockerApi
with ProcessRunner {
implicit private val ec = executionContext
@@ -95,14 +113,12 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext:
Executio
}
}.flatMap { _ =>
// Iff the semaphore was acquired successfully
- runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
+ runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run)
.andThen {
// Release the semaphore as quick as possible regardless of the runCmd() result
case _ => runSemaphore.release()
}
- .map {
- ContainerId(_)
- }
+ .map(ContainerId.apply)
.recoverWith {
// https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
// Exit code 125 means an error reported by the Docker daemon.
@@ -120,28 +136,28 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext:
Executio
}
def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId):
Future[ContainerAddress] =
- runCmd("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}",
id.asString).flatMap {
- _ match {
- case "<no value>" => Future.failed(new NoSuchElementException)
- case stdout => Future.successful(ContainerAddress(stdout))
- }
+ runCmd(
+ Seq("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString),
+ timeouts.inspect).flatMap {
+ case "<no value>" => Future.failed(new NoSuchElementException)
+ case stdout => Future.successful(ContainerAddress(stdout))
}
def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("pause", id.asString).map(_ => ())
+ runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("unpause", id.asString).map(_ => ())
+ runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ())
def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("rm", "-f", id.asString).map(_ => ())
+ runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ())
def ps(filters: Seq[(String, String)] = Seq(), all: Boolean = false)(
implicit transid: TransactionId): Future[Seq[ContainerId]] = {
- val filterArgs = filters.map { case (attr, value) => Seq("--filter", s"$attr=$value")
}.flatten
+ val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter", s"$attr=$value")
}
val allArg = if (all) Seq("--all") else Seq.empty[String]
val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs
- runCmd(cmd: _*).map(_.lines.toSeq.map(ContainerId.apply))
+ runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
}
/**
@@ -152,16 +168,19 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext:
Executio
private val pullsInFlight = TrieMap[String, Future[Unit]]()
def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
pullsInFlight.getOrElseUpdate(image, {
- runCmd("pull", image).map(_ => ()).andThen { case _ => pullsInFlight.remove(image)
}
+ runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image)
}
})
def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] =
- runCmd("inspect", id.asString, "--format", "{{.State.OOMKilled}}").map(_.toBoolean)
+ runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), timeouts.inspect).map(_.toBoolean)
- private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = {
+ private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId):
Future[String] = {
val cmd = dockerCmd ++ args
- val start = transid.started(this, LoggingMarkers.INVOKER_DOCKER_CMD(args.head), s"running
${cmd.mkString(" ")}")
- executeProcess(cmd: _*).andThen {
+ val start = transid.started(
+ this,
+ LoggingMarkers.INVOKER_DOCKER_CMD(args.head),
+ s"running ${cmd.mkString(" ")} (timeout: $timeout)")
+ executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
index 1644e2e..5487f5d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
@@ -20,6 +20,7 @@ package whisk.core.containerpool.docker
import java.io.File
import java.nio.file.Paths
+import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.{FileIO, Source => AkkaSource}
import akka.util.ByteString
@@ -37,10 +38,10 @@ import whisk.core.containerpool.ContainerAddress
import scala.io.Source
import scala.concurrent.duration.FiniteDuration
-class DockerClientWithFileAccess(
- dockerHost: Option[String] = None,
- containersDirectory: File = Paths.get("containers").toFile)(executionContext: ExecutionContext)(implicit
log: Logging)
- extends DockerClient(dockerHost)(executionContext)(log)
+class DockerClientWithFileAccess(dockerHost: Option[String] = None,
+ containersDirectory: File = Paths.get("containers").toFile)(
+ executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+ extends DockerClient(dockerHost)(executionContext)
with DockerApiWithFileAccess {
implicit private val ec = executionContext
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index af486f2..ffd18c9 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -41,7 +41,7 @@ class DockerContainerFactory(config: WhiskConfig, instance: InstanceId,
paramete
/** Initialize container clients */
implicit val docker = new DockerClientWithFileAccess()(ec)
- implicit val runc = new RuncClient(ec)
+ implicit val runc = new RuncClient()(ec)
/** Create a container using docker cli */
override def createContainer(tid: TransactionId,
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
index f8139b4..b27e62f 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
@@ -17,10 +17,13 @@
package whisk.core.containerpool.docker
+import akka.actor.ActorSystem
+
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.blocking
+import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.sys.process._
trait ProcessRunner {
@@ -29,26 +32,37 @@ trait ProcessRunner {
* Runs the specified command with arguments asynchronously and
* capture stdout as well as stderr.
*
+ * If the timeout is finite, the process is killed once the timeout has elapsed.
+ *
* Be cautious with the execution context you pass because the command
* is blocking.
*
* @param args command to be run including arguments
+ * @param timeout maximum time the command is allowed to take
* @return a future completing according to the command's exit code
*/
- protected def executeProcess(args: String*)(implicit ec: ExecutionContext) =
+ protected def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
Future(blocking {
val out = new mutable.ListBuffer[String]
val err = new mutable.ListBuffer[String]
- val exitCode = args ! ProcessLogger(o => out += o, e => err += e)
+ val process = args.run(ProcessLogger(o => out += o, e => err += e))
- (exitCode, out.mkString("\n"), err.mkString("\n"))
+ val scheduled = timeout match {
+ case t: FiniteDuration => Some(as.scheduler.scheduleOnce(t)(process.destroy()))
+ case _ => None
+ }
+
+ (process.exitValue(), out.mkString("\n"), err.mkString("\n"), scheduled)
}).flatMap {
- case (0, stdout, _) =>
+ case (0, stdout, _, scheduled) =>
+ scheduled.foreach(_.cancel())
Future.successful(stdout)
- case (code, stdout, stderr) =>
+ case (code, stdout, stderr, scheduled) =>
+ scheduled.foreach(_.cancel())
Future.failed(ProcessRunningException(code, stdout, stderr))
}
+
}
case class ProcessRunningException(exitCode: Int, stdout: String, stderr: String)
- extends Exception(s"code: $exitCode, stdout: $stdout, stderr: $stderr")
+ extends Exception(s"code: $exitCode ${if (exitCode == 143) "(killed)" else ""}, stdout:
$stdout, stderr: $stderr")
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
index c398765..526bfc4 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
@@ -17,16 +17,27 @@
package whisk.core.containerpool.docker
+import akka.actor.ActorSystem
+
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.util.Failure
import whisk.common.TransactionId
+
import scala.util.Success
import whisk.common.LoggingMarkers
import whisk.common.Logging
import akka.event.Logging.ErrorLevel
+import pureconfig.loadConfigOrThrow
import whisk.core.containerpool.ContainerId
+import scala.concurrent.duration.Duration
+
+/**
+ * Configuration for runc client command timeouts.
+ */
+case class RuncClientTimeouts(pause: Duration, resume: Duration)
+
/**
* Serves as interface to the docker CLI tool.
*
@@ -35,22 +46,29 @@ import whisk.core.containerpool.ContainerId
*
* You only need one instance (and you shouldn't get more).
*/
-class RuncClient(executionContext: ExecutionContext)(implicit log: Logging) extends RuncApi
with ProcessRunner {
+class RuncClient(timeouts: RuncClientTimeouts = loadConfigOrThrow[RuncClientTimeouts]("whisk.runc.timeouts"))(
+ executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+ extends RuncApi
+ with ProcessRunner {
implicit private val ec = executionContext
// Determines how to run docker. Failure to find a Docker binary implies
// a failure to initialize this instance of DockerClient.
protected val runcCmd: Seq[String] = Seq("/usr/bin/docker-runc")
- def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = runCmd("pause",
id.asString).map(_ => ())
+ def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
+ runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
def resume(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("resume", id.asString).map(_ => ())
+ runCmd(Seq("resume", id.asString), timeouts.resume).map(_ => ())
- private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = {
+ private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId):
Future[String] = {
val cmd = runcCmd ++ args
- val start = transid.started(this, LoggingMarkers.INVOKER_RUNC_CMD(args.head), s"running
${cmd.mkString(" ")}")
- executeProcess(cmd: _*).andThen {
+ val start = transid.started(
+ this,
+ LoggingMarkers.INVOKER_RUNC_CMD(args.head),
+ s"running ${cmd.mkString(" ")} (timeout: $timeout)")
+ executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 9c2b5b3..267865d 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -17,14 +17,15 @@
package whisk.core.containerpool.docker.test
+import akka.actor.ActorSystem
+
import java.util.concurrent.Semaphore
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.util.Success
import org.junit.runner.RunWith
@@ -34,8 +35,7 @@ import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.Matchers
import org.scalatest.time.{Seconds, Span}
-import common.StreamLogging
-
+import common.{StreamLogging, WskActorSystem}
import whisk.common.LogMarker
import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
import whisk.common.TransactionId
@@ -48,7 +48,13 @@ import whisk.core.containerpool.docker.ProcessRunningException
import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
-class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach
with Eventually {
+class DockerClientTests
+ extends FlatSpec
+ with Matchers
+ with StreamLogging
+ with BeforeAndAfterEach
+ with Eventually
+ with WskActorSystem {
override def beforeEach = stream.reset()
@@ -65,7 +71,8 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging
with B
/** Returns a DockerClient with a mocked result for 'executeProcess' */
def dockerClient(execResult: => Future[String]) = new DockerClient()(global) {
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
+ override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
+ execResult
}
behavior of "DockerContainerId"
@@ -186,7 +193,8 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging
with B
var runCmdCount = 0
val dc = new DockerClient()(global) {
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec: ExecutionContext) = {
+ override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
+ as: ActorSystem)
= {
runCmdCount += 1
runCmdCount match {
case 1 => firstRunPromise.future
@@ -233,7 +241,8 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging
with B
var runCmdCount = 0
val dc = new DockerClient()(global) {
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec: ExecutionContext) = {
+ override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
+ as: ActorSystem)
= {
runCmdCount += 1
println(s"runCmdCount=${runCmdCount}, args.last=${args.last}")
runCmdCount match {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
index 895461d..2918c8e 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
@@ -19,21 +19,20 @@ package whisk.core.containerpool.docker.test
import java.io.File
+import akka.actor.ActorSystem
+
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-import scala.language.reflectiveCalls // Needed to invoke publicIpAddressFromFile() method
of structural dockerClientForIp extension
-
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
+import scala.language.reflectiveCalls
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.Matchers
-
-import common.StreamLogging
+import common.{StreamLogging, WskActorSystem}
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.common.TransactionId
@@ -42,7 +41,12 @@ import whisk.core.containerpool.ContainerAddress
import whisk.core.containerpool.docker.DockerClientWithFileAccess
@RunWith(classOf[JUnitRunner])
-class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with StreamLogging
with BeforeAndAfterEach {
+class DockerClientWithFileAccessTestsIp
+ extends FlatSpec
+ with Matchers
+ with StreamLogging
+ with BeforeAndAfterEach
+ with WskActorSystem {
override def beforeEach = stream.reset()
@@ -69,7 +73,8 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with
Stre
readResult: Future[JsObject] = Future.successful(dockerConfig)) =
new DockerClientWithFileAccess()(global) {
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
+ override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
+ as: ActorSystem)
= execResult
override def configFileContents(configFile: File) = readResult
// Make protected ipAddressFromFile available for testing - requires reflectiveCalls
def publicIpAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress]
=
@@ -108,7 +113,12 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers
with Stre
}
@RunWith(classOf[JUnitRunner])
-class DockerClientWithFileAccessTestsOom extends FlatSpec with Matchers with StreamLogging
with BeforeAndAfterEach {
+class DockerClientWithFileAccessTestsOom
+ extends FlatSpec
+ with Matchers
+ with StreamLogging
+ with BeforeAndAfterEach
+ with WskActorSystem {
override def beforeEach = stream.reset()
implicit val transid = TransactionId.testing
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
index b2ac40f..4abe61f 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
@@ -17,35 +17,41 @@
package whisk.core.containerpool.docker.test
-import scala.concurrent.Future
+import akka.actor.ActorSystem
+import common.WskActorSystem
+import scala.concurrent.Future
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import scala.concurrent.ExecutionContext.Implicits.global
import whisk.core.containerpool.docker.ProcessRunner
+
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.Matchers
import whisk.core.containerpool.docker.ProcessRunningException
+
import scala.language.reflectiveCalls // Needed to invoke run() method of structural ProcessRunner
extension
@RunWith(classOf[JUnitRunner])
-class ProcessRunnerTests extends FlatSpec with Matchers {
+class ProcessRunnerTests extends FlatSpec with Matchers with WskActorSystem {
- def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result(f,
timeout)
+ def await[A](f: Future[A], timeout: FiniteDuration = 2.seconds) = Await.result(f, timeout)
val processRunner = new ProcessRunner {
- def run(args: String*)(implicit ec: ExecutionContext) = executeProcess(args: _*)
+ def run(args: Seq[String], timeout: FiniteDuration = 100.milliseconds)(implicit ec: ExecutionContext,
+ as: ActorSystem)
=
+ executeProcess(args, timeout)(ec, as)
}
behavior of "ProcessRunner"
it should "run an external command successfully and capture its output" in {
val stdout = "Output"
- await(processRunner.run("echo", stdout)) shouldBe stdout
+ await(processRunner.run(Seq("echo", stdout))) shouldBe stdout
}
it should "run an external command unsuccessfully and capture its output" in {
@@ -53,8 +59,14 @@ class ProcessRunnerTests extends FlatSpec with Matchers {
val stdout = "Output"
val stderr = "Error"
- val future = processRunner.run("/bin/sh", "-c", s"echo ${stdout}; echo ${stderr} 1>&2;
exit ${exitCode}")
+ val future = processRunner.run(Seq("/bin/sh", "-c", s"echo ${stdout}; echo ${stderr}
1>&2; exit ${exitCode}"))
the[ProcessRunningException] thrownBy await(future) shouldBe ProcessRunningException(exitCode,
stdout, stderr)
}
+
+ it should "terminate an external command after the specified timeout is reached" in {
+ val future = processRunner.run(Seq("sleep", "1"), 100.milliseconds)
+ val exception = the[ProcessRunningException] thrownBy await(future)
+ exception.exitCode shouldBe 143
+ }
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
index b4e2e47..cf5990b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
@@ -17,8 +17,9 @@
package whisk.core.containerpool.docker.test
-import scala.concurrent.Future
+import akka.actor.ActorSystem
+import scala.concurrent.Future
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
@@ -29,7 +30,7 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.Matchers
import whisk.core.containerpool.docker.RuncClient
-import common.StreamLogging
+import common.{StreamLogging, WskActorSystem}
import whisk.core.containerpool.ContainerId
import whisk.common.TransactionId
import org.scalatest.BeforeAndAfterEach
@@ -37,7 +38,7 @@ import whisk.common.LogMarker
import whisk.common.LoggingMarkers.INVOKER_RUNC_CMD
@RunWith(classOf[JUnitRunner])
-class RuncClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach
{
+class RuncClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach
with WskActorSystem {
override def beforeEach = stream.reset()
@@ -49,9 +50,10 @@ class RuncClientTests extends FlatSpec with Matchers with StreamLogging
with Bef
val runcCommand = "docker-runc"
/** Returns a RuncClient with a mocked result for 'executeProcess' */
- def runcClient(result: Future[String]) = new RuncClient(global) {
+ def runcClient(result: Future[String]) = new RuncClient()(global) {
override val runcCmd = Seq(runcCommand)
- override def executeProcess(args: String*)(implicit ec: ExecutionContext) = result
+ override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
+ result
}
/** Calls a runc method based on the name of the method. */
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.
|