openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rab...@apache.org
Subject [incubator-openwhisk] branch master updated: Use an asynchronous process runner to spawn container commands. (#2752)
Date Sat, 16 Sep 2017 17:53:28 GMT
This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b0c037  Use an asynchronous process runner to spawn container commands. (#2752)
6b0c037 is described below

commit 6b0c03725b43d5454dc07ccc51318675b6b0905e
Author: Markus Thömmes <markusthoemmes@me.com>
AuthorDate: Sat Sep 16 19:53:23 2017 +0200

    Use an asynchronous process runner to spawn container commands. (#2752)
    
    scala.sys.Process only has a blocking interface which blocks one thread for the runtime
of the command. Furthermore, each of those processes open another thread to handle the output
streams of the started process. Under load, the system churns through a **lot** of threads,
which can cause efficiency problems.
    
    NuProcess is supposed to fix this by providing an epoll interface, to make waiting on
process completion asynchronous and its own (small) threadpool to maintain state and handle
streams.
---
 core/invoker/build.gradle                          |  2 +
 .../core/containerpool/docker/ProcessRunner.scala  | 49 ++++++++++++++--------
 2 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index 14a9191..d60ea40 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -13,6 +13,8 @@ repositories {
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
+
+    compile "com.zaxxer:nuprocess:1.1.2"
 }
 
 tasks.withType(ScalaCompile) {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
index f8139b4..8540203 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
@@ -17,11 +17,13 @@
 
 package whisk.core.containerpool.docker
 
-import scala.collection.mutable
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.blocking
-import scala.sys.process._
+import java.nio.ByteBuffer
+
+import akka.util.ByteString
+import com.zaxxer.nuprocess.{NuAbstractProcessHandler, NuProcessBuilder}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
 
 trait ProcessRunner {
 
@@ -35,19 +37,30 @@ trait ProcessRunner {
    * @param args command to be run including arguments
    * @return a future completing according to the command's exit code
    */
-  protected def executeProcess(args: String*)(implicit ec: ExecutionContext) =
-    Future(blocking {
-      val out = new mutable.ListBuffer[String]
-      val err = new mutable.ListBuffer[String]
-      val exitCode = args ! ProcessLogger(o => out += o, e => err += e)
-
-      (exitCode, out.mkString("\n"), err.mkString("\n"))
-    }).flatMap {
-      case (0, stdout, _) =>
-        Future.successful(stdout)
-      case (code, stdout, stderr) =>
-        Future.failed(ProcessRunningException(code, stdout, stderr))
-    }
+  protected def executeProcess(args: String*)(implicit ec: ExecutionContext): Future[String]
= {
+    val promise = Promise[String]()
+    val pb: NuProcessBuilder = new NuProcessBuilder(args.asJava)
+    pb.setProcessListener(new NuAbstractProcessHandler {
+      var out = ByteString.empty
+      var err = ByteString.empty
+
+      override def onExit(code: Int): Unit = code match {
+        case 0 => promise.success(out.utf8String.trim)
+        case _ => promise.failure(ProcessRunningException(code, out.utf8String.trim, err.utf8String.trim))
+      }
+
+      override def onStderr(buffer: ByteBuffer, closed: Boolean) = {
+        err = err ++ ByteString.fromByteBuffer(buffer)
+      }
+
+      override def onStdout(buffer: ByteBuffer, closed: Boolean): Unit = {
+        out = out ++ ByteString.fromByteBuffer(buffer)
+      }
+    })
+
+    pb.start()
+    promise.future
+  }
 }
 
 case class ProcessRunningException(exitCode: Int, stdout: String, stderr: String)

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Mime
View raw message