openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rab...@apache.org
Subject [incubator-openwhisk] branch master updated: Remove old invoker code and refactor accordingly. (#2602)
Date Wed, 16 Aug 2017 15:57:48 GMT
This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 10fae73  Remove old invoker code and refactor accordingly. (#2602)
10fae73 is described below

commit 10fae73e15cf886bb3ad892e659b15a28cc4c80b
Author: Markus Thömmes <markusthoemmes@me.com>
AuthorDate: Wed Aug 16 17:57:46 2017 +0200

    Remove old invoker code and refactor accordingly. (#2602)
    
    - Refactoring the invoker for better encapsulation and separation of concerns in its packages.
    - Removing Dispatcher.
    - Remove unused properties from the deployment manifests.
    - Add error handling for terminal failures
---
 ansible/environments/docker-machine/group_vars/all |   3 -
 ansible/environments/local/group_vars/all          |   3 -
 ansible/environments/mac/group_vars/all            |   3 -
 ansible/group_vars/all                             |   4 -
 ansible/roles/invoker/tasks/deploy.yml             |   5 -
 ansible/templates/whisk.properties.j2              |   4 -
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  12 -
 .../scala/whisk/core/container/Container.scala     | 119 ---
 .../scala/whisk/core/container/ContainerPool.scala | 846 ---------------------
 .../whisk/core/container/ContainerUtils.scala      | 329 --------
 .../scala/whisk/core/container/RuncUtils.scala     |  80 --
 .../whisk/core/container/WhiskContainer.scala      | 176 -----
 .../main/scala/whisk/core/container/package.scala  | 150 ----
 .../scala/whisk/core/containerpool/Container.scala |  15 +-
 .../whisk/core/containerpool/ContainerProxy.scala  |   1 -
 .../docker/DockerActionLogDriver.scala}            |  11 +-
 .../containerpool/docker/DockerContainer.scala     |  16 +-
 .../docker}/HttpUtils.scala                        |   8 +-
 .../scala/whisk/core/dispatcher/Dispatcher.scala   | 159 ----
 .../whisk/core/dispatcher/MessageHandler.scala     |  39 -
 .../main/scala/whisk/core/invoker/Invoker.scala    | 460 +----------
 .../scala/whisk/core/invoker/InvokerReactive.scala | 161 ++--
 .../scala/actionContainers/ActionContainer.scala   |   2 +-
 .../core/container/test/ContainerPoolTests.scala   | 279 -------
 .../docker/test}/ActionLogDriverTests.scala        |  14 +-
 .../docker}/test/ContainerConnectionTests.scala    |  10 +-
 .../docker/test/DockerContainerTests.scala         |   8 +-
 .../containerpool/test/ContainerProxyTests.scala   |   1 -
 .../core/dispatcher/test/DispatcherTests.scala     | 155 ----
 .../test/ActivationResponseTests.scala             |   6 +-
 30 files changed, 160 insertions(+), 2919 deletions(-)

diff --git a/ansible/environments/docker-machine/group_vars/all b/ansible/environments/docker-machine/group_vars/all
index 5628a1f..08bf632 100644
--- a/ansible/environments/docker-machine/group_vars/all
+++ b/ansible/environments/docker-machine/group_vars/all
@@ -36,8 +36,5 @@ apigw_auth_pwd: ""
 apigw_host: "http://{{ groups['apigateway']|first }}:{{apigateway.port.api}}/v1"
 apigw_host_v2: "http://{{ groups['apigateway']|first }}:{{apigateway.port.api}}/v2"
 
-# RunC enablement
-invoker_use_runc: true
-
 controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1098'
 invoker_arguments: "{{ controller_arguments }}"
\ No newline at end of file
diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index 22b23c4..c7914df 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -30,8 +30,5 @@ apigw_auth_pwd: ""
 apigw_host: "http://{{ groups['apigateway']|first }}:{{apigateway.port.api}}/v1"
 apigw_host_v2: "http://{{ groups['apigateway']|first }}:{{apigateway.port.api}}/v2"
 
-# RunC enablement
-invoker_use_runc: true
-
 controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1098'
 invoker_arguments: "{{ controller_arguments }}"
\ No newline at end of file
diff --git a/ansible/environments/mac/group_vars/all b/ansible/environments/mac/group_vars/all
index bd09410..c596c41 100755
--- a/ansible/environments/mac/group_vars/all
+++ b/ansible/environments/mac/group_vars/all
@@ -30,8 +30,5 @@ apigw_auth_pwd: ""
 apigw_host: "http://{{ groups['apigateway']|first }}:{{apigateway.port.api}}/v1"
 apigw_host_v2: "http://{{ groups['apigateway']|first }}:{{apigateway.port.api}}/v2"
 
-# RunC enablement
-invoker_use_runc: true
-
 controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1098'
 invoker_arguments: "{{ controller_arguments }}"
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 2763ec5..0f99f0d 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -146,10 +146,6 @@ invoker:
   numcore: 2
   coreshare: 2
   busyThreshold: "{{ invoker_busy_threshold | default(16) }}"
-  serializeDockerOp: true
-  serializeDockerPull: true
-  useRunc: false
-  useReactivePool: "{{ invoker_use_reactive_pool | default(true) }}"
   instances: "{{ groups['invokers'] | length }}"
 
 nginx:
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index b4e43f2..8747576 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -84,7 +84,6 @@
         -e WHISK_API_HOST_PORT='{{ whisk_api_host_port | default('443') }}'
         -e WHISK_API_HOST_NAME='{{ whisk_api_host_name | default(groups['edge'] | first) }}'
         -e RUNTIMES_MANIFEST='{{ runtimesManifest | to_json }}'
-        -e SELF_DOCKER_ENDPOINT='localhost'
         -e DOCKER_REGISTRY='{{ docker_registry }}'
         -e DOCKER_IMAGE_PREFIX='{{ docker.image.prefix }}'
         -e DOCKER_IMAGE_TAG='{{ docker.image.tag }}'
@@ -93,10 +92,6 @@
         -e INVOKER_CONTAINER_DNS='{{ invoker_container_network_dns_servers | default()}}'
         -e INVOKER_NUMCORE='{{ invoker.numcore }}'
         -e INVOKER_CORESHARE='{{ invoker.coreshare }}'
-        -e INVOKER_SERIALIZEDOCKEROP='{{ invoker.serializeDockerOp }}'
-        -e INVOKER_SERIALIZEDOCKERPULL='{{ invoker.serializeDockerPull }}'
-        -e INVOKER_USERUNC='{{ invoker_use_runc | default(invoker.useRunc) }}'
-        -e INVOKER_USEREACTIVEPOOL='{{ invoker.useReactivePool }}'
         -e WHISK_LOGS_DIR='{{ whisk_logs_dir }}'
         -v /sys/fs/cgroup:/sys/fs/cgroup
         -v /run/runc:/run/runc
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 1543f70..711653f 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -68,10 +68,6 @@ invoker.container.policy={{ invoker_container_policy_name | default()}}
 invoker.container.dns={{ invoker_container_network_dns_servers | default()}}
 invoker.numcore={{ invoker.numcore }}
 invoker.coreshare={{ invoker.coreshare }}
-invoker.serializeDockerOp={{ invoker.serializeDockerOp }}
-invoker.serializeDockerPull={{ invoker.serializeDockerPull }}
-invoker.useRunc={{ invoker_use_runc | default(invoker.useRunc) }}
-invoker.useReactivePool={{ invoker.useReactivePool }}
 invoker.instances={{ invoker.instances }}
 
 main.docker.endpoint={{ groups["controllers"]|first }}:{{ docker.port }}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 1f3bbee..96ff45d 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -53,11 +53,9 @@ class WhiskConfig(
         properties
     }
 
-    val logsDir = this(WhiskConfig.logsDir)
     val servicePort = this(WhiskConfig.servicePort)
     val dockerRegistry = this(WhiskConfig.dockerRegistry)
     val dockerEndpoint = this(WhiskConfig.dockerEndpoint)
-    val selfDockerEndpoint = this(WhiskConfig.selfDockerEndpoint)
     val dockerPort = this(WhiskConfig.dockerPort)
 
     val dockerImagePrefix = this(WhiskConfig.dockerImagePrefix)
@@ -68,10 +66,6 @@ class WhiskConfig(
     val invokerContainerDns = if (this(WhiskConfig.invokerContainerDns) == "") Seq() else this(WhiskConfig.invokerContainerDns).split(" ").toSeq
     val invokerNumCore = this(WhiskConfig.invokerNumCore)
     val invokerCoreShare = this(WhiskConfig.invokerCoreShare)
-    val invokerSerializeDockerOp = this(WhiskConfig.invokerSerializeDockerOp)
-    val invokerSerializeDockerPull = this(WhiskConfig.invokerSerializeDockerPull)
-    val invokerUseRunc = this(WhiskConfig.invokerUseRunc)
-    val invokerUseReactivePool = this(WhiskConfig.invokerUseReactivePool)
 
     val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this(WhiskConfig.wskApiPort)
     val controllerBlackboxFraction = this.getAsDouble(WhiskConfig.controllerBlackboxFraction, 0.10)
@@ -158,13 +152,11 @@ object WhiskConfig {
             key.replace('.', '_').toUpperCase
         else null
 
-    val logsDir = "whisk.logs.dir"
     val servicePort = "port"
     val dockerRegistry = "docker.registry"
     val dockerPort = "docker.port"
 
     val dockerEndpoint = "main.docker.endpoint"
-    val selfDockerEndpoint = "self.docker.endpoint"
 
     val dbProvider = "db.provider"
     val dbProtocol = "db.protocol"
@@ -194,10 +186,6 @@ object WhiskConfig {
     val invokerContainerDns = "invoker.container.dns"
     val invokerNumCore = "invoker.numcore"
     val invokerCoreShare = "invoker.coreshare"
-    val invokerSerializeDockerOp = "invoker.serializeDockerOp"
-    val invokerSerializeDockerPull = "invoker.serializeDockerPull"
-    val invokerUseRunc = "invoker.useRunc"
-    val invokerUseReactivePool = "invoker.useReactivePool"
 
     val wskApiProtocol = "whisk.api.host.proto"
     val wskApiPort = "whisk.api.host.port"
diff --git a/core/invoker/src/main/scala/whisk/core/container/Container.scala b/core/invoker/src/main/scala/whisk/core/container/Container.scala
deleted file mode 100644
index a966357..0000000
--- a/core/invoker/src/main/scala/whisk/core/container/Container.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import scala.annotation.tailrec
-
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.WhiskConfig.invokerContainerNetwork
-import whisk.core.WhiskConfig.selfDockerEndpoint
-import whisk.core.entity.ActionLimits
-
-/**
- * Reifies a docker container.
- */
-class Container(
-    originalId: TransactionId,
-    useRunc: Boolean,
-    val dockerhost: String,
-    mounted: Boolean,
-    val key: ActionContainerId,
-    containerName: Option[ContainerName],
-    val image: String,
-    network: String,
-    cpuShare: Int,
-    policy: Option[String],
-    dnsServers: Seq[String],
-    val limits: ActionLimits = ActionLimits(),
-    env: Map[String, String] = Map(),
-    args: Array[String] = Array())(
-        implicit val logging: Logging)
-    extends ContainerUtils {
-
-    implicit var transid = originalId
-
-    val id = Container.idCounter.next()
-    val nameAsString = containerName.map(_.name).getOrElse("anon")
-
-    val (containerId, containerHostAndPort) = bringup(mounted, containerName, image, network, cpuShare, env, args, limits, policy, dnsServers)
-
-    def details: String = {
-        val name = containerName.map(_.name) getOrElse "??"
-        val id = containerId.id
-        val ip = containerHostAndPort getOrElse "??"
-        s"container [$name] [$id] [$ip]"
-    }
-
-    def pause(): Boolean =
-        if (useRunc) {
-            RuncUtils.isSuccessful(RuncUtils.pause(containerId))
-        } else {
-            DockerOutput.isSuccessful(pauseContainer(containerId))
-        }
-
-    def unpause(): Boolean =
-        if (useRunc) {
-            RuncUtils.isSuccessful(RuncUtils.resume(containerId))
-        } else {
-            DockerOutput.isSuccessful(unpauseContainer(containerId))
-        }
-
-    /**
-     * A prefix of the container id known to be displayed by docker ps.
-     */
-    lazy val containerIdPrefix: String = {
-        // docker ps contains only a prefix of the id
-        containerId.id.take(8)
-    }
-
-    /**
-     * Gets logs for container.
-     */
-    def getLogs()(implicit transid: TransactionId): String = {
-        getContainerLogs(containerId).toOption getOrElse ""
-    }
-
-    /**
-     * Unpauses and removes a container (it may be running).
-     */
-    @tailrec
-    final def remove(needUnpause: Boolean, tryCount: Int = Container.removeContainerRetryCount)(implicit transid: TransactionId): Unit = {
-        if (tryCount <= 0) {
-            logging.error(this, s"Failed to remove container ${containerId.id}")
-        } else {
-            if (tryCount == Container.removeContainerRetryCount) {
-                logging.info(this, s"Removing container ${containerId.id}")
-            } else {
-                logging.warn(this, s"Retrying to remove container ${containerId.id}")
-            }
-            if (needUnpause) unpause() // a paused container cannot be removed
-            rmContainer(containerId).toOption match {
-                case None => remove(needUnpause, tryCount - 1)
-                case _    => ()
-            }
-        }
-    }
-}
-
-object Container {
-    def requiredProperties = Map(selfDockerEndpoint -> null, invokerContainerNetwork -> "bridge")
-    private val idCounter = new Counter()
-    private val removeContainerRetryCount = 2
-}
diff --git a/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala
deleted file mode 100644
index 95e85be..0000000
--- a/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala
+++ /dev/null
@@ -1,846 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import java.nio.file.Files
-import java.nio.file.Paths
-import java.util.Timer
-import java.util.TimerTask
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.locks.ReentrantLock
-
-import scala.annotation.tailrec
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.{ Try, Success, Failure }
-
-import akka.actor.ActorSystem
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.common.Scheduler
-import whisk.common.TimingUtil
-import whisk.common.TransactionId
-import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig._
-import whisk.core.entity._
-
-/**
- * A thread-safe container pool that internalizes container creation/teardown and allows users
- * to check out a container.
- *
- * Synchronization via "this" is used to maintain integrity of the data structures.
- * A separate object "gcSync" is used to prevent multiple GC's from occurring.
- *
- * TODO: Parallel container creation under evaluation for docker 12.
- */
-class ContainerPool(
-    config: WhiskConfig,
-    invokerInstance: InstanceId = InstanceId(0),
-    standalone: Boolean = false,
-    saveContainerLog: Boolean = false)(implicit actorSystem: ActorSystem, val logging: Logging)
-    extends ContainerUtils {
-
-    implicit val executionContext = actorSystem.dispatcher
-
-    val mounted = !standalone
-    val dockerhost = config.selfDockerEndpoint
-    val serializeDockerOp = config.invokerSerializeDockerOp.toBoolean
-    val serializeDockerPull = config.invokerSerializeDockerPull.toBoolean
-    val useRunc = checkRuncAccess(config.invokerUseRunc.toBoolean)
-    logging.info(this, s"dockerhost = $dockerhost    serializeDockerOp = $serializeDockerOp   serializeDockerPull = $serializeDockerPull   useRunC = $useRunc")
-
-    // Eventually, we will have a more sophisticated warmup strategy that does multiple sizes
-    private val defaultMemoryLimit = MemoryLimit(MemoryLimit.STD_MEMORY)
-    private val NODEJS6_IMAGE = ExecManifest.runtimesManifest.manifests("nodejs:6").image
-
-    /**
-     *  Check whether we should use runc.  To do so,
-     *  1. The whisk config flag must be on.
-     *  2. Runc must be successfully accessible.  This is a failsafe in case runc is not set up correctly.
-     *     For this stage, logging shows success or failure if we get this far.
-     */
-    def checkRuncAccess(useRunc: Boolean): Boolean = {
-        if (useRunc) {
-            implicit val tid = TransactionId.invokerNanny
-            val (code, result) = RuncUtils.list()
-            val success = (code == 0)
-            if (success) {
-                logging.info(this, s"Using runc. list result: ${result}")
-            } else {
-                logging.warn(this, s"Not using runc due to error (code = ${code}): ${result}")
-            }
-            success
-        } else {
-            logging.info(this, s"Not using runc because of configuration flag")
-            false
-        }
-    }
-
-    /**
-     * Enables GC.
-     */
-    def enableGC(): Unit = {
-        gcOn = true
-    }
-
-    /**
-     * Disables GC. If disabled, overrides other flags/methods.
-     */
-    def disableGC(): Unit = {
-        gcOn = false
-    }
-
-    /**
-     * Performs a GC immediately of all idle containers, blocking the caller until completed.
-     */
-    def forceGC()(implicit transid: TransactionId): Unit = {
-        removeAllIdle({ containerInfo => true })
-    }
-
-    /*
-     * Getter/Setter for various GC parameters.
-     */
-    def gcThreshold: FiniteDuration = _gcThreshold
-    def maxIdle: Int = _maxIdle // container count
-    def maxActive: Int = _maxActive // container count
-    def gcThreshold_=(value: FiniteDuration): Unit = _gcThreshold = (Duration.Zero max value)
-    def maxIdle_=(value: Int): Unit = _maxIdle = Math.max(0, value)
-    def maxActive_=(value: Int): Unit = _maxActive = Math.max(0, value)
-
-    def resetMaxIdle() = _maxIdle = defaultMaxIdle
-    def resetMaxActive() = {
-        _maxActive = ContainerPool.getDefaultMaxActive(config)
-        logging.info(this, s"maxActive set to ${_maxActive}")
-    }
-    def resetGCThreshold() = _gcThreshold = defaultGCThreshold
-
-    /*
-     * Controls where docker container logs are put.
-     */
-    def logDir: String = _logDir
-    def logDir_=(value: String): Unit = _logDir = value
-
-    /*
-     * How many containers are in the pool at the moment?
-     * There are also counts of containers we are trying to start but have not inserted into the data structure.
-     */
-    def idleCount() = countByState(State.Idle)
-    def activeCount() = countByState(State.Active)
-    private val startingCounter = new Counter()
-    private var shuttingDown = false
-
-    /*
-     * Tracks requests for getting containers.
-     * The first value doled out via nextPosition.next() will be 1 and completedPosition.cur remains at 0 until completion.
-     */
-    private val nextPosition = new Counter()
-    private val completedPosition = new Counter()
-
-    /*
-     * Lists ALL containers at this docker point with "docker ps -a --no-trunc".
-     * This could include containers not in this pool at all.
-     */
-    def listAll()(implicit transid: TransactionId): Seq[ContainerState] = listContainers(true)
-
-    /**
-     * Retrieves (possibly create) a container based on the subject and versioned action.
-     * A flag is included to indicate whether initialization succeeded.
-     * The invariant of returning the container back to the pool holds regardless of whether init succeeded or not.
-     * In case of failure to start a container (or for failed docker operations e.g., pull), an exception is thrown.
-     */
-    def getAction(action: ExecutableWhiskAction, auth: AuthKey)(implicit transid: TransactionId): (WhiskContainer, Option[RunResult]) = {
-        if (shuttingDown) {
-            logging.info(this, s"Shutting down: Not getting container for ${action.fullyQualifiedName(true)} with ${auth.uuid}")
-            throw new Exception("system is shutting down")
-        } else {
-            val key = ActionContainerId(auth.uuid, action.fullyQualifiedName(true).toString, action.rev)
-            val myPos = nextPosition.next()
-            logging.info(this, s"""Getting container for ${action.fullyQualifiedName(true)} of kind ${action.exec.kind} with ${auth.uuid}:
-                          | myPos = $myPos
-                          | completed = ${completedPosition.cur}
-                          | slack = ${slack()}
-                          | activeCount = ${activeCount()}
-                          | toBeRemoved = ${toBeRemoved.size}
-                          | startingCounter = ${startingCounter.cur}""".stripMargin)
-            val conResult = Try(getContainer(1, myPos, key, () => makeWhiskContainer(action, auth)))
-            completedPosition.next()
-            conResult match {
-                case Success(Cold(con)) =>
-                    logging.info(this, s"Obtained cold container ${con.containerId.id} - about to initialize")
-                    val initResult = initWhiskContainer(action, con)
-                    (con, Some(initResult))
-                case Success(Warm(con)) =>
-                    logging.info(this, s"Obtained warm container ${con.containerId.id}")
-                    (con, None)
-                case Failure(t) =>
-                    logging.error(this, s"Exception while trying to get a container: $t")
-                    throw t
-            }
-        }
-    }
-
-    /*
-     * For testing by ContainerPoolTests where non whisk containers are used.
-     * These do not require initialization.
-     */
-    def getByImageName(imageName: String, args: Array[String])(implicit transid: TransactionId): Option[Container] = {
-        logging.info(this, s"Getting container for image $imageName with args " + args.mkString(" "))
-        // Not a regular key. Doesn't matter in testing.
-        val key = new ActionContainerId(s"instantiated." + imageName + args.mkString("_"))
-        getContainer(1, 0, key, () => makeContainer(key, imageName, args)) match {
-            case Cold(con) => Some(con)
-            case Warm(con) => Some(con)
-            case _         => None
-        }
-    }
-
-    /**
-     * Tries to get/create a container via the thunk by delegating to getOrMake.
-     * This method will apply retry so that the caller is blocked until retry succeeds.
-     */
-    @tailrec
-    final def getContainer(tryCount: Int, position: Long, key: ActionContainerId, conMaker: () => WhiskContainer)(implicit transid: TransactionId): ContainerResult = {
-        val positionInLine = position - completedPosition.cur // Indicates queue position.  1 means front of the line
-        val available = slack()
-        // Warn at 10 seconds and then once a minute after that.
-        val waitDur = 50.millis
-        val warnAtCount = 10.seconds.toMillis / waitDur.toMillis
-        val warnPeriodic = 60.seconds.toMillis / waitDur.toMillis
-        if (tryCount == warnAtCount || tryCount % warnPeriodic == 0) {
-            logging.warn(this, s"""getContainer has been waiting about ${warnAtCount * waitDur.toMillis} ms:
-                          | position = $position
-                          | completed = ${completedPosition.cur}
-                          | slack = $available
-                          | maxActive = ${_maxActive}
-                          | activeCount = ${activeCount()}
-                          | startingCounter = ${startingCounter.cur}""".stripMargin)
-        }
-        if (positionInLine <= available) {
-            getOrMake(key, conMaker) match {
-                case Some(cr) => cr
-                case None     => getContainer(tryCount + 1, position, key, conMaker)
-            }
-        } else { // It's not our turn in line yet.
-            Thread.sleep(waitDur.toMillis) // TODO: Replace with wait/notify but tricky because of desire for maximal concurrency
-            getContainer(tryCount + 1, position, key, conMaker)
-        }
-    }
-
-    def getNumberOfIdleContainers(key: ActionContainerId)(implicit transid: TransactionId): Int = {
-        this.synchronized {
-            keyMap.get(key) map { bucket => bucket.count { _.isIdle } } getOrElse 0
-        }
-    }
-
-    /*
-     * How many containers can we start?  Someone could have fully started a container so we must include startingCounter.
-     * The use of a method rather than a getter is meant to signify the synchronization in the implementation.
-     */
-    private def slack() = _maxActive - (activeCount() + startingCounter.cur + Math.max(toBeRemoved.size - RM_SLACK, 0))
-
-    /*
-     * Try to get or create a container, returning None if there are too many
-     * active containers.
-     *
-     * The multiple synchronization block, and the use of startingCounter,
-     * is needed to make sure container count is accurately tracked,
-     * data structure maintains integrity, but to keep all length operations
-     * outside of the lock.
-     *
-     * The returned container will be active (not paused).
-     */
-    def getOrMake(key: ActionContainerId, conMaker: () => WhiskContainer)(implicit transid: TransactionId): Option[ContainerResult] = {
-        retrieve(key) match {
-            case CacheMiss => {
-                val con = conMaker()
-                this.synchronized {
-                    introduceContainer(key, con).state = State.Active
-                }
-                Some(Cold(con))
-            }
-            case CacheHit(con) =>
-                con.transid = transid
-                runDockerOp {
-                    if (con.unpause()) {
-                        Some(Warm(con))
-                    } else {
-                        // resume failed, gc the container
-                        putBack(con, delete = true)
-                        None
-                    }
-                }
-            case CacheBusy => None
-        }
-    }
-
-    /**
-     * Obtains a pre-existing container from the pool - and putting it to Active state but without docker unpausing.
-     * If we are over capacity, signal Busy.
-     * If it does not exist ready to do, indicate a miss.
-     */
-    def retrieve(key: ActionContainerId)(implicit transid: TransactionId): CacheResult = {
-        this.synchronized {
-            // first check if there is a matching container and only if there aren't any
-            // determine if the pool is full or has capacity to accommodate a new container;
-            // this allows any new containers introduced into the pool to be reused if already idle
-            val bucket = keyMap.getOrElseUpdate(key, new ListBuffer())
-            bucket.find({ ci => ci.isIdle }) match {
-                case None =>
-                    if (activeCount() + startingCounter.cur >= _maxActive) {
-                        CacheBusy
-                    } else {
-                        CacheMiss
-                    }
-                case Some(ci) => {
-                    ci.state = State.Active
-                    CacheHit(ci.container)
-                }
-            }
-        }
-    }
-
-    /**
-     * Moves a container from one bucket (i.e. key) to a different one.
-     * This operation is performed when we specialize a pre-warmed container to an action.
-     * ContainerMap does not need to be updated as the Container <-> ContainerInfo relationship does not change.
-     */
-    def changeKey(ci: ContainerInfo, oldKey: ActionContainerId, newKey: ActionContainerId)(implicit transid: TransactionId) = {
-        this.synchronized {
-            assert(ci.state == State.Active)
-            assert(keyMap.contains(oldKey))
-            val oldBucket = keyMap(oldKey)
-            val newBucket = keyMap.getOrElseUpdate(newKey, new ListBuffer())
-            oldBucket -= ci
-            newBucket += ci
-        }
-    }
-
-    /**
-     * Returns the container to the pool or delete altogether.
-     * This call can be slow but not while locking data structure so it does not interfere with other activations.
-     */
-    def putBack(container: Container, delete: Boolean = false)(implicit transid: TransactionId): Unit = {
-        logging.info(this, s"""putBack returning container ${container.id}
-                      | delete = $delete
-                      | completed = ${completedPosition.cur}
-                      | slack = ${slack()}
-                      | maxActive = ${_maxActive}
-                      | activeCount = ${activeCount()}
-                      | startingCounter = ${startingCounter.cur}""".stripMargin)
-        // Docker operation outside sync block. Don't pause if we are deleting.
-        if (!delete) {
-            runDockerOp {
-                // pausing eagerly is pessimal; there could be an action waiting
-                // that will immediately unpause the same container to reuse it;
-                // to skip pausing, will need to inspect the queue of waiting activations
-                // for a matching key
-                container.pause()
-            }
-        }
-
-        val toBeDeleted = this.synchronized { // Return container to pool logically and then optionally delete
-            // Always put back logically for consistency
-            val Some(ci) = containerMap.get(container)
-            assert(ci.state == State.Active)
-            ci.lastUsed = System.currentTimeMillis()
-            ci.state = State.Idle
-            val toBeDeleted = if (delete) {
-                removeContainerInfo(ci) // no docker operation here
-                List(ci)
-            } else {
-                List()
-            }
-            this.notify()
-            toBeDeleted
-        }
-
-        toBeDeleted.foreach { ci => toBeRemoved.offer(RemoveJob(false, ci)) }
-        // Perform capacity-based GC here.
-        if (gcOn) { // Synchronization occurs inside calls in a fine-grained manner.
-            while (idleCount() > _maxIdle) { // it is safe for this to be non-atomic with body
-                removeOldestIdle()
-            }
-        }
-    }
-
-    // ------------------------------------------------------------------------------------------------------------
-
-    object State extends Enumeration {
-        val Idle, Active = Value
-    }
-
-    /**
-     * Wraps a Container to allow a ContainerPool-specific information.
-     */
-    class ContainerInfo(k: ActionContainerId, con: WhiskContainer) {
-        val key = k
-        val container = con
-        var state = State.Idle
-        var lastUsed = System.currentTimeMillis()
-        def isIdle = state == State.Idle
-        def isStemCell = key == stemCellNodejsKey
-    }
-
-    private val containerMap = new TrieMap[Container, ContainerInfo]
-    private val keyMap = new TrieMap[ActionContainerId, ListBuffer[ContainerInfo]]
-
-    // These are containers that are already removed from the data structure waiting to be docker-removed
-    case class RemoveJob(needUnpause: Boolean, containerInfo: ContainerInfo)
-    private val toBeRemoved = new ConcurrentLinkedQueue[RemoveJob]
-
-    // Note that the prefix separates the name space of this from regular keys.
-    // TODO: Generalize across language by storing image name when we generalize to other languages
-    //       Better heuristic for # of containers to keep warm - make sensitive to idle capacity
-    private val stemCellNodejsKey = StemCellNodeJsActionContainerId
-    private val WARM_NODEJS_CONTAINERS = 2
-
-    // This parameter controls how many outstanding un-removed containers there are before
-    // we stop stem cell container creation.  This is also the an allowance in slack calculation
-    // to allow limited de-coupling between container removal and creation when under load.
-    private val RM_SLACK = 4
-
-    private def keyMapToString(): String = {
-        keyMap.map(p => s"[${p._1.stringRepr} -> ${p._2}]").mkString("  ")
-    }
-
-    // Easier to walk containerMap than keyMap
-    private def countByState(state: State.Value) = this.synchronized { containerMap.count({ case (_, ci) => ci.state == state }) }
-
-    // Sample container name: wsk1_1_joeibmcomhelloWorldDemo_20150901T202701852Z
-    private def makeContainerName(localName: String): ContainerName =
-        ContainerCounter.containerName(invokerInstance.toInt.toString, localName)
-
-    private def makeContainerName(action: ExecutableWhiskAction): ContainerName =
-        makeContainerName(action.fullyQualifiedName(true).toString)
-
-    /**
-     * dockerLock is a fair lock used to serialize all docker operations except pull.
-     * However, a non-pull operation can run concurrently with a pull operation.
-     */
-    val dockerLock = new ReentrantLock(true)
-
-    /**
-     * dockerPullLock is used to serialize all pull operations.
-     */
-    val dockerPullLock = new ReentrantLock(true)
-
-    /* A background thread that
-     *   1. Kills leftover action containers on startup
-     *   2. (actually a Future) Periodically re-populates the container pool with fresh (un-instantiated) nodejs containers.
-     *   3. Periodically tears down containers that have logically been removed from the system
-     *   4. Each of the significant method subcalls are guarded to not throw an exception.
-     */
-    private def nannyThread(allContainers: Seq[ContainerState]) = new Thread {
-        override def run {
-            implicit val tid = TransactionId.invokerNanny
-            if (mounted) {
-                killStragglers(allContainers)
-                // Create a new stem cell if the number of warm containers is less than the count allowed
-                // as long as there is slack so that any actions that may be waiting to create a container
-                // are not held back; Note since this method is not fully synchronized, it is possible to
-                // start this operation while there is slack and end up waiting on the docker lock later
-                val warmupInterval = 100.milliseconds
-                Scheduler.scheduleWaitAtLeast(warmupInterval) { () =>
-                    implicit val tid = TransactionId.invokerWarmup
-                    if (getNumberOfIdleContainers(stemCellNodejsKey) < WARM_NODEJS_CONTAINERS && slack() > 0 && toBeRemoved.size < RM_SLACK) {
-                        addStemCellNodejsContainer()(tid)
-                    } else {
-                        Future.successful(())
-                    }
-                }
-            }
-            while (true) {
-                Thread.sleep(100) // serves to prevent busy looping
-                // We grab the size first so we know there has been enough delay for anything we are shutting down
-                val size = toBeRemoved.size()
-                1 to size foreach { _ =>
-                    val removeJob = toBeRemoved.poll()
-                    if (removeJob != null) {
-                        Thread.sleep(100) // serves to not hog docker lock and add slack
-                        teardownContainer(removeJob)
-                    } else {
-                        logging.error(this, "toBeRemove.poll failed - possibly another concurrent remover?")
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Gracefully terminates by shutting down containers upon SIGTERM.
-     * If one desires to kill the invoker without this, send it SIGKILL.
-     */
-    private def shutdown() = {
-        implicit val id = TransactionId.invokerWarmup
-        shuttingDown = true
-        killStragglers(listAll())
-    }
-
-    /**
-     * All docker operations from the pool must pass through here (except for pull).
-     */
-    private def runDockerOp[T](dockerOp: => T)(implicit transid: TransactionId): T = {
-        runDockerOpWithLock(serializeDockerOp, dockerLock, dockerOp)
-    }
-
-    /**
-     * All docker pull operations from the pool must pass through here.
-     */
-    private def runDockerPull[T](dockerOp: => T)(implicit transid: TransactionId): T = {
-        runDockerOpWithLock(serializeDockerPull, dockerPullLock, dockerOp)
-    }
-
-    /**
-     * All docker operations from the pool must pass through here (except for pull).
-     */
-    private def runDockerOpWithLock[T](useLock: Boolean, lock: ReentrantLock, dockerOp: => T)(implicit transid: TransactionId): T = {
-        if (useLock) {
-            lock.lock()
-        }
-        try {
-            val (elapsed, result) = TimingUtil.time { dockerOp }
-            if (elapsed > slowDockerThreshold) {
-                logging.warn(this, s"Docker operation took $elapsed")
-            }
-            result
-        } finally {
-            if (useLock) {
-                lock.unlock()
-            }
-        }
-    }
-
-    /*
-     * This method will introduce a stem cell container into the system.
-     * If container creation fails, the container will not be entered into the pool.
-     */
-    private def addStemCellNodejsContainer()(implicit transid: TransactionId) = Future {
-        val imageName = NODEJS6_IMAGE.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
-        val limits = ActionLimits(TimeLimit(), defaultMemoryLimit, LogLimit())
-        val containerName = makeContainerName("warmJsContainer")
-        logging.info(this, "Starting warm nodejs container")
-        val con = makeGeneralContainer(stemCellNodejsKey, containerName, imageName, limits, false)
-        this.synchronized {
-            introduceContainer(stemCellNodejsKey, con)
-        }
-        logging.info(this, s"Started warm nodejs container ${con.id}: ${con.containerId}")
-    } andThen {
-        case Failure(t) => logging.warn(this, s"addStemCellNodejsContainer encountered an exception: ${t.getMessage}")
-    }
-
-    private def getStemCellNodejsContainer(key: ActionContainerId)(implicit transid: TransactionId): Option[WhiskContainer] =
-        retrieve(stemCellNodejsKey) match {
-            case CacheHit(con) =>
-                logging.info(this, s"Obtained a pre-warmed container")
-                con.transid = transid
-                val Some(ci) = containerMap.get(con)
-                changeKey(ci, stemCellNodejsKey, key)
-                Some(con)
-            case _ => None
-        }
-
-    // Obtain a container (by creation or promotion) and initialize by sending code.
-    private def makeWhiskContainer(action: ExecutableWhiskAction, auth: AuthKey)(implicit transid: TransactionId): WhiskContainer = {
-        val imageName = getDockerImageName(action)
-        val limits = action.limits
-        val nodeImageName = NODEJS6_IMAGE.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
-        val key = ActionContainerId(auth.uuid, action.fullyQualifiedName(true).toString, action.rev)
-        val warmedContainer = if (limits.memory == defaultMemoryLimit && imageName == nodeImageName) getStemCellNodejsContainer(key) else None
-        val containerName = makeContainerName(action)
-        warmedContainer getOrElse {
-            try {
-                logging.info(this, s"making new container because none available")
-                startingCounter.next()
-                // only Exec instances that are subtypes of CodeExec reach the invoker
-                makeGeneralContainer(key, containerName, imageName, limits, action.exec.asInstanceOf[CodeExec[_]].pull)
-            } finally {
-                val newCount = startingCounter.prev()
-                logging.info(this, s"finished trying to make container, $newCount more containers to start")
-            }
-        }
-    }
-
-    // Make a container somewhat generically without introducing into data structure.
-    // There is access to global settings (docker registry)
-    // and generic settings (image name - static limits) but without access to WhiskAction.
-    private def makeGeneralContainer(
-        key: ActionContainerId, containerName: ContainerName,
-        imageName: String, limits: ActionLimits, pull: Boolean)(
-            implicit transid: TransactionId): WhiskContainer = {
-
-        val network = config.invokerContainerNetwork
-        val cpuShare = ContainerPool.cpuShare(config)
-        val policy = config.invokerContainerPolicy
-        val dnsServers = config.invokerContainerDns
-        val env = getContainerEnvironment()
-
-        // distinguishes between a black box container vs a whisk container
-        def disambiguateContainerError[T](op: => T) = {
-            try { op } catch {
-                case e: ContainerError =>
-                    throw if (pull) BlackBoxContainerError(e.msg) else WhiskContainerError(e.msg)
-            }
-        }
-
-        if (pull) runDockerPull {
-            disambiguateContainerError {
-                // if pull fails, the transaction is aborted by throwing an exception;
-                // a pull is only done for black box container
-                ContainerUtils.pullImage(dockerhost, imageName)
-            }
-        }
-
-        // This will start up the container
-        runDockerOp {
-            disambiguateContainerError {
-                // because of the docker lock, by the time the container gets around to be started
-                // there could be a container to reuse (from a previous run of the same action, or
-                // from a stem cell container); should revisit this logic
-                new WhiskContainer(transid, useRunc, this.dockerhost, mounted, key, containerName, imageName,
-                    network, cpuShare, policy, dnsServers, env, limits)
-            }
-        }
-    }
-
-    // We send the payload here but eventually must also handle morphing a pre-allocated container into the right state.
-    private def initWhiskContainer(action: ExecutableWhiskAction, con: WhiskContainer)(implicit transid: TransactionId): RunResult = {
-        // Then send it the init payload which is code for now
-        con.init(action.containerInitializer, action.limits.timeout.duration)
-    }
-
-    /**
-     * Used through testing only. Creates a container running the command in `args`.
-     */
-    private def makeContainer(key: ActionContainerId, imageName: String, args: Array[String])(implicit transid: TransactionId): WhiskContainer = {
-        runDockerOp {
-            new WhiskContainer(transid, useRunc, this.dockerhost, mounted,
-                key, makeContainerName("testContainer"), imageName,
-                config.invokerContainerNetwork, ContainerPool.cpuShare(config),
-                config.invokerContainerPolicy, config.invokerContainerDns, Map(), ActionLimits(), args)
-        }
-    }
-
-    /**
-     * Adds the container into the data structure in an Idle state.
-     * The caller must have synchronized to maintain data structure atomicity.
-     */
-    private def introduceContainer(key: ActionContainerId, container: WhiskContainer)(implicit transid: TransactionId): ContainerInfo = {
-        val ci = new ContainerInfo(key, container)
-        keyMap.getOrElseUpdate(key, ListBuffer()) += ci
-        containerMap += container -> ci
-        dumpState("introduceContainer")
-        ci
-    }
-
-    private def dumpState(prefix: String)(implicit transid: TransactionId) = {
-        logging.debug(this, s"$prefix: keyMap = ${keyMapToString()}")
-    }
-
-    private def getDockerImageName(action: ExecutableWhiskAction)(implicit transid: TransactionId): String = {
-        // only Exec instances that are subtypes of CodeExec reach the invoker
-        val imageName = if (!action.exec.pull) {
-            action.exec.image.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
-        } else action.exec.image.publicImageName
-        logging.debug(this, s"Using image ${imageName}")
-        imageName
-    }
-
-    private def getContainerEnvironment(): Map[String, String] = {
-        Map("__OW_API_HOST" -> config.wskApiHost)
-    }
-
-    private val defaultMaxIdle = 10
-    private val defaultGCThreshold = 600.seconds
-    private val slowDockerThreshold = 500.millis
-    private val slowDockerPullThreshold = 5.seconds
-
-    val gcFrequency = 1000.milliseconds // this should not be leaked but a test needs this until GC count is implemented
-    private var _maxIdle = defaultMaxIdle
-    private var _maxActive = 0
-    private var _gcThreshold = defaultGCThreshold
-    private var gcOn = true
-    private val gcSync = new Object()
-    resetMaxActive()
-
-    private val timer = new Timer()
-    private val gcTask = new TimerTask {
-        def run() {
-            performGC()(TransactionId.invoker)
-        }
-    }
-    timer.scheduleAtFixedRate(gcTask, 0, gcFrequency.toMillis)
-
-    /**
-     * Removes all idle containers older than the threshold.
-     */
-    private def performGC()(implicit transid: TransactionId) = {
-        val expiration = System.currentTimeMillis() - gcThreshold.toMillis
-        removeAllIdle({ containerInfo => containerInfo.lastUsed <= expiration })
-        dumpState("performGC")
-    }
-
-    /**
-     * Collects all containers that are in the idle state and pass the predicate.
-     * gcSync is used to prevent multiple GC's.
-     */
-    private def removeAllIdle(pred: ContainerInfo => Boolean)(implicit transid: TransactionId) = {
-        gcSync.synchronized {
-            val idleInfo = this.synchronized {
-                val idle = containerMap filter { case (container, ci) => ci.isIdle && pred(ci) }
-                idle.keys foreach { con =>
-                    logging.info(this, s"removeAllIdle removing container ${con.id}")
-                }
-                containerMap --= idle.keys
-                keyMap foreach { case (key, ciList) => ciList --= idle.values }
-                keyMap retain { case (key, ciList) => !ciList.isEmpty }
-                idle.values
-            }
-            idleInfo.foreach { idleCi => toBeRemoved.offer(RemoveJob(!idleCi.isStemCell, idleCi)) }
-        }
-    }
-
-    // Remove containerInfo from data structures but does not perform actual container operation.
-    // Caller must establish synchronization
-    private def removeContainerInfo(conInfo: ContainerInfo)(implicit transid: TransactionId) = {
-        containerMap -= conInfo.container
-        keyMap foreach { case (key, ciList) => ciList -= conInfo }
-        keyMap retain { case (key, ciList) => !ciList.isEmpty }
-    }
-
-    private def removeOldestIdle()(implicit transid: TransactionId) = {
-        // Note that the container removal - if any - is done outside the synchronized block
-        val oldestIdle = this.synchronized {
-            val idle = (containerMap filter { case (container, ci) => ci.isIdle })
-            if (idle.isEmpty)
-                List()
-            else {
-                val oldestConInfo = idle.minBy(_._2.lastUsed)._2
-                logging.info(this, s"removeOldestIdle removing container ${oldestConInfo.container.id}")
-                removeContainerInfo(oldestConInfo)
-                List(oldestConInfo)
-            }
-        }
-        oldestIdle.foreach { ci => toBeRemoved.offer(RemoveJob(!ci.isStemCell, ci)) }
-    }
-
-    // Getter/setter for this are above.
-    private var _logDir = "/logs"
-    private val actionContainerPrefix = s"wsk${invokerInstance.toInt}_"
-
-    /**
-     * Actually deletes the containers.
-     */
-    private def teardownContainer(removeJob: RemoveJob)(implicit transid: TransactionId) = try {
-        val container = removeJob.containerInfo.container
-        if (saveContainerLog) {
-            val size = this.getLogSize(container, mounted)
-            val rawLogBytes = container.synchronized {
-                this.getDockerLogContent(container.containerId, 0, size, mounted)
-            }
-            val filename = s"${_logDir}/${container.nameAsString}.log"
-            Files.write(Paths.get(filename), rawLogBytes)
-            logging.info(this, s"teardownContainers: wrote docker logs to $filename")
-        }
-        container.transid = transid
-        runDockerOp { container.remove(removeJob.needUnpause) }
-    } catch {
-        case t: Throwable => logging.warn(this, s"teardownContainer encountered an exception: ${t.getMessage}")
-    }
-
-    /**
-     * Removes all containers with the actionContainerPrefix to kill leftover action containers.
-     * This is needed for clean startup and shutdown.
-     * Concurrent access from clients must be prevented externally.
-     */
-    private def killStragglers(allContainers: Seq[ContainerState])(implicit transid: TransactionId) = try {
-        val candidates = allContainers.filter { _.name.name.startsWith(actionContainerPrefix) }
-        logging.info(this, s"killStragglers now removing ${candidates.length} leftover containers")
-        candidates foreach { c =>
-            if (useRunc) {
-                RuncUtils.resume(c.id)
-            } else {
-                unpauseContainer(c.name)
-            }
-            rmContainer(c.name)
-        }
-        logging.info(this, s"killStragglers completed")
-    } catch {
-        case t: Throwable => logging.warn(this, s"killStragglers encountered an exception: ${t.getMessage}")
-    }
-
-    /**
-     * Gets the size of the mounted file associated with this whisk container.
-     */
-    def getLogSize(con: Container, mounted: Boolean)(implicit transid: TransactionId): Long = {
-        getDockerLogSize(con.containerId, mounted)
-    }
-
-    nannyThread(listAll()(TransactionId.invokerWarmup)).start
-    if (mounted) {
-        sys addShutdownHook {
-            logging.warn(this, "Shutdown hook activated.  Starting container shutdown")
-            shutdown()
-            logging.warn(this, "Shutdown hook completed.")
-        }
-    }
-
-}
-
-/*
- * These methods are parameterized on the configuration but defined here as an instance of ContainerPool is not
- * always available from other call sites.
- */
-object ContainerPool {
-    def requiredProperties = wskApiHost ++ Map(
-        selfDockerEndpoint -> "localhost",
-        dockerRegistry -> "",
-        dockerImagePrefix -> "",
-        dockerImageTag -> "latest",
-        invokerContainerNetwork -> "bridge",
-        invokerNumCore -> "4",
-        invokerCoreShare -> "2",
-        invokerSerializeDockerOp -> "true",
-        invokerSerializeDockerPull -> "true",
-        invokerUseRunc -> "false",
-        invokerContainerPolicy -> "",
-        invokerContainerDns -> "",
-        invokerContainerNetwork -> null)
-
-    /*
-     * Extract parameters from whisk config.  In the future, these may not be static but
-     * dynamically updated.  They still serve as a starting point for downstream parameters.
-     */
-    def numCore(config: WhiskConfig) = config.invokerNumCore.toInt
-    def shareFactor(config: WhiskConfig) = config.invokerCoreShare.toInt
-
-    /*
-     * The total number of containers is simply the number of cores dilated by the cpu sharing.
-     */
-    def getDefaultMaxActive(config: WhiskConfig) = numCore(config) * shareFactor(config)
-
-    /* The shareFactor indicates the number of containers that would share a single core, on average.
-     * cpuShare is a docker option (-c) whereby a container's CPU access is limited.
-     * A value of 1024 is the full share so a strict resource division with a shareFactor of 2 would yield 512.
-     * On an idle/underloaded system, a container will still get to use underutilized CPU shares.
-     */
-    private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value.
-    def cpuShare(config: WhiskConfig) = (totalShare / getDefaultMaxActive(config)).toInt
-
-}
diff --git a/core/invoker/src/main/scala/whisk/core/container/ContainerUtils.scala b/core/invoker/src/main/scala/whisk/core/container/ContainerUtils.scala
deleted file mode 100644
index f00d329..0000000
--- a/core/invoker/src/main/scala/whisk/core/container/ContainerUtils.scala
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import java.io.{ File, FileNotFoundException }
-
-import scala.language.postfixOps
-import scala.util.Try
-
-import akka.event.Logging.ErrorLevel
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import whisk.common.{ Logging, SimpleExec, TransactionId, LoggingMarkers }
-import whisk.common.Logging
-import whisk.core.entity.ActionLimits
-
-/**
- * Information from docker ps.
- */
-case class ContainerState(id: ContainerHash, image: String, name: ContainerName)
-
-trait ContainerUtils {
-
-    implicit val logging: Logging
-
-    /** Defines the docker host, optional **/
-    val dockerhost: String
-
-    def makeEnvVars(env: Map[String, String]): Array[String] = {
-        env.map {
-            kv => s"-e ${kv._1}=${kv._2}"
-        }.mkString(" ").split(" ").filter { x => x.nonEmpty }
-    }
-
-    /**
-     * Creates a container instance and runs it.
-     *
-     * @param image the docker image to run
-     * @return container id and container host
-     */
-    def bringup(mounted: Boolean,
-                name: Option[ContainerName],
-                image: String,
-                network: String,
-                cpuShare: Int,
-                env: Map[String, String],
-                args: Array[String],
-                limits: ActionLimits,
-                policy: Option[String],
-                dnsServer: Seq[String])(
-                    implicit transid: TransactionId): (ContainerHash, Option[ContainerAddr]) = {
-        val id = makeContainer(name, image, network, cpuShare, env, args, limits, policy, dnsServer)
-        val host = getContainerIpAddr(id, mounted, network)
-        (id, host map { ContainerAddr(_, 8080) })
-    }
-
-    /**
-     * Pulls container images.
-     */
-    def pullImage(image: String)(implicit transid: TransactionId): DockerOutput = ContainerUtils.pullImage(dockerhost, image)
-
-    /*
-     * TODO: The file handle and process limits should be moved to some global limits config.
-     */
-    @throws[ContainerError]
-    def makeContainer(name: Option[ContainerName],
-                      image: String,
-                      network: String,
-                      cpuShare: Int,
-                      env: Map[String, String],
-                      args: Seq[String],
-                      limits: ActionLimits,
-                      policy: Option[String],
-                      dnsServers: Seq[String])(
-                          implicit transid: TransactionId): ContainerHash = {
-        val nameOption = name.map(n => Array("--name", n.name)).getOrElse(Array.empty[String])
-        val cpuArg = Array("-c", cpuShare.toString)
-        val memoryArg = Array("-m", s"${limits.memory.megabytes}m", "--memory-swap", s"${limits.memory.megabytes}m")
-        val capabilityArg = Array("--cap-drop", "NET_RAW", "--cap-drop", "NET_ADMIN")
-        val fileHandleLimit = Array("--ulimit", "nofile=1024:1024")
-        val processLimit = Array("--pids-limit", "1024")
-        val securityOpts = policy map { p => Array("--security-opt", s"apparmor:${p}") } getOrElse (Array.empty[String])
-        val dnsOpts = dnsServers.map(Seq("--dns", _)).flatten
-        val containerNetwork = Array("--net", network)
-
-        val cmd = Seq("run") ++ makeEnvVars(env) ++ nameOption ++ cpuArg ++ memoryArg ++
-            capabilityArg ++ fileHandleLimit ++ processLimit ++ securityOpts ++ dnsOpts ++ containerNetwork ++ Seq("-d", image) ++ args
-
-        runDockerCmd(cmd: _*).toOption.map { result =>
-            ContainerHash.fromString(result)
-        } getOrElse {
-            throw new ContainerError("Failed to start container.")
-        }
-    }
-
-    def killContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
-        runDockerCmd("kill", container.id)
-    }
-
-    def getContainerLogs(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
-        runDockerCmd("logs", container.id)
-    }
-
-    def pauseContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
-        runDockerCmd("pause", container.id)
-    }
-
-    def unpauseContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
-        runDockerCmd("unpause", container.id)
-    }
-
-    /**
-     * Forcefully removes a container, can be used on a running container but not a paused one.
-     */
-    def rmContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
-        runDockerCmd("rm", "-f", container.id)
-    }
-
-    /*
-     * List containers (-a if all).
-     */
-    def listContainers(all: Boolean)(implicit transid: TransactionId): Seq[ContainerState] = {
-        val tmp = Array("ps", "--no-trunc")
-        val cmd = if (all) tmp :+ "-a" else tmp
-        runDockerCmd(cmd: _*).toOption map { output =>
-            val lines = output.split("\n").drop(1).toSeq // skip the header
-            lines.map(parsePsOutput)
-        } getOrElse Seq()
-    }
-
-    def getDockerLogSize(containerId: ContainerHash, mounted: Boolean)(implicit transid: TransactionId): Long = {
-        try {
-            getDockerLogFile(containerId, mounted).length
-        } catch {
-            case e: Exception =>
-                logging.error(this, s"getDockerLogSize failed on ${containerId.id}")
-                0
-        }
-    }
-
-    /**
-     * Reads the contents of the file at the given position.
-     * It is assumed that the contents does exist and that region is not changing concurrently.
-     */
-    def getDockerLogContent(containerHash: ContainerHash, start: Long, end: Long, mounted: Boolean)(implicit transid: TransactionId): Array[Byte] = {
-        var fis: java.io.FileInputStream = null
-        try {
-            val file = getDockerLogFile(containerHash, mounted)
-            fis = new java.io.FileInputStream(file)
-            val channel = fis.getChannel().position(start)
-            var remain = (end - start).toInt
-            val buffer = java.nio.ByteBuffer.allocate(remain)
-            while (remain > 0) {
-                val read = channel.read(buffer)
-                if (read > 0)
-                    remain = read - read.toInt
-            }
-            buffer.array
-        } catch {
-            case e: Exception =>
-                logging.error(this, s"getDockerLogContent failed on ${containerHash.hash}: ${e.getClass}: ${e.getMessage}")
-                Array()
-        } finally {
-            if (fis != null) fis.close()
-        }
-
-    }
-
-    /**
-     * Obtain IP addr by looking at config file but fall back to inspect when in testing mode (where there is no root access)
-     * or as a general fallback if the config file path fails.
-     */
-    def getContainerIpAddr(container: ContainerHash, mounted: Boolean, network: String)(implicit transid: TransactionId): Option[String] =
-        if (!mounted) {
-            getContainerIpAddrViaInspect(container)
-        } else {
-            getContainerIpAddrViaConfig(container, network).toOption orElse {
-                logging.warn(this, "Failed to obtain IP address of container via config file.  Falling back to inspect.")
-                getContainerIpAddrViaInspect(container)
-            }
-        }
-
-    /**
-     * Obtain container IP address with docker inspect.
-     */
-    def getContainerIpAddrViaInspect(container: ContainerHash)(implicit transid: TransactionId): Option[String] = {
-        val inspectFormat = "'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'"
-        runDockerCmd("inspect", "--format", inspectFormat, container.id).toOption.map { output =>
-            output.substring(1, output.length - 1)
-        }
-    }
-
-    /**
-     * Obtain container IP address by reading docker config file.
-     */
-    def getContainerIpAddrViaConfig(container: ContainerHash, network: String)(implicit transid: TransactionId): Try[String] =
-        getIpAddr(getDockerConfig(container, true), network)
-
-    private def runDockerCmd(args: String*)(implicit transid: TransactionId): DockerOutput = runDockerCmd(false, args)
-
-    /**
-     * Synchronously runs the given docker command returning stdout if successful.
-     */
-    private def runDockerCmd(skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId): DockerOutput =
-        ContainerUtils.runDockerCmd(dockerhost, skipLogError, args)(transid, logging)
-
-    /**
-     * Obtain the per container directory Docker maintains for each container.
-     *
-     * If this component is running outside a container, then logs files are in docker's own
-     * /var/lib/docker/containers.  If running inside a container, is mounted at /containers.
-     * Root access is needed when running outside the container.
-     */
-    private def dockerContainerDir(mounted: Boolean, containerId: ContainerHash) = {
-        (if (mounted) "/containers/" else "/var/lib/docker/containers/") + containerId.hash.toString
-    }
-
-    /**
-     * Gets the filename of the docker logs of other containers that is mapped back into the invoker.
-     */
-    private def getDockerLogFile(containerId: ContainerHash, mounted: Boolean) = {
-        new java.io.File(s"""${dockerContainerDir(mounted, containerId)}/${containerId.hash}-json.log""").getCanonicalFile()
-    }
-
-    /**
-     * Return docker config as a JsObject by reading the config.v2.json file for the given container.
-     */
-    private def getDockerConfig(containerId: ContainerHash, mounted: Boolean): JsObject = {
-        val configFile = s"${dockerContainerDir(mounted, containerId)}/config.v2.json"
-        val contents = scala.io.Source.fromFile(configFile).mkString
-        contents.parseJson.asJsObject
-    }
-
-    /**
-     * Extracts the IP addr from the docker config object with projection rather than converting the entire config.
-     */
-    private def getIpAddr(config: JsObject, network: String): Try[String] = Try {
-        val networks = config.fields("NetworkSettings").asJsObject.fields("Networks").asJsObject
-        val userland = networks.fields(network).asJsObject
-        val ipAddr = userland.fields("IPAddress")
-        ipAddr.convertTo[String]
-    }
-
-    private def parsePsOutput(line: String): ContainerState = {
-        val tokens = line.split("\\s+")
-        val hash = ContainerHash.fromString(tokens(0))
-        val name = ContainerName.fromString(tokens.last)
-        ContainerState(hash, tokens(1), name)
-    }
-}
-
-object ContainerUtils {
-
-    /**
-     * Synchronously runs the given docker command returning stdout if successful.
-     */
-    def runDockerCmd(dockerhost: String, skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId, logging: Logging): DockerOutput = {
-        val start = transid.started(this, LoggingMarkers.INVOKER_DOCKER_CMD(args(0)))
-
-        try {
-            val fullCmd = getDockerCmd(dockerhost) ++ args
-
-            val (stdout, stderr, exitCode) = SimpleExec.syncRunCmd(fullCmd)
-
-            if (exitCode == 0) {
-                transid.finished(this, start)
-                DockerOutput(stdout.trim)
-            } else {
-                if (!skipLogError) {
-                    transid.failed(this, start, s"stdout:\n$stdout\nstderr:\n$stderr", ErrorLevel)
-                } else {
-                    transid.failed(this, start)
-                }
-                DockerOutput.unavailable
-            }
-        } catch {
-            case t: Throwable =>
-                transid.failed(this, start, "error: " + t.getMessage, ErrorLevel)
-                DockerOutput.unavailable
-        }
-    }
-
-    @throws[FileNotFoundException]
-    private def getDockerCmd(dockerhost: String): Seq[String] = {
-        def file(path: String) = Try { new File(path) } filter { _.exists } toOption
-
-        val dockerLoc = file("/usr/bin/docker") orElse file("/usr/local/bin/docker")
-
-        val dockerBin = dockerLoc.map(_.toString).getOrElse {
-            throw new FileNotFoundException("Failed to locate docker binary.")
-        }
-
-        if (dockerhost == "localhost") {
-            Seq(dockerBin)
-        } else {
-            Seq(dockerBin, "--host", s"tcp://$dockerhost")
-        }
-    }
-
-    /**
-     * Pulls container images.
-     */
-    @throws[ContainerError]
-    def pullImage(dockerhost: String, image: String)(implicit transid: TransactionId, logging: Logging): DockerOutput = {
-        val cmd = Array("pull", image)
-        val result = runDockerCmd(dockerhost, false, cmd)
-        if (result != DockerOutput.unavailable) {
-            result
-        } else {
-            throw new ContainerError(s"Failed to pull container image '$image'.")
-        }
-    }
-
-}
diff --git a/core/invoker/src/main/scala/whisk/core/container/RuncUtils.scala b/core/invoker/src/main/scala/whisk/core/container/RuncUtils.scala
deleted file mode 100644
index fe24d3a..0000000
--- a/core/invoker/src/main/scala/whisk/core/container/RuncUtils.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import akka.event.Logging.ErrorLevel
-import whisk.common.{ Logging, LoggingMarkers, SimpleExec, TransactionId }
-
-object RuncUtils {
-
-    def list()(implicit transid: TransactionId, logging: Logging): (Int, String) = {
-        runRuncCmd(false, Seq("list"))
-    }
-
-    def pause(id: ContainerIdentifier)(implicit transid: TransactionId, logging: Logging): (Int, String) = {
-        runRuncCmd(false, Seq("pause", id.toString))
-    }
-
-    def resume(id: ContainerIdentifier)(implicit transid: TransactionId, logging: Logging): (Int, String) = {
-        runRuncCmd(false, Seq("resume", id.toString))
-    }
-
-    /**
-     * Synchronously runs the given runc command returning stdout if successful.
-     */
-    def runRuncCmd(skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId, logging: Logging): (Int, String) = {
-        val start = transid.started(this, LoggingMarkers.INVOKER_RUNC_CMD(args(0)))
-        try {
-            val fullCmd = getRuncCmd() ++ args
-
-            val (stdout, stderr, exitCode) = SimpleExec.syncRunCmd(fullCmd)
-
-            if (exitCode == 0) {
-                transid.finished(this, start)
-                (exitCode, stdout.trim)
-            } else {
-                if (!skipLogError) {
-                    transid.failed(this, start, s"stdout:\n$stdout\nstderr:\n$stderr", ErrorLevel)
-                } else {
-                    transid.failed(this, start)
-                }
-                (exitCode, (stdout + stderr).trim)
-            }
-        } catch {
-            case t: Throwable =>
-                val errorMsg = "error: " + t.getMessage
-                transid.failed(this, start, errorMsg, ErrorLevel)
-                (-1, errorMsg)
-        }
-    }
-
-    def isSuccessful(result : (Int, String)) : Boolean =
-        result match {
-            case (0, _) => true
-            case _ => false
-        }
-
-    /*
-     *  Any global flags are added here.
-     */
-    private def getRuncCmd(): Seq[String] = {
-        val runcBin = "/usr/bin/docker-runc"
-        Seq(runcBin)
-    }
-
-}
diff --git a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
deleted file mode 100644
index 610726f..0000000
--- a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import java.time.Clock
-import java.time.Instant
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-import akka.actor.ActorSystem
-import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
-import whisk.core.connector.ActivationMessage
-import whisk.core.entitlement.Privilege
-import whisk.core.entity._
-import whisk.core.entity.ActionLimits
-import whisk.core.entity.ActivationResponse._
-
-/**
- * Reifies a whisk container - one that respects the whisk container API.
- */
-class WhiskContainer(
-    originalId: TransactionId,
-    useRunc: Boolean,
-    dockerhost: String,
-    mounted: Boolean,
-    key: ActionContainerId,
-    containerName: ContainerName,
-    image: String,
-    network: String,
-    cpuShare: Int,
-    policy: Option[String],
-    dnsServers: Seq[String],
-    env: Map[String, String],
-    limits: ActionLimits,
-    args: Array[String] = Array())(
-        override implicit val logging: Logging)
-    extends Container(originalId, useRunc, dockerhost, mounted, key, Some(containerName), image, network, cpuShare, policy, dnsServers, limits, env, args) {
-
-    var lastLogSize = 0L
-
-    /** HTTP connection to container. Initialized on /init. */
-    private var connection: Option[HttpUtils] = None
-
-    /**
-     * Sends initialization payload to container.
-     */
-    def init(args: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
-        val startMarker = transid.started("Invoker", LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to ${this.details}")
-        // when invoking /init, don't wait longer than the timeout configured for this action
-        val result = sendPayload("/init", JsObject("value" -> args), timeout, retry = true)
-        val RunResult(Interval(startActivation, endActivation), _) = result
-        transid.finished("Invoker", startMarker.copy(startActivation), s"initialization result: ${result.toBriefString}", endTime = endActivation)
-        result
-    }
-
-    private def constructActivationMetadata(msg: ActivationMessage, args: JsObject, timeout: FiniteDuration): JsObject = {
-        JsObject(
-            "value" -> args,
-            "api_key" -> msg.user.authkey.compact.toJson,
-            "namespace" -> msg.user.namespace.toJson,
-            "action_name" -> msg.action.qualifiedNameWithLeadingSlash.toJson,
-            "activation_id" -> msg.activationId.toString.toJson,
-            // compute deadline on invoker side avoids discrepancies inside container
-            // but potentially under-estimates actual deadline
-            "deadline" -> (Instant.now(Clock.systemUTC()).toEpochMilli + timeout.toMillis).toString.toJson)
-    }
-
-    /**
-     * Sends a run command to action container to run once.
-     *
-     * @param state the value of the status to compare the actual state against
-     * @return triple of start time, end time, response for user action.
-     */
-    def run(msg: ActivationMessage, args: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
-        val startMarker = transid.started("Invoker", LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to ${msg.action} $details")
-        val result = sendPayload("/run", constructActivationMetadata(msg, args, timeout), timeout, retry = false)
-        // Use start and end time of the activation
-        val RunResult(Interval(startActivation, endActivation), _) = result
-        transid.finished("Invoker", startMarker.copy(startActivation), s"running result: ${result.toBriefString}", endTime = endActivation)
-        result
-    }
-
-    /**
-     * An alternative entry point for direct testing of action container.
-     */
-    def run(payload: String, activationId: String)(implicit system: ActorSystem): RunResult = {
-        val params = JsObject("payload" -> JsString(payload))
-        val meta = JsObject("activationId" -> JsString(activationId))
-        val msg = ActivationMessage(
-            TransactionId.testing,
-            FullyQualifiedEntityName(EntityPath("no_namespace"), EntityName("no_action")),
-            DocRevision.empty,
-            Identity(Subject(), EntityName("no_namespace"), AuthKey(), Privilege.ALL),
-            ActivationId(),
-            EntityPath("no_namespace"),
-            InstanceId(0),
-            None)
-        run(msg, params, 30000.milliseconds)(system, TransactionId.testing)
-    }
-
-    /**
-     * Tear down the container and retrieve the logs.
-     */
-    def teardown()(implicit transid: TransactionId): String = {
-        connection.foreach(_.close())
-        getContainerLogs(containerName).toOption.getOrElse("none")
-    }
-
-    /**
-     * Posts a message to the container.
-     *
-     * @param msg the message to post
-     * @param retry whether or not to retry on connection failure
-     * @return response from container if any as array of byte
-     */
-    private def sendPayload(endpoint: String, msg: JsObject, timeout: FiniteDuration, retry: Boolean)(implicit system: ActorSystem): RunResult = {
-        sendPayloadApache(endpoint, msg, timeout, retry)
-    }
-
-    private def sendPayloadApache(endpoint: String, msg: JsObject, timeout: FiniteDuration, retry: Boolean): RunResult = {
-        val start = ContainerCounter.now()
-
-        val result = for {
-            hp <- containerHostAndPort
-            c <- connection orElse {
-                val hostWithPort = s"${hp.host}:${hp.port}"
-                connection = Some(new HttpUtils(hostWithPort, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT))
-                connection
-            }
-        } yield {
-            c.post(endpoint, msg, retry)
-        }
-
-        val end = ContainerCounter.now()
-        RunResult(Interval(start, end), result getOrElse Left(NoHost()))
-    }
-}
-
-/**
- * Singleton to thread-safely count containers.
- */
-protected[container] object ContainerCounter {
-    private val cnt = new AtomicInteger(0)
-    private def next(): Int = {
-        cnt.incrementAndGet()
-    }
-    private def cut(): Int = {
-        cnt.get()
-    }
-
-    def now() = Instant.now(Clock.systemUTC())
-
-    def containerName(containerPrefix: String, containerSuffix: String): ContainerName = {
-        val name = s"wsk${containerPrefix}_${ContainerCounter.next()}_${containerSuffix}_${now()}".replaceAll("[^a-zA-Z0-9_]", "")
-        ContainerName.fromString(name)
-    }
-}
diff --git a/core/invoker/src/main/scala/whisk/core/container/package.scala b/core/invoker/src/main/scala/whisk/core/container/package.scala
deleted file mode 100644
index c8fede6..0000000
--- a/core/invoker/src/main/scala/whisk/core/container/package.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core
-
-import java.time.Instant
-
-import scala.concurrent.duration._
-
-import whisk.core.entity.ActivationResponse._
-import whisk.core.entity.DocRevision
-import whisk.core.entity.UUID
-
-/**
- * This object contains type definitions that are useful when observing and timing container operations.
- */
-package object container {
-
-    /**
-     * Identifies a combination of owner+action+version (except special cases)
-     */
-    class ActionContainerId(val stringRepr: String) extends AnyVal
-
-    object ActionContainerId {
-        // Convenience "constructor" since this is the most common case.
-        def apply(uuid: UUID, actionFullyQualifiedName: String, actionRevision: DocRevision) =
-            new ActionContainerId(s"instantiated.${uuid}.${actionFullyQualifiedName}.${actionRevision}")
-    }
-
-    /**
-     * Special case for stem cell containers
-     */
-    val StemCellNodeJsActionContainerId = new ActionContainerId("stemcell.nodejs")
-
-    /**
-     * Represents a time interval, which can be viewed as a duration for which
-     *  the start/end instants are fully known (as opposed to being relative).
-     */
-    case class Interval(start: Instant, end: Instant) {
-        def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
-    }
-
-    object Interval {
-        /** An interval starting now with zero duration. */
-        def zero = {
-            val now = Instant.now
-            Interval(now, now)
-        }
-    }
-
-    /**
-     * Represents the result of accessing an endpoint in a container:
-     * Start time, End time, Some(response) from container consisting of status code and payload
-     * If there is no response or an exception, then None.
-     */
-    case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) {
-        def duration = interval.duration
-        def ok = response.right.exists(_.ok)
-        def errored = !ok
-        def toBriefString = response.fold(_.toString, _.toString)
-    }
-
-    /**
-     * The result of trying to obtain a container that has already run this user+action in the past.
-     */
-    sealed trait CacheResult
-
-    case object CacheMiss extends CacheResult
-    case object CacheBusy extends CacheResult
-    case class CacheHit(con: WhiskContainer) extends CacheResult
-
-    /**
-     * The result of trying to obtain a container which is known to exist or to create one.
-     * Capacity constraints have been passed by this point so there are no Busy's.
-     * Initiailization is performed later so no field for initResult here.
-     */
-    sealed trait ContainerResult
-
-    case class Warm(con: WhiskContainer) extends ContainerResult
-    case class Cold(con: WhiskContainer) extends ContainerResult
-
-    // Note: not using InetAddress here because we don't want to do any lookup
-    // until used for something.
-    case class ContainerAddr(host: String, port: Int) {
-        override def toString() = s"$host:$port"
-    }
-
-    sealed abstract class ContainerIdentifier(val id: String) {
-        override def toString = id
-    }
-    class ContainerName(val name: String) extends ContainerIdentifier(name) {
-        override def toString = id
-    }
-    class ContainerHash(val hash: String) extends ContainerIdentifier(hash) {
-        override def toString = id
-    }
-
-    object ContainerIdentifier {
-        def fromString(str: String): ContainerIdentifier = {
-            val s = str.trim
-            require(!s.contains("\n"))
-            if (s.matches("^[0-9a-fA-F]+$")) {
-                new ContainerHash(s)
-            } else {
-                new ContainerName(s)
-            }
-        }
-    }
-
-    object ContainerName {
-        def fromString(str: String) = new ContainerName(str)
-    }
-
-    object ContainerHash {
-        def fromString(str: String) = {
-            require(str.matches("^[0-9a-fA-F]+$"))
-            new ContainerHash(str)
-        }
-    }
-
-    final class DockerOutput(val toOption: Option[String]) extends AnyVal
-    object DockerOutput {
-        def apply(content: String) = new DockerOutput(Some(content))
-        def unavailable = new DockerOutput(None)
-
-        def isSuccessful(output: DockerOutput): Boolean =
-            output match {
-                case output if output == DockerOutput.unavailable => false
-                case _ => true
-            }
-    }
-
-    sealed class ContainerError(val msg: String) extends Throwable(msg)
-    case class WhiskContainerError(override val msg: String) extends ContainerError(msg)
-    case class BlackBoxContainerError(override val msg: String) extends ContainerError(msg)
-}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala b/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala
index 2dc33e9..13d0306 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala
@@ -22,9 +22,10 @@ import scala.concurrent.duration.FiniteDuration
 
 import spray.json.JsObject
 import whisk.common.TransactionId
-import whisk.core.container.Interval
 import whisk.core.entity.ActivationResponse
 import whisk.core.entity.ByteSize
+import java.time.Instant
+import scala.concurrent.duration._
 
 /**
  * An OpenWhisk biased container abstraction. This is **not only** an abstraction
@@ -65,3 +66,15 @@ case class BlackboxStartupError(msg: String) extends ContainerStartupError(msg)
 
 /** Indicates an error while initializing a container */
 case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString)
+
+case class Interval(start: Instant, end: Instant) {
+    def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
+}
+
+object Interval {
+    /** An interval starting now with zero duration. */
+    def zero = {
+        val now = Instant.now
+        Interval(now, now)
+    }
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 7e0f698..1c3c4f1 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -33,7 +33,6 @@ import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
-import whisk.core.container.Interval
 import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.common.Counter
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/ActionLogDriver.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerActionLogDriver.scala
similarity index 95%
rename from core/invoker/src/main/scala/whisk/core/invoker/ActionLogDriver.scala
rename to core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerActionLogDriver.scala
index eebe1fc..68469e4 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/ActionLogDriver.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerActionLogDriver.scala
@@ -15,13 +15,10 @@
  * limitations under the License.
  */
 
-package whisk.core.invoker
+package whisk.core.containerpool.docker
 
 import java.nio.charset.StandardCharsets
-
-import scala.Vector
 import scala.util.{ Failure, Success, Try }
-
 import spray.json._
 import spray.json.DefaultJsonProtocol
 import whisk.common.{ Logging, TransactionId }
@@ -45,12 +42,12 @@ protected[core] object LogLine extends DefaultJsonProtocol {
     implicit val serdes = jsonFormat3(LogLine.apply)
 }
 
-protected[core] object ActionLogDriver {
+protected[core] object DockerActionLogDriver {
     // The action proxies inserts this line in the logs at the end of each activation for stdout/stderr
     val LOG_ACTIVATION_SENTINEL = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
 }
 
-protected[core] trait ActionLogDriver {
+protected[core] trait DockerActionLogDriver {
 
     /**
      * Given the JSON driver's raw output of a docker container, convert it into our own
@@ -80,7 +77,7 @@ protected[core] trait ActionLogDriver {
             Try(lines.next().parseJson.convertTo[LogLine]) match {
                 case Success(t) =>
                     // if sentinels are expected, do not account for their size, otherwise, all bytes are accounted for
-                    if (requireSentinel && t.log.trim != ActionLogDriver.LOG_ACTIVATION_SENTINEL || !requireSentinel) {
+                    if (requireSentinel && t.log.trim != DockerActionLogDriver.LOG_ACTIVATION_SENTINEL || !requireSentinel) {
                         // ignore empty log lines
                         if (t.log.nonEmpty) {
                             bytesSoFar += t.log.sizeInBytes
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index e659937..143c5e9 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -19,21 +19,17 @@ package whisk.core.containerpool.docker
 
 import java.nio.charset.StandardCharsets
 import java.time.Instant
-
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
-
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
-import whisk.core.container.HttpUtils
-import whisk.core.container.Interval
-import whisk.core.container.RunResult
+import whisk.core.containerpool.Interval
 import whisk.core.containerpool.BlackboxStartupError
 import whisk.core.containerpool.Container
 import whisk.core.containerpool.InitializationError
@@ -41,8 +37,9 @@ import whisk.core.containerpool.WhiskContainerStartupError
 import whisk.core.entity.ActivationResponse
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
-import whisk.core.invoker.ActionLogDriver
 import whisk.http.Messages
+import whisk.core.entity.ActivationResponse.ContainerConnectionError
+import whisk.core.entity.ActivationResponse.ContainerResponse
 
 object DockerContainer {
     /**
@@ -124,7 +121,7 @@ object DockerContainer {
  * @param ip the ip of the container
  */
 class DockerContainer(id: ContainerId, ip: ContainerIp)(
-    implicit docker: DockerApiWithFileAccess, runc: RuncApi, ec: ExecutionContext, logger: Logging) extends Container with ActionLogDriver {
+    implicit docker: DockerApiWithFileAccess, runc: RuncApi, ec: ExecutionContext, logger: Logging) extends Container with DockerActionLogDriver {
 
     /** The last read-position in the log file */
     private var logFileOffset = 0L
@@ -255,3 +252,8 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(
         }
     }
 }
+
+case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) {
+    def ok = response.right.exists(_.ok)
+    def toBriefString = response.fold(_.toString, _.toString)
+}
diff --git a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala
similarity index 98%
rename from core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
rename to core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala
index 1530d69..3ca7861 100644
--- a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala
@@ -15,16 +15,14 @@
  * limitations under the License.
  */
 
-package whisk.core.container
+package whisk.core.containerpool.docker
 
 import java.nio.charset.StandardCharsets
-
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.duration.DurationInt
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
@@ -34,12 +32,12 @@ import org.apache.http.client.utils.URIBuilder
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.HttpClientBuilder
-
 import spray.json._
-
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
+import scala.Left
+import scala.Right
 
 /**
  * This HTTP client is used only in the invoker to communicate with the action container.
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
deleted file mode 100644
index fa0e74d..0000000
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import java.nio.charset.StandardCharsets
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Failure
-import scala.util.Success
-
-import akka.actor.ActorSystem
-import akka.actor.Props
-import akka.actor.actorRef2Scala
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.core.connector.ActivationMessage
-import whisk.core.connector.MessageConsumer
-import whisk.core.connector.MessageFeed
-
-/**
- * Creates a dispatcher that pulls messages from the message pub/sub connector.
- * This is currently used by invoker only. It may be removed in the future and
- * its functionality merged directly with the invoker. The current model allows
- * for different message types to be received by more than one consumer in the
- * same process (via handler registration).
- *
- * @param verbosity level for logging
- * @param consumer the consumer providing messages
- * @param pollDuration the long poll duration (max duration to wait for new messages)
- * @param maxPipelineDepth the maximum number of messages allowed in the queued (even >=2)
- * @param actorSystem an actor system to create actor
- */
-@throws[IllegalArgumentException]
-class Dispatcher(
-    consumer: MessageConsumer,
-    pollDuration: FiniteDuration,
-    maxPipelineDepth: Int,
-    actorSystem: ActorSystem)(
-        implicit logging: Logging)
-    extends Registrar {
-
-    // create activation request feed but do not start it, until the invoker is registered
-    val activationFeed = actorSystem.actorOf(Props(new MessageFeed("activation", logging, consumer, maxPipelineDepth, pollDuration, process, autoStart = false)))
-
-    def start() = activationFeed ! MessageFeed.Ready
-    def stop() = consumer.close()
-
-    /**
-     * Consumes activation messages from the bus using a streaming consumer
-     * interface. Each message is a JSON object serialization of ActivationMessage.
-     *
-     * For every message that is received, process it with all attached handlers.
-     * A handler is registered via addHandler and unregistered via removeHandler.
-     * There is typically only one handler.
-     */
-    def process(bytes: Array[Byte]): Future[Unit] = Future {
-        val raw = new String(bytes, StandardCharsets.UTF_8)
-        ActivationMessage.parse(raw) match {
-            case Success(m) =>
-                handlers foreach {
-                    case (name, handler) => handleMessage(handler, m)
-                }
-            case Failure(t) => logging.info(this, errorMsg(raw, t))
-        }
-    }
-
-    private def handleMessage(handler: MessageHandler, msg: ActivationMessage): Unit = {
-        implicit val tid = msg.transid
-
-        Future {
-            val count = counter.next()
-            logging.debug(this, s"activeCount = $count while handling ${handler.name}")
-            handler.onMessage(msg) // returns a future which is flat-mapped via identity to hang onComplete
-        } flatMap (identity) onComplete {
-            case Success(a) => logging.debug(this, s"activeCount = ${counter.prev()} after handling ${handler.name}")
-            case Failure(t) => logging.error(this, s"activeCount = ${counter.prev()} ${errorMsg(handler, t)}")
-        }
-    }
-
-    private def errorMsg(handler: MessageHandler, e: Throwable): String = {
-        s"failed applying handler '${handler.name}': ${errorMsg(e)}"
-    }
-
-    private def errorMsg(msg: String, e: Throwable): String = {
-        s"failed processing message: $msg $e${e.getStackTrace.mkString("", " ", "")}"
-    }
-
-    private def errorMsg(e: Throwable): String = {
-        if (e.isInstanceOf[java.util.concurrent.ExecutionException]) {
-            s"$e${e.getCause.getStackTrace.mkString("", " ", "")}"
-        } else {
-            s"$e${e.getStackTrace.mkString("", " ", "")}"
-        }
-    }
-
-    private val counter = new Counter()
-    private implicit val executionContext = actorSystem.dispatcher
-}
-
-trait Registrar {
-    /**
-     * Adds handler for a message. The handler name must be unique, else
-     * the new handler replaces a previously added one unless this behavior
-     * is overridden by setting replace to false.
-     *
-     * @param handler is the message handler to add override
-     * @param replace indicates whether a new handler should replace an older handler by the same name
-     * @return an option dispatch rule, the previous value of the rule if any
-     */
-    def addHandler(handler: MessageHandler, replace: Boolean): Option[MessageHandler] = {
-        if (handler != null) {
-            if (replace) handlers.put(handler.name, handler)
-            else handlers.putIfAbsent(handler.name, handler)
-        } else None
-    }
-
-    /**
-     * Removes handlers by name if it exists.
-     *
-     * @param name is the name of the handler to remove
-     * @return the handler just removed if any
-     */
-    def removeHandler(name: String): Option[MessageHandler] = {
-        if (name != null && name.trim.nonEmpty)
-            handlers.remove(name)
-        else None
-    }
-
-    /**
-     * Removes handler if it exists.
-     *
-     * @param handler is the message handler to remove
-     * @return the handler just removed if any
-     */
-    def removeHandler(handler: MessageHandler): Option[MessageHandler] = {
-        if (handler != null) {
-            handlers.remove(handler.name)
-        } else None
-    }
-
-    protected val handlers = new TrieMap[String, MessageHandler]
-}
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/MessageHandler.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/MessageHandler.scala
deleted file mode 100644
index e5ca487..0000000
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/MessageHandler.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import scala.concurrent.Future
-import whisk.common.TransactionId
-import whisk.core.connector.{ ActivationMessage => Message }
-
-/**
- * Abstract base class for a handler for a connector (e.g., Kafka) message.
- */
-abstract class MessageHandler(val name: String) {
-
-    /**
-     * Runs handler for a Kafka message. This method is run inside a future.
-     * If the method fails with an exception, the exception completes
-     * the wrapping future within which the method is run.
-     *
-     * @param msg the Message object to process
-     * @param transid the transaction id for the Kafka message
-     * @return Future that executes the handler
-     */
-    def onMessage(msg: Message)(implicit transid: TransactionId): Future[Any]
-}
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 57e7110..b336c27 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -17,431 +17,45 @@
 
 package whisk.core.invoker
 
-import java.nio.charset.StandardCharsets
-import java.time.{Clock, Instant}
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.Promise
-import scala.concurrent.duration.{Duration, DurationInt}
-import scala.language.postfixOps
-import scala.util.{Failure, Success}
-import scala.util.Try
-import org.apache.kafka.common.errors.RecordTooLargeException
-import akka.actor.{ActorRef, ActorSystem, actorRef2Scala}
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Failure
+
+import akka.actor.ActorSystem
 import akka.japi.Creator
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import whisk.common.{Counter, Logging, LoggingMarkers, TransactionId}
 import whisk.common.AkkaLogging
 import whisk.common.Scheduler
 import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig.{dockerImagePrefix, dockerRegistry, invokerUseReactivePool, kafkaHost, logsDir, servicePort}
-import whisk.core.connector.{ActivationMessage, CompletionMessage}
-import whisk.core.connector.MessageFeed
-import whisk.core.connector.MessageProducer
+import whisk.core.WhiskConfig._
 import whisk.core.connector.MessagingProvider
 import whisk.core.connector.PingMessage
-import whisk.core.container._
-import whisk.core.dispatcher.{Dispatcher, MessageHandler}
-import whisk.core.entity._
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.InstanceId
+import whisk.core.entity.WhiskActivationStore
+import whisk.core.entity.WhiskEntityStore
 import whisk.http.BasicHttpService
-import whisk.http.Messages
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
 
-/**
- * A message handler that invokes actions as directed by message on topic "/actions/invoke".
- * The message path must contain a fully qualified action name and an optional revision id.
- *
- * @param config the whisk configuration
- * @param instance the invoker instance number
- * @param runningInContainer if false, invoker is run outside a container -- for testing
- */
-class Invoker(
-    config: WhiskConfig,
-    instance: InstanceId,
-    activationFeed: ActorRef,
-    producer: MessageProducer,
-    runningInContainer: Boolean = true)(implicit actorSystem: ActorSystem, logging: Logging)
-    extends MessageHandler(s"invoker${instance.toInt}")
-    with ActionLogDriver {
-
-    private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
-
-    TransactionId.invoker.mark(this, LoggingMarkers.INVOKER_STARTUP(instance.toInt), s"starting invoker instance ${instance.toInt}")
-
-    /**
-     * This is the handler for the kafka message
-     *
-     * @param msg is the kafka message payload as Json
-     * @param matches contains the regex matches
-     */
-    override def onMessage(msg: ActivationMessage)(implicit transid: TransactionId): Future[DocInfo] = {
-        require(msg != null, "message undefined")
-        require(msg.action.version.isDefined, "action version undefined")
-
-        val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION)
-        val namespace = msg.action.path
-        val name = msg.action.name
-        val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
-        val tran = Transaction(msg)
-        val subject = msg.user.subject
-
-        logging.info(this, s"${actionid.id} $subject ${msg.activationId}")
-
-        // the activation must terminate with only one attempt to write an activation record to the datastore
-        // hence when the transaction is fully processed, this method will complete a promise with the datastore
-        // future writing back the activation record and for which there are three cases:
-        // 1. success: there were no exceptions and hence the invoke path operated normally,
-        // 2. error during invocation: an exception occurred while trying to run the action (failed to bring up a container for example),
-        // 3. error fetching action: an exception occurred reading from the db, didn't get to run.
-        // 4. internal error: the controller passed a wrong action to the invoker.
-        val transactionPromise = Promise[DocInfo]
-
-        // caching is enabled since actions have revision id and an updated
-        // action will not hit in the cache due to change in the revision id;
-        // if the doc revision is missing, then bypass cache
-        if (actionid.rev == DocRevision.empty) {
-            logging.error(this, s"revision was not provided for ${actionid.id}")
-        }
-
-        WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) onComplete {
-            case Success(action) =>
-                // only Exec instances that are subtypes of CodeExec reach the invoker
-                action.toExecutableWhiskAction match {
-                    case Some(executable) =>
-                        invokeAction(tran, executable) onComplete {
-                            case Success(activation) =>
-                                transactionPromise.completeWith {
-                                    // this completes the successful activation case (1)
-                                    completeTransaction(tran, activation)
-                                }
-
-                            case Failure(t) =>
-                                logging.info(this, s"activation failed")
-                                val failure = disambiguateActivationException(t, executable)
-                                transactionPromise.completeWith {
-                                    // this completes the failed activation case (2)
-                                    completeTransactionWithError(action.docid, action.version, tran, failure.activationResponse, Some(action.limits))
-                                }
-                        }
-                    case None =>
-                        logging.error(this, s"non-primitive action reached the invoker: $action")
-                        val failureResponse = ActivationResponse.whiskError(s"Invalid action.")
-                        transactionPromise.completeWith {
-                            // this completes the wrong action case (4)
-                            completeTransactionWithError(actionid.id, msg.action.version.get, tran, failureResponse, None)
-                        }
-                }
-
-            case Failure(t) =>
-                logging.error(this, s"failed to fetch action from db: ${t.getMessage}")
-                val failureResponse = ActivationResponse.whiskError(s"Failed to fetch action.")
-                transactionPromise.completeWith {
-                    // this completes the failed to fetch case (3)
-                    completeTransactionWithError(actionid.id, msg.action.version.get, tran, failureResponse, None)
-                }
-        }
-
-        transactionPromise.future
-    }
-
-    /*
-     * Creates a whisk activation out of the errorMsg and finish the transaction.
-     * Failing with an error can involve multiple futures but the effecting call is completeTransaction which is guarded.
-     */
-    protected def completeTransactionWithError(name: DocId, version: SemVer, tran: Transaction, response: ActivationResponse, limits: Option[ActionLimits])(
-        implicit transid: TransactionId): Future[DocInfo] = {
-        val msg = tran.msg
-        val interval = computeActivationInterval(tran)
-        val activationResult = makeWhiskActivation(msg, EntityPath(name.id), version, response, interval, limits)
-
-        // send activate ack for failed activations
-        sendActiveAck(tran, activationResult)
-
-        completeTransaction(tran, activationResult)
-    }
-
-    /*
-     * Action that must be taken when an activation completes (with or without error).
-     *
-     * Invariant: Only one call to here succeeds.  Even though the sync block wrap WhiskActivation.put,
-     *            it is only blocking this transaction which is finishing anyway.
-     */
-    protected def completeTransaction(tran: Transaction, activation: WhiskActivation)(
-        implicit transid: TransactionId): Future[DocInfo] = {
-        tran.synchronized {
-            tran.result match {
-                case Some(res) => res
-                case None => {
-                    activationCounter.next() // this is the global invoker counter
-                    // Send a message to the activation feed indicating there is a free resource to handle another activation.
-                    // Since all transaction completions flow through this method and the invariant is that the transaction is
-                    // completed only once, there is only one completion message sent to the feed as a result.
-                    activationFeed ! MessageFeed.Processed
-                    // Since there is no active action taken for completion from the invoker, writing activation record is it.
-                    logging.info(this, "recording the activation result to the data store")
-                    val result = WhiskActivation.put(activationStore, activation) andThen {
-                        case Success(id) => logging.info(this, s"recorded activation")
-                        case Failure(t)  => logging.error(this, s"failed to record activation")
-                    }
-                    tran.result = Some(result)
-                    result
-                }
-            }
-        }
-    }
-
-    /**
-     * Executes the action: gets a container (new or recycled), initializes it if necessary, and runs the action.
-     *
-     * @return WhiskActivation
-     */
-    protected def invokeAction(tran: Transaction, action: ExecutableWhiskAction)(
-        implicit transid: TransactionId): Future[WhiskActivation] = {
-        Future { pool.getAction(action, tran.msg.user.authkey) } map {
-            case (con, initResultOpt) => runAction(tran, action, con, initResultOpt)
-        } map {
-            case (failedInit, con, result) =>
-                // process the result and send active ack message
-                val activationResult = makeActivationResultForSuccess(tran, action, failedInit, result)
-                sendActiveAck(tran, activationResult)
-
-                // after sending active ack, drain logs and return container
-                val contents = getContainerLogs(con, action.exec.asInstanceOf[CodeExec[_]].sentinelledLogs, action.limits.logs)
-
-                Future {
-                    // Force delete the container instead of just pausing it iff the initialization failed or the container
-                    // failed otherwise. An example of a ContainerError is the timeout of an action in which case the
-                    // container is to be removed to prevent leaking of an activation across to new activations.
-                    // Since putting back the container involves pausing, run this in a Future so as not to block transaction
-                    // completion but also return resources promptly.
-                    // Note: using infinite thread pool so using a future here for a long/blocking operation is acceptable.
-                    val deleteContainer = failedInit || result.errored
-                    pool.putBack(con, deleteContainer)
-                }
-
-                activationResult withLogs ActivationLogs(contents)
-        }
-    }
-
-    /**
-     * Runs the action in the container if the initialization succeeded and returns a triple
-     * (initialization failed?, the container, the init result if initialization failed else the run result)
-     */
-    private def runAction(tran: Transaction, action: ExecutableWhiskAction, con: WhiskContainer, initResultOpt: Option[RunResult])(
-        implicit transid: TransactionId): (Boolean, WhiskContainer, RunResult) = {
-        def run() = {
-            val msg = tran.msg
-            val auth = msg.user.authkey
-            val payload = msg.content getOrElse JsObject()
-            val boundParams = action.parameters.toJsObject
-            val params = JsObject(boundParams.fields ++ payload.fields)
-            val timeout = action.limits.timeout.duration
-            con.run(msg, params, timeout)
-        }
-
-        initResultOpt match {
-            // cached container
-            case None => (false, con, run())
-
-            // new container
-            case Some(init @ RunResult(interval, response)) =>
-                tran.initInterval = Some(interval)
-                if (init.ok) {
-                    (false, con, run())
-                } else {
-                    (true, con, initResultOpt.get)
-                }
-        }
-    }
-
-    /**
-     * Creates WhiskActivation for the "run result" (which could be a failed initialization); this
-     * method is only reached if the action actually ran with no invoker exceptions).
-     *
-     * @return WhiskActivation
-     */
-    private def makeActivationResultForSuccess(tran: Transaction, action: ExecutableWhiskAction, failedInit: Boolean, result: RunResult)(
-        implicit transid: TransactionId): WhiskActivation = {
-        if (!failedInit) tran.runInterval = Some(result.interval)
-
-        val msg = tran.msg
-        val activationInterval = computeActivationInterval(tran)
-        val activationResponse = getActivationResponse(activationInterval, action.limits.timeout.duration, result, failedInit)
-        makeWhiskActivation(msg, EntityPath(action.fullyQualifiedName(false).toString), action.version, activationResponse, activationInterval, Some(action.limits))
-    }
-
-    /**
-     * Sends ActiveAck message for a completed activation.
-     * If for some reason posting to the message bus fails, an active ack may not be sent.
-     */
-    private def sendActiveAck(tran: Transaction, activationResult: WhiskActivation)(
-        implicit transid: TransactionId): Unit = {
-
-        def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
-            val msg = CompletionMessage(transid, res, instance)
-            producer.send(s"completed${tran.msg.rootControllerIndex.toInt}", msg).andThen {
-                case Success(_) =>
-                    logging.info(this, s"posted ${if (recovery) "recovery" else ""} completion of activation ${activationResult.activationId}")
-            }
-        }
-
-        send(Right(activationResult)).onFailure {
-            case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
-                send(Left(activationResult.activationId), recovery = true)
-        }
-    }
-
-    // The nodeJsAction runner inserts this line in the logs at the end
-    // of each activation
-    private val LogRetryCount = 15
-    private val LogRetry = 100 // millis
-
-    /**
-     * Waits for log cursor to advance. This will retry up to tries times
-     * if the cursor has not yet advanced. This will penalize docker actions
-     * that do not log. It is OK for proxied containers because the runtime emits
-     * the END_OF_ACTIVATION_MARKER automatically and that advances the cursor.
-     *
-     * Note: Updates the container's log cursor to indicate consumption of log.
-     * It is possible that log messages form one activation spill over into the
-     * next activation if the marker is not observed but the log limit is reached.
-     */
-    private def getContainerLogs(con: WhiskContainer, sentinelled: Boolean, loglimit: LogLimit, tries: Int = LogRetryCount)(
-        implicit transid: TransactionId): Vector[String] = {
-        val size = pool.getLogSize(con, runningInContainer)
-        val advanced = size != con.lastLogSize
-        if (tries <= 0 || advanced) {
-            val rawLogBytes = con.synchronized {
-                pool.getDockerLogContent(con.containerId, con.lastLogSize, size, runningInContainer)
-            }
-
-            val rawLog = new String(rawLogBytes, StandardCharsets.UTF_8)
-
-            val (complete, isTruncated, logs) = processJsonDriverLogContents(rawLog, sentinelled, loglimit.asMegaBytes)
-
-            if (tries > 0 && !complete && !isTruncated) {
-                logging.info(this, s"log cursor advanced but missing sentinel, trying $tries more times")
-                Thread.sleep(LogRetry)
-                // note this is not an incremental read - will re-process the entire log file
-                getContainerLogs(con, sentinelled, loglimit, tries - 1)
-            } else {
-                con.lastLogSize = size
-                logs
-            }
-        } else {
-            logging.info(this, s"log cursor has not advanced, trying $tries more times")
-            Thread.sleep(LogRetry)
-            getContainerLogs(con, sentinelled, loglimit, tries - 1)
-        }
-    }
-
-    // -------------------------------------------------------------------------------------------------------------
-
-    /**
-     * Interprets the responses from the container and maps it to an appropriate ActivationResponse.
-     * Note: it is possible for result.response to be None if the container timed out.
-     */
-    private def getActivationResponse(
-        interval: Interval,
-        timeout: Duration,
-        runResult: RunResult,
-        failedInit: Boolean)(
-            implicit transid: TransactionId): ActivationResponse = {
-        if (interval.duration >= timeout) {
-            ActivationResponse.applicationError(Messages.timedoutActivation(timeout, failedInit))
-        } else if (!failedInit) {
-            ActivationResponse.processRunResponseContent(runResult.response, logging)
-        } else {
-            ActivationResponse.processInitResponseContent(runResult.response, logging)
-        }
-    }
-
-    /**
-     * Creates a WhiskActivation for the given action, response and duration.
-     */
-    private def makeWhiskActivation(
-        msg: ActivationMessage,
-        actionName: EntityPath,
-        actionVersion: SemVer,
-        activationResponse: ActivationResponse,
-        interval: Interval,
-        limits: Option[ActionLimits]) = {
-        val causedBy = if (msg.causedBySequence) Parameters("causedBy", "sequence".toJson) else Parameters()
-        WhiskActivation(
-            namespace = msg.activationNamespace,
-            name = actionName.last,
-            version = actionVersion,
-            publish = false,
-            subject = msg.user.subject,
-            activationId = msg.activationId,
-            cause = msg.cause,
-            start = interval.start,
-            end = interval.end,
-            response = activationResponse,
-            logs = ActivationLogs(),
-            annotations = {
-                limits.map(l => Parameters("limits", l.toJson)).getOrElse(Parameters()) ++
-                    Parameters("path", actionName.toJson) ++ causedBy
-            },
-            duration = Some(interval.duration.toMillis))
-
-    }
-
-    /**
-     * Reconstructs an interval based on the time spent in the various operations.
-     * The goal is for the interval to have a duration corresponding to the sum of all durations
-     * and an endtime corresponding to the latest endtime.
-     *
-     * @param transaction the transaction object containing metadata
-     * @return interval for the transaction with start/end times computed
-     */
-    private def computeActivationInterval(transaction: Transaction): Interval = {
-        (transaction.initInterval, transaction.runInterval) match {
-            case (None, Some(run))  => run
-            case (Some(init), None) => init
-            case (None, None)       => Interval(Instant.now(Clock.systemUTC()), Instant.now(Clock.systemUTC()))
-            case (Some(init), Some(Interval(runStart, runEnd))) =>
-                Interval(runStart.minusMillis(init.duration.toMillis), runEnd)
-        }
-    }
-
-    /**
-     * Rewrites exceptions during invocation into new exceptions.
-     */
-    private def disambiguateActivationException(t: Throwable, action: ExecutableWhiskAction)(
-        implicit transid: TransactionId): ActivationException = {
-        t match {
-            // in case of container pull/run operations that fail to execute, assign an appropriate error response
-            case BlackBoxContainerError(msg) => ActivationException(msg, internalError = false)
-            case WhiskContainerError(msg)    => ActivationException(msg)
-            case _ =>
-                logging.error(this, s"failed during invoke: $t")
-                ActivationException(s"Failed to run action '${action.docid}': ${t.getMessage}")
-        }
-    }
-
-    private val entityStore = WhiskEntityStore.datastore(config)
-    private val activationStore = WhiskActivationStore.datastore(config)
-    private val pool = new ContainerPool(config, instance)
-    private val activationCounter = new Counter() // global activation counter
-}
-
 object Invoker {
     /**
      * An object which records the environment variables required for this component to run.
      */
     def requiredProperties = Map(
         servicePort -> 8080.toString(),
-        logsDir -> null,
         dockerRegistry -> null,
-        dockerImagePrefix -> null,
-        invokerUseReactivePool -> false.toString) ++
+        dockerImagePrefix -> null) ++
         ExecManifest.requiredProperties ++
         WhiskEntityStore.requiredProperties ++
         WhiskActivationStore.requiredProperties ++
-        ContainerPool.requiredProperties ++
-        kafkaHost
+        kafkaHost ++
+        wskApiHost ++ Map(
+            dockerImageTag -> "latest",
+            invokerNumCore -> "4",
+            invokerCoreShare -> "2",
+            invokerContainerPolicy -> "",
+            invokerContainerDns -> "",
+            invokerContainerNetwork -> null)
 
     def main(args: Array[String]): Unit = {
         require(args.length == 1, "invoker instance required")
@@ -473,22 +87,9 @@ object Invoker {
             abort()
         }
 
-        val topic = s"invoker${invokerInstance.toInt}"
-        val maxdepth = ContainerPool.getDefaultMaxActive(config)
         val msgProvider = SpiLoader.get[MessagingProvider]()
-        val consumer = msgProvider.getConsumer(config, "invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
         val producer = msgProvider.getProducer(config, ec)
-        val dispatcher = new Dispatcher(consumer, 500 milliseconds, maxdepth, actorSystem)
-
-        val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
-            new InvokerReactive(config, invokerInstance, dispatcher.activationFeed, producer)
-        } else {
-            new Invoker(config, invokerInstance, dispatcher.activationFeed, producer)
-        }
-        logger.info(this, s"using $invoker")
-
-        dispatcher.addHandler(invoker, true)
-        dispatcher.start()
+        val invoker = new InvokerReactive(config, invokerInstance, producer)
 
         Scheduler.scheduleWaitAtMost(1.seconds)(() => {
             producer.send("health", PingMessage(invokerInstance)).andThen {
@@ -503,24 +104,3 @@ object Invoker {
         })
     }
 }
-
-/**
- * Tracks the state of the transaction by wrapping the Message object.
- * Note that var fields cannot be added to Message as it leads to serialization issues and
- * doesn't make sense to mix local mutable state with the value being passed around.
- *
- * See completeTransaction for why complete is needed.
- */
-private case class Transaction(msg: ActivationMessage) {
-    var result: Option[Future[DocInfo]] = None
-    var initInterval: Option[Interval] = None
-    var runInterval: Option[Interval] = None
-}
-
-private case class ActivationException(msg: String, internalError: Boolean = true) extends Throwable {
-    val activationResponse = if (internalError) {
-        ActivationResponse.whiskError(msg)
-    } else {
-        ActivationResponse.applicationError(msg)
-    }
-}
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 3203594..1f0d760 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -17,6 +17,9 @@
 
 package whisk.core.invoker
 
+import java.nio.charset.StandardCharsets
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -25,9 +28,9 @@ import scala.util.Success
 
 import org.apache.kafka.common.errors.RecordTooLargeException
 
-import akka.actor.ActorRef
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
+import akka.actor.Props
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.Logging
@@ -38,8 +41,7 @@ import whisk.core.connector.ActivationMessage
 import whisk.core.connector.CompletionMessage
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
-import whisk.core.container.{ ContainerPool => OldContainerPool }
-import whisk.core.container.Interval
+import whisk.core.connector.MessagingProvider
 import whisk.core.containerpool.ContainerPool
 import whisk.core.containerpool.ContainerProxy
 import whisk.core.containerpool.PrewarmingConfig
@@ -48,25 +50,32 @@ import whisk.core.containerpool.docker.DockerClientWithFileAccess
 import whisk.core.containerpool.docker.DockerContainer
 import whisk.core.containerpool.docker.RuncClient
 import whisk.core.database.NoDocumentException
-import whisk.core.dispatcher.MessageHandler
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
 import whisk.http.Messages
+import whisk.spi.SpiLoader
 
-
-class InvokerReactive(
-    config: WhiskConfig,
-    instance: InstanceId,
-    activationFeed: ActorRef,
-    producer: MessageProducer)(implicit actorSystem: ActorSystem, logging: Logging)
-    extends MessageHandler(s"invoker${instance.toInt}") {
+class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: MessageProducer)(implicit actorSystem: ActorSystem, logging: Logging) {
 
     implicit val ec = actorSystem.dispatcher
 
+    /** Initialize needed databases */
     private val entityStore = WhiskEntityStore.datastore(config)
     private val activationStore = WhiskActivationStore.datastore(config)
 
+    /** Initialize message consumers */
+    val topic = s"invoker${instance.toInt}"
+    val maximumContainers = config.invokerNumCore.toInt * config.invokerCoreShare.toInt
+    val msgProvider = SpiLoader.get[MessagingProvider]()
+    val consumer = msgProvider.getConsumer(config, "invokers", topic, maximumContainers, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+    val activationFeed = actorSystem.actorOf(Props {
+        new MessageFeed("activation", logging,
+            consumer, maximumContainers, 500.milliseconds, processActivationMessage)
+    })
+
+    /** Initialize container clients */
     implicit val docker = new DockerClientWithFileAccess()(ec)
     implicit val runc = new RuncClient(ec)
 
@@ -102,7 +111,7 @@ class InvokerReactive(
             image = image,
             userProvidedImage = userProvidedImage,
             memory = memory,
-            cpuShares = OldContainerPool.cpuShare(config),
+            cpuShares = config.invokerCoreShare.toInt,
             environment = Map("__OW_API_HOST" -> config.wskApiHost),
             network = config.invokerContainerNetwork,
             dnsServers = config.invokerContainerDns,
@@ -147,70 +156,82 @@ class InvokerReactive(
 
     val pool = actorSystem.actorOf(ContainerPool.props(
         childFactory,
-        OldContainerPool.getDefaultMaxActive(config),
-        OldContainerPool.getDefaultMaxActive(config),
+        maximumContainers,
+        maximumContainers,
         activationFeed,
         Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
 
     /** Is called when an ActivationMessage is read from Kafka */
-    override def onMessage(msg: ActivationMessage)(implicit transid: TransactionId): Future[Unit] = {
-        require(msg != null, "message undefined")
-        require(msg.action.version.isDefined, "action version undefined")
-
-        val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION)
-        val namespace = msg.action.path
-        val name = msg.action.name
-        val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
-        val tran = Transaction(msg)
-        val subject = msg.user.subject
-
-        logging.info(this, s"${actionid.id} $subject ${msg.activationId}")
-
-        // caching is enabled since actions have revision id and an updated
-        // action will not hit in the cache due to change in the revision id;
-        // if the doc revision is missing, then bypass cache
-        if (actionid.rev == DocRevision.empty) {
-            logging.warn(this, s"revision was not provided for ${actionid.id}")
-        }
-        WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty).flatMap { action =>
-            action.toExecutableWhiskAction match {
-                case Some(executable) =>
-                    pool ! Run(executable, msg)
+    def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
+        Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
+            .flatMap(Future.fromTry(_))
+            .filter(_.action.version.isDefined)
+            .flatMap { msg =>
+                implicit val transid = msg.transid
+
+                val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION)
+                val namespace = msg.action.path
+                val name = msg.action.name
+                val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
+                val subject = msg.user.subject
+
+                logging.info(this, s"${actionid.id} $subject ${msg.activationId}")
+
+                // caching is enabled since actions have revision id and an updated
+                // action will not hit in the cache due to change in the revision id;
+                // if the doc revision is missing, then bypass cache
+                if (actionid.rev == DocRevision.empty) {
+                    logging.warn(this, s"revision was not provided for ${actionid.id}")
+                }
+
+                WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty).flatMap { action =>
+                    action.toExecutableWhiskAction match {
+                        case Some(executable) =>
+                            pool ! Run(executable, msg)
+                            Future.successful(())
+                        case None =>
+                            logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
+                            Future.failed(new IllegalStateException("non-executable action reached the invoker"))
+                    }
+                }.recoverWith {
+                    case t =>
+                        // If the action cannot be found, the user has concurrently deleted it,
+                        // making this an application error. All other errors are considered system
+                        // errors and should cause the invoker to be considered unhealthy.
+                        val response = t match {
+                            case _: NoDocumentException => ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
+                            case _                      => ActivationResponse.whiskError(Messages.actionRemovedWhileInvoking)
+                        }
+                        val now = Instant.now
+                        val causedBy = if (msg.causedBySequence) Parameters("causedBy", "sequence".toJson) else Parameters()
+                        val activation = WhiskActivation(
+                            activationId = msg.activationId,
+                            namespace = msg.activationNamespace,
+                            subject = msg.user.subject,
+                            cause = msg.cause,
+                            name = msg.action.name,
+                            version = msg.action.version.getOrElse(SemVer()),
+                            start = now,
+                            end = now,
+                            duration = Some(0),
+                            response = response,
+                            annotations = {
+                                Parameters("path", msg.action.toString.toJson) ++ causedBy
+                            })
+
+                        activationFeed ! MessageFeed.Processed
+                        ack(msg.transid, activation, msg.rootControllerIndex)
+                        store(msg.transid, activation)
+                        Future.successful(())
+                }
+            }.recoverWith {
+                case t =>
+                    // Iff everything above failed, we have a terminal error at hand. Either the message failed
+                    // to deserialize, or something threw an error where it is not expected to throw.
+                    activationFeed ! MessageFeed.Processed
+                    logging.error(this, s"terminal failure while processing message: $t")
                     Future.successful(())
-                case None =>
-                    logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
-                    Future.failed(new IllegalStateException())
             }
-        }.recover {
-            case t =>
-                // If the action cannot be found, the user has concurrently deleted it,
-                // making this an application error. All other errors are considered system
-                // errors and should cause the invoker to be considered unhealthy.
-                val response = t match {
-                    case _: NoDocumentException => ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
-                    case _                      => ActivationResponse.whiskError(Messages.actionRemovedWhileInvoking)
-                }
-                val interval = Interval.zero
-                val causedBy = if (msg.causedBySequence) Parameters("causedBy", "sequence".toJson) else Parameters()
-                val activation = WhiskActivation(
-                    activationId = msg.activationId,
-                    namespace = msg.activationNamespace,
-                    subject = msg.user.subject,
-                    cause = msg.cause,
-                    name = msg.action.name,
-                    version = msg.action.version.getOrElse(SemVer()),
-                    start = interval.start,
-                    end = interval.end,
-                    duration = Some(interval.duration.toMillis),
-                    response = response,
-                    annotations = {
-                        Parameters("path", msg.action.toString.toJson) ++ causedBy
-                    })
-
-                activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.rootControllerIndex)
-                store(msg.transid, activation)
-        }
     }
 
 }
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index e10c792..e9cc939 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -159,7 +159,7 @@ object ActionContainer {
     }
 
     private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
-        whisk.core.container.HttpUtils.post(host, port, endPoint, content)
+        whisk.core.containerpool.docker.HttpUtils.post(host, port, endPoint, content)
     }
 
     private class ActionContainerImpl() extends ActionContainer {
diff --git a/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala
deleted file mode 100644
index abd2899..0000000
--- a/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container.test
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfter
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FlatSpec
-import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
-import whisk.common.TransactionId
-import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig.dockerEndpoint
-import whisk.core.WhiskConfig.edgeHostName
-import whisk.core.WhiskConfig.invokerSerializeDockerOp
-import whisk.core.WhiskConfig.invokerSerializeDockerPull
-import whisk.core.WhiskConfig.selfDockerEndpoint
-import whisk.core.container.Container
-import whisk.core.container.ContainerPool
-import whisk.core.entity.AuthKey
-import whisk.core.entity.EntityName
-import whisk.core.entity.EntityPath
-import whisk.core.entity.InstanceId
-import whisk.core.entity.WhiskAuthStore
-import whisk.core.entity.WhiskEntityStore
-import whisk.core.entity.test.ExecHelpers
-import whisk.utils.retry
-import whisk.core.entity.ExecutableWhiskAction
-import whisk.core.entity.CodeExec
-
-/**
- * Unit tests for ContainerPool and, by association, Container and WhiskContainer.
- */
-@RunWith(classOf[JUnitRunner])
-class ContainerPoolTests extends FlatSpec
-    with BeforeAndAfter
-    with BeforeAndAfterAll
-    with WskActorSystem
-    with ExecHelpers
-    with StreamLogging {
-
-    implicit val transid = TransactionId.testing
-
-    val config = new WhiskConfig(
-        WhiskEntityStore.requiredProperties ++
-            WhiskAuthStore.requiredProperties ++
-            ContainerPool.requiredProperties ++
-            Map(selfDockerEndpoint -> "localhost",
-                dockerEndpoint -> null,
-                edgeHostName -> "localhost",
-                invokerSerializeDockerOp -> "true",
-                invokerSerializeDockerPull -> "true"))
-
-    assert(config.isValid)
-
-    val pool = new ContainerPool(config, InstanceId(0), true, true)
-    pool.logDir = "/tmp"
-
-    val datastore = WhiskEntityStore.datastore(config)
-
-    override def afterAll() {
-        println("Shutting down store connections")
-        datastore.shutdown()
-        super.afterAll()
-    }
-
-    /**
-     * Starts (and returns) a container running ubuntu image running echo on the given test word.
-     * Also checks that the test word shows up in the docker logs.
-     */
-    def getEcho(word: String): Container = {
-        val conOpt = pool.getByImageName("ubuntu", Array("/bin/echo", word))
-        assert(conOpt isDefined) // we must be able to start the container
-        val con = conOpt.getOrElse(null)
-        // the word must be in the docker logs
-        retry(con.getLogs() should include(word), 5, Some(500.milliseconds)) // docker run has no guarantee how far along the process is
-        con
-    }
-
-    /*
-     * Start a new container that stays around via sleep.
-     */
-    def getSleep(duration: Int): Container = {
-        val conOpt = pool.getByImageName("ubuntu", Array("/bin/sleep", duration.toString()))
-        assert(conOpt isDefined) // we must be able to start the container
-        conOpt.getOrElse(null)
-    }
-
-    /*
-     * Ensure pool is empty/clean.
-     */
-    def ensureClean() = {
-        pool.enableGC()
-        pool.forceGC()
-        Thread.sleep(2 * pool.gcFrequency.toMillis + 1500L) // GC should collect this by now
-        assert(pool.idleCount() == 0)
-        assert(pool.activeCount() == 0)
-    }
-
-    /*
-     * Does a container with the given prefix exist?
-     */
-    def poolHasContainerIdPrefix(containerIdPrefix: String) = {
-        val states = pool.listAll()
-        states.find { _.id.hash.contains(containerIdPrefix) }.isDefined
-    }
-
-    behavior of "ContainerPool"
-
-    after {
-        ensureClean()
-    }
-
-    it should "be empty when it starts" in {
-        assert(pool.idleCount() == 0)
-        assert(pool.activeCount() == 0)
-    }
-
-    it should "allow getting container by image name, run it, retrieve logs, return it, force GC, check via docker ps" in {
-        pool.disableGC()
-        val startIdleCount = pool.idleCount()
-        val container = getEcho("abracadabra")
-        val containerIdPrefix = container.containerIdPrefix
-        assert(poolHasContainerIdPrefix(containerIdPrefix)) // container must be around
-        pool.putBack(container) // contractually, user must let go of con at this point
-        assert(pool.idleCount() == startIdleCount + 1)
-        pool.enableGC()
-        pool.forceGC() // force all containers in pool to be freed
-        Thread.sleep(2 * pool.gcFrequency.toMillis + 1500L) // GC should collect this by now
-        assert(!poolHasContainerIdPrefix(containerIdPrefix)) // container must be gone by now
-        assert(pool.idleCount() == 0)
-    }
-
-    it should "respect maxIdle by shooting a container on a putBack that could exceed it" in {
-        ensureClean()
-        pool.maxIdle = 1
-        val c1 = getEcho("quasar")
-        val c2 = getEcho("pulsar")
-        val p1 = c1.containerIdPrefix
-        val p2 = c2.containerIdPrefix
-        assert(pool.activeCount() == 2)
-        assert(pool.idleCount() == 0)
-        pool.putBack(c1)
-        assert(pool.activeCount() == 1)
-        assert(pool.idleCount() == 1)
-        pool.putBack(c2)
-        assert(pool.activeCount() == 0)
-        assert(pool.idleCount() == 1) // because c1 got shot
-        pool.resetMaxIdle()
-    }
-
-    it should "respect activeIdle by blocking a getContainer until another is returned" in {
-        ensureClean()
-        pool.maxActive = 1
-        val c1 = getEcho("hocus")
-        var c1Back = false
-        val f = Future { Thread.sleep(3000); c1Back = true; pool.putBack(c1) }
-        val c2 = getEcho("pocus")
-        assert(c1Back) // make sure c2 is not available before c1 is put back
-        pool.putBack(c2)
-        pool.resetMaxActive()
-    }
-
-    it should "also perform automatic GC with a settable threshold, invoke same action afterwards, another GC" in {
-        ensureClean();
-        pool.gcThreshold = 1.seconds
-        val container = getEcho("hocus pocus")
-        val containerIdPrefix = container.containerIdPrefix
-        assert(poolHasContainerIdPrefix(containerIdPrefix)) // container must be around
-        pool.putBack(container); // contractually, user must let go of con at this point
-        // TODO: replace this with GC count so we don't break abstraction by knowing the GC check freq.  (!= threshold)
-        Thread.sleep(2 * pool.gcFrequency.toMillis + 4000L) // GC should collect this by now
-        assert(!poolHasContainerIdPrefix(containerIdPrefix)) // container must be gone by now
-        // Do it again now
-        val container2 = getEcho("hocus pocus")
-        val containerIdPrefix2 = container2.containerIdPrefix
-        assert(poolHasContainerIdPrefix(containerIdPrefix2)) // container must be around
-        pool.putBack(container2)
-        pool.resetGCThreshold()
-    }
-
-    // Lower it some more by parameterizing GC thresholds
-
-    it should "be able to go through 15 containers without thrashing the system" in {
-        ensureClean()
-        val max = 15
-        for (i <- List.range(0, max)) {
-            val name = "foobar" + i
-            val action = makeHelloAction(name, i)
-            pool.getAction(action, defaultAuth) match {
-                case (con, initResult) => {
-                    val str = "QWERTY" + i.toString()
-                    con.run(str, (20000 + i).toString()) // payload + activationId
-                    if (i == max - 1) {
-                        retry(con.getLogs() should include(str), 5, Some(500.milliseconds))
-                    }
-                    pool.putBack(con)
-                }
-            } // match
-        } // for
-    }
-
-    private val defaultNamespace = EntityPath("container pool test")
-    private val defaultAuth = AuthKey()
-
-    /*
-     * Create an action with the given name that print hello_N payload !
-     * where N is specified.
-     */
-    private def makeHelloAction(name: String, index: Integer): ExecutableWhiskAction = {
-        val code = """console.log('ABCXYZ'); function main(msg) { console.log('hello_${index}', msg.payload+'!');} """
-        ExecutableWhiskAction(defaultNamespace, EntityName(name), jsDefault(code).asInstanceOf[CodeExec[String]])
-    }
-
-    it should "be able to start a nodejs action with init, do a run, return to pool, do another get testing reuse, another run" in {
-        ensureClean()
-        val action = makeHelloAction("foobar", 0)
-        // Make a whisk container and test init and a push
-        val (con, initRes) = pool.getAction(action, defaultAuth)
-        retry(con.getLogs() should include("ABCXYZ"), 5, Some(500.milliseconds))
-        con.run("QWERTY", "55555") // payload + activationId
-        retry(con.getLogs() should include("QWERTY"), 5, Some(500.milliseconds))
-        pool.putBack(con)
-        // Test container reuse
-        val (con2, _) = pool.getAction(action, defaultAuth)
-        assert(con == con2) // check re-use
-        con.run("ASDFGH", "4444") // payload + activationId
-        retry(con.getLogs() should include("ASDFGH"), 5, Some(500.milliseconds))
-        pool.putBack(con)
-    }
-
-    /*
-     * Create an action that will crash the container after succesfully returning
-     */
-    private def makeCrashingAction(name: String, timeToLiveMs: Integer): ExecutableWhiskAction = {
-        val code = s"""function main(msg) { console.log('I expect you to die'); setTimeout(function(){ process.exit(1); }, ${timeToLiveMs}); }"""
-        ExecutableWhiskAction(defaultNamespace, EntityName(name), jsDefault(code))
-    }
-
-    it should "cleanly handle a container that crashes" in {
-        ensureClean()
-        val action = makeCrashingAction("NoMrBond", 50)
-
-        val (con, initRes) = pool.getAction(action, defaultAuth)
-        Thread.sleep(1000)
-
-        con.run("NoMrBond", "1007")
-        retry(con.getLogs() should include("I expect you to die"), 5, Some(500.milliseconds))
-        pool.putBack(con)
-
-        // create a new container for this action. However, since the previous
-        // container crashed, this should not be the same container
-        Thread.sleep(2000)
-        val (con2, _) = pool.getAction(action, defaultAuth)
-        assert(con != con2)
-        pool.putBack(con2)
-    }
-}
diff --git a/tests/src/test/scala/whisk/core/invoker/ActionLogDriverTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ActionLogDriverTests.scala
similarity index 89%
rename from tests/src/test/scala/whisk/core/invoker/ActionLogDriverTests.scala
rename to tests/src/test/scala/whisk/core/containerpool/docker/test/ActionLogDriverTests.scala
index e0e9ea1..4db49b6 100644
--- a/tests/src/test/scala/whisk/core/invoker/ActionLogDriverTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ActionLogDriverTests.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package whisk.core.invoker
+package whisk.core.containerpool.docker.test
 
 import java.nio.charset.StandardCharsets
 
@@ -30,20 +30,22 @@ import spray.json.pimpAny
 import whisk.common.TransactionId
 import whisk.core.entity.size._
 import whisk.http.Messages
+import whisk.core.containerpool.docker.DockerActionLogDriver
+import whisk.core.containerpool.docker.LogLine
 
 @RunWith(classOf[JUnitRunner])
 class ActionLogDriverTests
     extends FlatSpec
     with BeforeAndAfter
     with Matchers
-    with ActionLogDriver
+    with DockerActionLogDriver
     with StreamLogging {
 
     private def makeLogMsgs(lines: Seq[String], stream: String = "stdout", addSentinel: Boolean = true) = {
         val msgs = if (addSentinel) {
             lines.map((stream, _)) :+
-                ("stdout", s"${ActionLogDriver.LOG_ACTIVATION_SENTINEL}") :+
-                ("stderr", s"${ActionLogDriver.LOG_ACTIVATION_SENTINEL}")
+                ("stdout", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}") :+
+                ("stderr", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}")
         } else {
             lines.map((stream, _))
         }
@@ -78,8 +80,8 @@ class ActionLogDriverTests
             raw"""|{"time":"","stream":"stdout","log":"a"}
                   |{"time":"","stream":"stdout","log":"b"}
                   |{"time":"","stream":"stdout","log":"c"}
-                  |{"time":"","stream":"stdout","log":"${ActionLogDriver.LOG_ACTIVATION_SENTINEL}"}
-                  |{"time":"","stream":"stderr","log":"${ActionLogDriver.LOG_ACTIVATION_SENTINEL}"}""".stripMargin('|')
+                  |{"time":"","stream":"stdout","log":"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}"}
+                  |{"time":"","stream":"stderr","log":"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}"}""".stripMargin('|')
         }
     }
 
diff --git a/tests/src/test/scala/whisk/core/container/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
similarity index 95%
rename from tests/src/test/scala/whisk/core/container/test/ContainerConnectionTests.scala
rename to tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 168c08d..0a32f46 100644
--- a/tests/src/test/scala/whisk/core/container/test/ContainerConnectionTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package whisk.core.container.test
+package whisk.core.containerpool.docker.test
 
 import java.time.Instant
 import java.nio.charset.StandardCharsets
@@ -38,9 +38,9 @@ import org.apache.http.entity.StringEntity
 
 import spray.json.JsObject
 
-import whisk.core.container.HttpUtils
 import whisk.core.entity.size._
 import whisk.core.entity.ActivationResponse._
+import whisk.core.containerpool.docker.HttpUtils
 
 /**
  * Unit tests for HttpUtils which communicate with containers.
@@ -94,7 +94,7 @@ class ContainerConnectionTests
         val connection = new HttpUtils(hostWithPort, timeout, 1.B)
         testHang = timeout * 2
         val start = Instant.now()
-        val result = connection.post("/init", JsObject(), retry=true)
+        val result = connection.post("/init", JsObject(), retry = true)
         val end = Instant.now()
         val waited = end.toEpochMilli - start.toEpochMilli
         result.isLeft shouldBe true
@@ -109,7 +109,7 @@ class ContainerConnectionTests
             Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
                 testStatusOK = code
                 testResponse = r
-                val result = connection.post("/init", JsObject(), retry=true)
+                val result = connection.post("/init", JsObject(), retry = true)
                 result shouldBe Right {
                     ContainerResponse(okStatus = testStatusOK, if (r != null) r else "", None)
                 }
@@ -126,7 +126,7 @@ class ContainerConnectionTests
             Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
                 testStatusOK = code
                 testResponse = r
-                val result = connection.post("/init", JsObject(), retry=true)
+                val result = connection.post("/init", JsObject(), retry = true)
                 result shouldBe Right {
                     ContainerResponse(okStatus = testStatusOK, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
                 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 67dfa33..87a61eb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -41,16 +41,12 @@ import spray.json._
 import whisk.common.LoggingMarkers._
 import whisk.common.LogMarker
 import whisk.common.TransactionId
-import whisk.core.container.Interval
-import whisk.core.container.RunResult
 import whisk.core.containerpool._
 import whisk.core.containerpool.docker._
 import whisk.core.entity.ActivationResponse
 import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
-import whisk.core.invoker.ActionLogDriver
-import whisk.core.invoker.LogLine
 import whisk.http.Messages
 
 /**
@@ -375,8 +371,8 @@ class DockerContainerTests extends FlatSpec
         val appendedLog = if (appendSentinel) {
             val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
             log :+
-                LogLine(lastTime, "stderr", s"${ActionLogDriver.LOG_ACTIVATION_SENTINEL}\n") :+
-                LogLine(lastTime, "stdout", s"${ActionLogDriver.LOG_ACTIVATION_SENTINEL}\n")
+                LogLine(lastTime, "stderr", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}\n") :+
+                LogLine(lastTime, "stdout", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}\n")
         } else {
             log
         }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 7191590..e3dc148 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -43,7 +43,6 @@ import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
-import whisk.core.container.Interval
 import whisk.core.containerpool._
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.RuntimeManifest
diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
deleted file mode 100644
index 2b6e037..0000000
--- a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher.test
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
-
-import org.junit.runner.RunWith
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import akka.actor.actorRef2Scala
-import common.StreamLogging
-import common.WskActorSystem
-import spray.json.JsNumber
-import spray.json.JsObject
-import whisk.common.TransactionId
-import whisk.core.connector.{ ActivationMessage => Message }
-import whisk.core.connector.MessageFeed
-import whisk.core.connector.test.TestConnector
-import whisk.core.controller.test.WhiskAuthHelpers
-import whisk.core.dispatcher.Dispatcher
-import whisk.core.dispatcher.MessageHandler
-import whisk.core.entity._
-import whisk.utils.retry
-
-@RunWith(classOf[JUnitRunner])
-class DispatcherTests
-    extends FlatSpec
-    with Matchers
-    with WskActorSystem
-    with StreamLogging {
-
-    implicit val transid = TransactionId.testing
-
-    behavior of "Dispatcher"
-
-    def logContains(w: String)(implicit stream: java.io.ByteArrayOutputStream): Boolean = {
-        retry({
-            val log = stream.toString()
-            val result = log.contains(w)
-            assert(result) // throws exception required to retry
-            result
-        }, 10, Some(100 milliseconds))
-    }
-
-    def sendMessage(connector: TestConnector, count: Int) = {
-        val content = JsObject("payload" -> JsNumber(count))
-        val user = WhiskAuthHelpers.newIdentity()
-        val path = FullyQualifiedEntityName(EntityPath("test"), EntityName(s"count-$count"), Some(SemVer()))
-        val msg = Message(TransactionId.testing, path, DocRevision.empty, user, ActivationId(), EntityPath(user.subject.asString), InstanceId(0), Some(content))
-        connector.send(msg)
-    }
-
-    class TestRule(dosomething: Message => Any) extends MessageHandler("test message handler") {
-        override def onMessage(msg: Message)(implicit transid: TransactionId): Future[Any] = {
-            logging.debug(this, s"received: ${msg.content.get.compactPrint}")
-            Future.successful {
-                dosomething(msg)
-            }
-        }
-    }
-
-    it should "send and receive a message from connector bus" in {
-        val capacity = 4
-        val connector = new TestConnector("test connector", capacity, false)
-
-        val messagesProcessed = new AtomicInteger()
-        val handler = new TestRule({ msg => messagesProcessed.incrementAndGet() })
-        val dispatcher = new Dispatcher(connector, 100 milliseconds, capacity, actorSystem)
-        dispatcher.addHandler(handler, true)
-        dispatcher.start()
-
-        try {
-            withClue("commit exception must be caught") {
-                connector.throwCommitException = true
-                Console.withErr(stream) {
-                    retry({
-                        val logs = stream.toString()
-                        logs should include regex (s"exception while pulling new activation records *.* commit failed")
-                    }, 10, Some(100 milliseconds))
-
-                    connector.throwCommitException = false
-                }
-            }
-
-            for (i <- 0 until (2 * capacity + 1)) {
-                sendMessage(connector, i + 1)
-            }
-
-            // only process as many messages as we have downstream capacity
-            withClue("messages processed") {
-                retry({
-                    messagesProcessed.get should be(capacity)
-                }, 20, Some(100 milliseconds))
-            }
-
-            withClue("confirming dispatcher is in overflow state") {
-                val logs = stream.toString()
-                logs should include regex (s"activation pipeline must drain: ${capacity + 1} > $capacity")
-            }
-
-            // send one message and check later that it remains in the connector
-            // at this point, total messages sent = 2 * capacity + 2
-            connector.occupancy shouldBe 0
-            sendMessage(connector, 2 * capacity + 2)
-            Thread.sleep(1.second.toMillis)
-
-            withClue("expecting message to still be in the queue") {
-                retry({
-                    connector.occupancy shouldBe 1
-                }, 10, Some(100 milliseconds))
-            }
-
-            // unblock the pipeline by draining 1 activations and check
-            // that dispatcher refilled the pipeline
-            stream.reset()
-            Console.withOut(stream) {
-                dispatcher.activationFeed ! MessageFeed.Processed
-                // wait until additional message is drained
-                retry({
-                    withClue("additional messages processed") {
-                        messagesProcessed.get shouldBe capacity + 1
-                    }
-                }, 10, Some(100 milliseconds))
-            }
-
-            withClue("confirm dispatcher tried to fill the pipeline") {
-                val logs = stream.toString()
-                logs should include regex (s"activation pipeline has capacity: $capacity <= $capacity")
-            }
-        } finally {
-            dispatcher.stop()
-        }
-    }
-}
diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/ActivationResponseTests.scala b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
similarity index 99%
rename from tests/src/test/scala/whisk/core/dispatcher/test/ActivationResponseTests.scala
rename to tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
index eb042d8..2890768 100644
--- a/tests/src/test/scala/whisk/core/dispatcher/test/ActivationResponseTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
@@ -15,21 +15,21 @@
  * limitations under the License.
  */
 
-package whisk.core.dispatcher.test
+package whisk.core.entity.test
 
 import scala.Vector
-
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-
 import spray.json.pimpAny
 import spray.json.pimpString
 import whisk.common.PrintStreamLogging
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.size.SizeInt
 import whisk.http.Messages._
+import scala.Left
+import scala.Right
 
 @RunWith(classOf[JUnitRunner])
 class ActivationResponseTests extends FlatSpec with Matchers {

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Mime
View raw message