openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markusthoem...@apache.org
Subject [incubator-openwhisk] branch master updated: Memory based loadbalancing. (#3747)
Date Thu, 23 Aug 2018 09:07:51 GMT
This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b3e0b6  Memory based loadbalancing. (#3747)
5b3e0b6 is described below

commit 5b3e0b6a334b78fc783a2cd655f0f30ea58a68e8
Author: Christian Bickel <github@cbickel.de>
AuthorDate: Thu Aug 23 11:07:47 2018 +0200

    Memory based loadbalancing. (#3747)
---
 ansible/group_vars/all                             |   4 +-
 ansible/roles/controller/tasks/deploy.yml          |   4 +-
 ansible/roles/invoker/tasks/deploy.yml             |   3 +-
 ansible/templates/whisk.properties.j2              |   4 -
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   3 -
 .../scala/whisk/core/containerpool/Container.scala |  32 ++--
 .../core/containerpool/ContainerFactory.scala      |  20 +-
 .../src/main/scala/whisk/core/entity/Size.scala    |  17 ++
 core/controller/src/main/resources/reference.conf  |   2 +-
 .../ShardingContainerPoolBalancer.scala            |  47 +++--
 core/invoker/src/main/resources/application.conf   |   3 +-
 .../whisk/core/containerpool/ContainerPool.scala   | 207 +++++++++++++-------
 .../whisk/core/containerpool/ContainerProxy.scala  |  38 ++--
 .../scala/whisk/core/invoker/InvokerReactive.scala |   9 +-
 .../test/DockerToActivationFileLogStoreTests.scala |   2 +-
 .../mesos/test/MesosContainerFactoryTest.scala     |  27 +--
 .../containerpool/test/ContainerPoolTests.scala    | 209 +++++++++++++++++++--
 .../containerpool/test/ContainerProxyTests.scala   |   2 +-
 .../scala/whisk/core/entity/test/SizeTests.scala   |  42 +++++
 .../test/ShardingContainerPoolBalancerTests.scala  | 105 ++++++++---
 20 files changed, 561 insertions(+), 219 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 2114630..ffc658b 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -173,9 +173,7 @@ invoker:
   port: 12001
   heap: "{{ invoker_heap | default('2g') }}"
   arguments: "{{ invoker_arguments | default('') }}"
-  numcore: 2
-  coreshare: 2
-  busyThreshold: "{{ invoker_busy_threshold | default(16) }}"
+  userMemory: "{{ invoker_user_memory | default('1024 m') }}"
   instances: "{{ groups['invokers'] | length }}"
   # Specify if it is allowed to deploy more than 1 invoker on a single machine.
   allowMultipleInstances: "{{ invoker_allow_multiple_instances | default(false) }}"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index b39c7c2..d7f4f59 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -219,8 +219,8 @@
         "{{ controller.ssl.storeFlavor }}"
       "CONFIG_whisk_controller_https_clientAuth":
         "{{ controller.ssl.clientAuth }}"
-      "CONFIG_whisk_loadbalancer_invokerBusyThreshold":
-        "{{ invoker.busyThreshold }}"
+      "CONFIG_whisk_loadbalancer_invokerUserMemory":
+        "{{ invoker.userMemory }}"
       "CONFIG_whisk_loadbalancer_blackboxFraction":
         "{{ controller.blackboxFraction }}"
       "CONFIG_whisk_loadbalancer_timeoutFactor":
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 8fd93bf..ba17b70 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -204,8 +204,7 @@
       "CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}"
       "CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}"
       "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
-      "CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}"
-      "CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}"
+      "CONFIG_whisk_containerPool_userMemory": "{{ invoker.userMemory }}"
       "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
       "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
       "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index c18e22a..6b79896 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -59,8 +59,6 @@ controller.protocol={{ controller.protocol }}
 invoker.container.network=bridge
 invoker.container.policy={{ invoker_container_policy_name | default()}}
 invoker.container.dns={{ invoker_container_network_dns_servers | default()}}
-invoker.numcore={{ invoker.numcore }}
-invoker.coreshare={{ invoker.coreshare }}
 invoker.useRunc={{ invoker.useRunc }}
 
 main.docker.endpoint={{ hostvars[groups["controllers"]|first].ansible_host }}:{{ docker.port }}
@@ -92,5 +90,3 @@ db.instances={{ db.instances }}
 apigw.auth.user={{apigw_auth_user}}
 apigw.auth.pwd={{apigw_auth_pwd}}
 apigw.host.v2={{apigw_host_v2}}
-
-loadbalancer.invokerBusyThreshold={{ invoker.busyThreshold }}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 5f7a8db..df24317 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -57,7 +57,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this(
     WhiskConfig.wskApiPort)
   val controllerBlackboxFraction = this.getAsDouble(WhiskConfig.controllerBlackboxFraction, 0.10)
-  val loadbalancerInvokerBusyThreshold = this.getAsInt(WhiskConfig.loadbalancerInvokerBusyThreshold, 16)
   val controllerInstances = this(WhiskConfig.controllerInstances)
 
   val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
@@ -163,8 +162,6 @@ object WhiskConfig {
   val controllerInstances = "controller.instances"
   val dbInstances = "db.instances"
 
-  val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
-
   val kafkaHostList = "kafka.hosts"
   val zookeeperHostList = "zookeeper.hosts"
 
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 7c46615..ff0124e 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -17,31 +17,25 @@
 
 package whisk.core.containerpool
 
-import akka.actor.ActorSystem
 import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.event.Logging.InfoLevel
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
 import pureconfig._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-import spray.json.JsObject
 import spray.json.DefaultJsonProtocol._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.ActivationResponse.ContainerConnectionError
-import whisk.core.entity.ActivationResponse.ContainerResponse
-import whisk.core.entity.ByteSize
-import whisk.http.Messages
-import akka.event.Logging.InfoLevel
+import spray.json.JsObject
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.ConfigKeys
-import whisk.core.entity.ActivationEntityLimit
+import whisk.core.entity.ActivationResponse.{ContainerConnectionError, ContainerResponse}
+import whisk.core.entity.{ActivationEntityLimit, ActivationResponse, ByteSize}
+import whisk.core.entity.size._
+import whisk.http.Messages
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.{Duration, FiniteDuration, _}
+import scala.util.{Failure, Success}
 
 /**
  * An OpenWhisk biased container abstraction. This is **not only** an abstraction
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 3c56cf9..7b77f5f 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -18,25 +18,18 @@
 package whisk.core.containerpool
 
 import akka.actor.ActorSystem
-import scala.concurrent.Future
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId}
 import whisk.core.WhiskConfig
-import whisk.core.entity.ByteSize
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.InvokerInstanceId
+import whisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
 import whisk.spi.Spi
 
+import scala.concurrent.Future
+
 case class ContainerArgsConfig(network: String,
                                dnsServers: Seq[String] = Seq.empty,
                                extraArgs: Map[String, Set[String]] = Map.empty)
 
-case class ContainerPoolConfig(numCore: Int, coreShare: Int, akkaClient: Boolean) {
-
-  /**
-   * The total number of containers is simply the number of cores dilated by the cpu sharing.
-   */
-  def maxActiveContainers = numCore * coreShare
+case class ContainerPoolConfig(userMemory: ByteSize, akkaClient: Boolean) {
 
   /**
    * The shareFactor indicates the number of containers that would share a single core, on average.
@@ -45,7 +38,8 @@ case class ContainerPoolConfig(numCore: Int, coreShare: Int, akkaClient: Boolean
    * On an idle/underloaded system, a container will still get to use underutilized CPU shares.
    */
   private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value.
-  def cpuShare = (totalShare / maxActiveContainers).toInt
+  // Grant more CPU to a container if it allocates more memory.
+  def cpuShare(reservedMemory: ByteSize) = (totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt
 }
 
 /**
diff --git a/common/scala/src/main/scala/whisk/core/entity/Size.scala b/common/scala/src/main/scala/whisk/core/entity/Size.scala
index a51eb2e..34f5bc6 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Size.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Size.scala
@@ -69,6 +69,23 @@ case class ByteSize(size: Long, unit: SizeUnits.Unit) extends Ordered[ByteSize]
     ByteSize(commonSize, commonUnit)
   }
 
+  def *(other: Int): ByteSize = {
+    ByteSize(toBytes * other, SizeUnits.BYTE)
+  }
+
+  def /(other: ByteSize): Double = {
+    // Without throwing the exception the result would be `Infinity` here
+    if (other.toBytes == 0) {
+      throw new ArithmeticException
+    } else {
+      (1.0 * toBytes) / (1.0 * other.toBytes)
+    }
+  }
+
+  def /(other: Int): ByteSize = {
+    ByteSize(toBytes / other, SizeUnits.BYTE)
+  }
+
   def compare(other: ByteSize) = toBytes compare other.toBytes
 
   override def equals(that: Any): Boolean = that match {
diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index 3cf073c..c2e329e 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -6,7 +6,7 @@ whisk {
     use-cluster-bootstrap: false
   }
   loadbalancer {
-    invoker-busy-threshold: 4
+    user-memory: 1024 m
     blackbox-fraction: 10%
     # factor to increase the timeout for forced active acks
     # timeout = time-limit.std * timeoutfactor + 1m
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 47be740..4adeb2a 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -89,13 +89,13 @@ import scala.util.{Failure, Success}
  *
  * ## Capacity checking
  *
- * The maximum capacity per invoker is configured using `invoker-busy-threshold`, which is the maximum amount of actions
+ * The maximum capacity per invoker is configured using `user-memory`, which is the maximum total memory of all actions
  * running in parallel on that invoker.
  *
  * Spare capacity is determined by what the loadbalancer thinks it scheduled to each invoker. Upon scheduling, an entry
- * is made to update the books and a slot in a Semaphore is taken. That slot is only released after the response from
- * the invoker (active-ack) arrives **or** after the active-ack times out. The Semaphore has as many slots as are
- * configured via `invoker-busy-threshold`.
+ * is made to update the books and one Semaphore slot per MB of the action's memory limit is taken. These slots
+ * are only released after the response from the invoker (active-ack) arrives **or** after the active-ack times out.
+ * The Semaphore has as many slots as there are MBs configured in `user-memory`.
  *
  * Known caveats:
  * - In an overload scenario, activations are queued directly to the invokers, which makes the active-ack timeout
@@ -232,7 +232,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
       val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name, action.fullyQualifiedName(false))
       val homeInvoker = hash % invokersToUse.size
       val stepSize = stepSizes(hash % stepSizes.size)
-      ShardingContainerPoolBalancer.schedule(invokersToUse, schedulingState.invokerSlots, homeInvoker, stepSize)
+      ShardingContainerPoolBalancer.schedule(
+        invokersToUse,
+        schedulingState.invokerSlots,
+        action.limits.memory.megabytes,
+        homeInvoker,
+        stepSize)
     } else {
       None
     }
@@ -381,7 +386,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
         totalActivations.decrement()
         totalActivationMemory.add(entry.memory.toMB * (-1))
         activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
-        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
+        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release(entry.memory.toMB.toInt))
 
         if (!forced) {
           entry.timeoutHandler.cancel()
@@ -456,6 +461,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
    *
    * @param invokers a list of available invokers to search in, including their state
    * @param dispatched semaphores for each invoker to give the slots away from
+   * @param slots number of slots that need to be acquired (e.g. memory in MB)
    * @param index the index to start from (initially should be the "homeInvoker"
    * @param step stable identifier of the entity to be scheduled
    * @return an invoker to schedule to or None of no invoker is available
@@ -463,6 +469,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
   @tailrec
   def schedule(invokers: IndexedSeq[InvokerHealth],
                dispatched: IndexedSeq[ForcibleSemaphore],
+               slots: Int,
                index: Int,
                step: Int,
                stepsDone: Int = 0)(implicit logging: Logging, transId: TransactionId): Option[InvokerInstanceId] = {
@@ -471,7 +478,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
     if (numInvokers > 0) {
       val invoker = invokers(index)
       // If the current invoker is healthy and we can get a slot
-      if (invoker.status.isUsable && dispatched(invoker.id.toInt).tryAcquire()) {
+      if (invoker.status.isUsable && dispatched(invoker.id.toInt).tryAcquire(slots)) {
         Some(invoker.id)
       } else {
         // If we've gone through all invokers
@@ -480,7 +487,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
           if (healthyInvokers.nonEmpty) {
             // Choose a healthy invoker randomly
             val random = healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
-            dispatched(random.toInt).forceAcquire()
+            dispatched(random.toInt).forceAcquire(slots)
             logging.warn(this, s"system is overloaded. Chose invoker${random.toInt} by random assignment.")
             Some(random)
           } else {
@@ -488,7 +495,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
           }
         } else {
           val newIndex = (index + step) % numInvokers
-          schedule(invokers, dispatched, newIndex, step, stepsDone + 1)
+          schedule(invokers, dispatched, slots, newIndex, step, stepsDone + 1)
         }
       }
     } else {
@@ -518,7 +525,7 @@ case class ShardingContainerPoolBalancerState(
   lbConfig: ShardingContainerPoolBalancerConfig =
     loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(implicit logging: Logging) {
 
-  private val totalInvokerThreshold = lbConfig.invokerBusyThreshold
+  private val totalInvokerThreshold = lbConfig.invokerUserMemory
   private var currentInvokerThreshold = totalInvokerThreshold
 
   private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
@@ -564,7 +571,7 @@ case class ShardingContainerPoolBalancerState(
       if (oldSize < newSize) {
         // Keeps the existing state..
         _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) {
-          new ForcibleSemaphore(currentInvokerThreshold)
+          new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt)
         }
       }
     }
@@ -587,9 +594,17 @@ case class ShardingContainerPoolBalancerState(
     val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls back to a size of 1 (alone)
     if (_clusterSize != actualSize) {
       _clusterSize = actualSize
-      val newTreshold = (totalInvokerThreshold / actualSize) max 1 // letting this fall below 1 doesn't make sense
+      val newTreshold = if (totalInvokerThreshold / actualSize < MemoryLimit.minMemory) {
+        logging.warn(
+          this,
+          s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")(
+          TransactionId.loadbalancer)
+        MemoryLimit.minMemory // letting this fall below minMemory doesn't make sense
+      } else {
+        totalInvokerThreshold / actualSize
+      }
       currentInvokerThreshold = newTreshold
-      _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold))
+      _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt))
 
       logging.info(
         this,
@@ -610,10 +625,12 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
  * Configuration for the sharding container pool balancer.
  *
  * @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes
- * @param invokerBusyThreshold how many slots an invoker has available in total
+ * @param invokerUserMemory how much memory an invoker has available in total for user containers
  * @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + 1m)
  */
-case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int, timeoutFactor: Int)
+case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double,
+                                               invokerUserMemory: ByteSize,
+                                               timeoutFactor: Int)
 
 /**
  * State kept for each activation until completion.
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 244220a..57989a8 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -37,8 +37,7 @@ whisk {
   }
 
   container-pool {
-    num-core: 4      # used for computing --cpushares, and max number of containers allowed
-    core-share: 2    # used for computing --cpushares, and max number of containers allowed
+    user-memory: 1024 m
     akka-client:  false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
   }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 90f5d73..fd50a3c 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -17,14 +17,15 @@
 
 package whisk.core.containerpool
 
-import scala.collection.immutable
-import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+import whisk.core.connector.MessageFeed
 import whisk.core.entity._
 import whisk.core.entity.size._
-import whisk.core.connector.MessageFeed
 
+import scala.collection.immutable
 import scala.concurrent.duration._
+import scala.util.Try
 
 sealed trait WorkerState
 case object Busy extends WorkerState
@@ -57,11 +58,17 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
                     prewarmConfig: List[PrewarmingConfig] = List.empty,
                     poolConfig: ContainerPoolConfig)
     extends Actor {
+  import ContainerPool.memoryConsumptionOf
+
   implicit val logging = new AkkaLogging(context.system.log)
 
   var freePool = immutable.Map.empty[ActorRef, ContainerData]
   var busyPool = immutable.Map.empty[ActorRef, ContainerData]
   var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
+  // If all memory slots are occupied and if there is currently no container to be removed, then the actions will be
+  // buffered here to keep order of computation.
+  // Otherwise actions with small memory-limits could block actions with large memory limits.
+  var runBuffer = immutable.Queue.empty[Run]
   val logMessageInterval = 10.seconds
 
   prewarmConfig.foreach { config =>
@@ -91,62 +98,87 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
     // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
-      val createdContainer = if (busyPool.size < poolConfig.maxActiveContainers) {
+      // Check if the message is resent from the buffer. Only the first message on the buffer can be resent.
+      val isResentFromBuffer = runBuffer.nonEmpty && runBuffer.dequeueOption.exists(_._1.msg == r.msg)
 
-        // Schedule a job to a warm container
-        ContainerPool
-          .schedule(r.action, r.msg.user.namespace.name, freePool)
-          .map(container => {
-            (container, "warm")
-          })
-          .orElse {
-            if (busyPool.size + freePool.size < poolConfig.maxActiveContainers) {
-              takePrewarmContainer(r.action)
-                .map(container => {
-                  (container, "prewarmed")
-                })
-                .orElse {
-                  Some(createContainer(), "cold")
-                }
-            } else None
-          }
-          .orElse {
-            // Remove a container and create a new one for the given job
-            ContainerPool.remove(freePool).map { toDelete =>
-              removeContainer(toDelete)
-              takePrewarmContainer(r.action)
-                .map(container => {
-                  (container, "recreated")
-                })
-                .getOrElse {
-                  (createContainer(), "recreated")
-                }
-            }
-          }
-      } else None
+      // Only process the request if there are no other requests waiting for free slots, or if the current request is the
+      // next request to process.
+      // It is guaranteed that only the first message in the buffer is resent.
+      if (runBuffer.isEmpty || isResentFromBuffer) {
+        val createdContainer =
+          // Is there enough space on the invoker for this action to be executed.
+          if (hasPoolSpaceFor(busyPool, r.action.limits.memory.megabytes.MB)) {
+            // Schedule a job to a warm container
+            ContainerPool
+              .schedule(r.action, r.msg.user.namespace.name, freePool)
+              .map(container => (container, "warm"))
+              .orElse(
+                // There was no warm container. Try to take a prewarm container or a cold container.
+
+                // Is there enough space to create a new container or do other containers have to be removed?
+                if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
+                  takePrewarmContainer(r.action)
+                    .map(container => (container, "prewarmed"))
+                    .orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
+                } else None)
+              .orElse(
+                // Remove a container and create a new one for the given job
+                ContainerPool
+                // Only free up as much memory as is really needed
+                  .remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB)
+                  .map(removeContainer)
+                  // If the list had at least one entry, enough containers were removed to start the new container. After
+                  // removing the containers, we are not interested anymore in the containers that have been removed.
+                  .headOption
+                  .map(_ =>
+                    takePrewarmContainer(r.action)
+                      .map(container => (container, "recreatedPrewarm"))
+                      .getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated")))
+          } else None
 
-      createdContainer match {
-        case Some(((actor, data), containerState)) =>
-          busyPool = busyPool + (actor -> data)
-          freePool = freePool - actor
-          actor ! r // forwards the run request to the container
-          logContainerStart(r, containerState)
-        case None =>
-          // this can also happen if createContainer fails to start a new container, or
-          // if a job is rescheduled but the container it was allocated to has not yet destroyed itself
-          // (and a new container would over commit the pool)
-          val isErrorLogged = r.retryLogDeadline.map(_.isOverdue).getOrElse(true)
-          val retryLogDeadline = if (isErrorLogged) {
-            logging.error(
-              this,
-              s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.size}, " +
-                s"busyPoolSize: ${busyPool.size}, maxActiveContainers ${poolConfig.maxActiveContainers}, " +
-                s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}")(r.msg.transid)
-            Some(logMessageInterval.fromNow)
-          } else {
-            r.retryLogDeadline
-          }
-          self ! Run(r.action, r.msg, retryLogDeadline)
+        createdContainer match {
+          case Some(((actor, data), containerState)) =>
+            busyPool = busyPool + (actor -> data)
+            freePool = freePool - actor
+            // Remove the action that gets executed now from the buffer and execute the next one afterwards.
+            if (isResentFromBuffer) {
+              // It is guaranteed that the currently executed message is the head of the queue if the message comes
+              // from the buffer.
+              val (_, newBuffer) = runBuffer.dequeue
+              runBuffer = newBuffer
+              runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
+            }
+            actor ! r // forwards the run request to the container
+            logContainerStart(r, containerState)
+          case None =>
+            // this can also happen if createContainer fails to start a new container, or
+            // if a job is rescheduled but the container it was allocated to has not yet destroyed itself
+            // (and a new container would over commit the pool)
+            val isErrorLogged = r.retryLogDeadline.map(_.isOverdue).getOrElse(true)
+            val retryLogDeadline = if (isErrorLogged) {
+              logging.error(
+                this,
+                s"Rescheduling Run message, too many message in the pool, " +
+                  s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " +
+                  s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " +
+                  s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
+                  s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
+                  s"needed memory: ${r.action.limits.memory.megabytes} MB")(r.msg.transid)
+              Some(logMessageInterval.fromNow)
+            } else {
+              r.retryLogDeadline
+            }
+            if (!isResentFromBuffer) {
+              // Add this request to the buffer, as it is not there yet.
+              runBuffer = runBuffer.enqueue(r)
+            }
+            // As this request is the first one in the buffer, try again to execute it.
+            self ! Run(r.action, r.msg, retryLogDeadline)
+        }
+      } else {
+        // There are currently actions waiting to be executed before this action gets executed.
+        // These waiting actions were not able to free up enough memory.
+        runBuffer = runBuffer.enqueue(r)
       }
 
     // Container is free to take more work
@@ -181,22 +213,22 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   }
 
   /** Creates a new container and updates state accordingly. */
-  def createContainer(): (ActorRef, ContainerData) = {
+  def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
     val ref = childFactory(context)
-    val data = NoData()
+    val data = MemoryData(memoryLimit)
     freePool = freePool + (ref -> data)
     ref -> data
   }
 
   /** Creates a new prewarmed container */
-  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize) =
+  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit =
     childFactory(context) ! Start(exec, memoryLimit)
 
   /**
    * Takes a prewarm container out of the prewarmed pool
-   * iff a container with a matching kind is found.
+   * iff a container with a matching kind and memory is found.
    *
-   * @param kind the kind you want to invoke
+   * @param action the action that holds the kind and the required memory.
    * @return the container iff found
    */
   def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef, ContainerData)] = {
@@ -213,7 +245,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           freePool = freePool + (ref -> data)
           prewarmedPool = prewarmedPool - ref
           // Create a new prewarm container
-          // NOTE: prewarming ignores the action code in exec, but this is dangerous as the field is accessible to the factory
+          // NOTE: prewarming ignores the action code in exec, but this is dangerous as the field is accessible to the
+          // factory
           prewarmContainer(action.exec, memory)
           (ref, data)
       }
@@ -225,11 +258,32 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     freePool = freePool - toDelete
     busyPool = busyPool - toDelete
   }
+
+  /**
+   * Checks whether `memory` still fits into the configured user memory, given what `pool` already uses.
+   *
+   * @param pool the containers whose memory limits are counted as already allocated
+   * @param memory the additional amount of memory that should fit on top of the pool's consumption
+   * @return true if the pool's memory plus `memory` does not exceed `poolConfig.userMemory` (compared in MB)
+   */
+  def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
+    memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
+  }
 }
 
 object ContainerPool {
 
   /**
+   * Sums up the memory limits of all containers in the given pool.
+   *
+   * @param pool the containers to account for (only the `ContainerData` values are used, keys are ignored)
+   * @return the sum over all entries of their memory limits in megabytes (entries with equal limits all count)
+   */
+  protected[containerpool] def memoryConsumptionOf[A](pool: Map[A, ContainerData]): Long = {
+    pool.map(_._2.memoryLimit.toMB).sum
+  }
+
+  /**
    * Finds the best container for a given job to run on.
    *
    * Selects an arbitrary warm container from the passed pool of idle containers
@@ -255,22 +309,37 @@ object ContainerPool {
 
   /**
    * Finds the oldest previously used container to remove to make space for the job passed to run.
+   * Depending on how much memory has to be freed, several containers (oldest first) might be removed.
    *
    * NOTE: This method is never called to remove an action that is in the pool already,
    * since this would be picked up earlier in the scheduler and the container reused.
    *
    * @param pool a map of all free containers in the pool
-   * @return a container to be removed iff found
+   * @param memory the amount of memory that has to be freed up; nothing is removed if it is not > 0.B
+   * @return the containers to remove, oldest first; empty if the warm containers cannot free enough memory
    */
-  protected[containerpool] def remove[A](pool: Map[A, ContainerData]): Option[A] = {
+  protected[containerpool] def remove[A](pool: Map[A, ContainerData], memory: ByteSize): List[A] = {
     val freeContainers = pool.collect {
+      // Only warm containers are removal candidates; prewarmed and other containers are always kept.
       case (ref, w: WarmedData) => ref -> w
     }
 
-    if (freeContainers.nonEmpty) {
-      val (ref, _) = freeContainers.minBy(_._2.lastUsed)
-      Some(ref)
-    } else None
+    if (memory > 0.B && freeContainers.nonEmpty && memoryConsumptionOf(freeContainers) >= memory.toMB) {
+      // Remove the oldest warm container if:
+      // - memory still has to be freed
+      // - there are warm containers left that can be removed
+      // - together, these containers hold at least the memory that is still required
+      val (ref, data) = freeContainers.minBy(_._2.lastUsed)
+      // Subtracting more than is left would give a negative ByteSize, which throws; clamp it to 0.B instead.
+      val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
+      List(ref) ++ remove(freeContainers - ref, remainingMemory)
+    } else {
+      // Nothing (more) to remove: no memory has to be freed (anymore), there are no warm containers,
+      // or together they cannot free the required memory.
+      // On the first call this yields an empty list; on a recursive call it ends the recursion because
+      // the containers collected so far already free enough memory.
+      List.empty
+    }
   }
 
   def props(factory: ActorRefFactory => ActorRef,
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 92eda45..0ddd666 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -19,28 +19,26 @@ package whisk.core.containerpool
 
 import java.time.Instant
 
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.Success
-import scala.util.Failure
-import akka.actor.FSM
-import akka.actor.Props
-import akka.actor.Stash
 import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{FSM, Props, Stash}
+import akka.event.Logging.InfoLevel
 import akka.pattern.pipe
-import spray.json._
+import pureconfig.loadConfigOrThrow
 import spray.json.DefaultJsonProtocol._
+import spray.json._
 import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
+import whisk.core.ConfigKeys
 import whisk.core.connector.ActivationMessage
 import whisk.core.containerpool.logging.LogCollectingException
+import whisk.core.database.UserContext
+import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity._
 import whisk.core.entity.size._
-import whisk.core.entity.ExecManifest.ImageName
 import whisk.http.Messages
-import akka.event.Logging.InfoLevel
-import pureconfig.loadConfigOrThrow
-import whisk.core.ConfigKeys
-import whisk.core.database.UserContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
 
 // States
 sealed trait ContainerState
@@ -54,14 +52,16 @@ case object Paused extends ContainerState
 case object Removing extends ContainerState
 
 // Data
-sealed abstract class ContainerData(val lastUsed: Instant)
-case class NoData() extends ContainerData(Instant.EPOCH)
-case class PreWarmedData(container: Container, kind: String, memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH)
+sealed abstract class ContainerData(val lastUsed: Instant, val memoryLimit: ByteSize)
+case class NoData() extends ContainerData(Instant.EPOCH, 0.B)
+case class MemoryData(override val memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH, memoryLimit)
+case class PreWarmedData(container: Container, kind: String, override val memoryLimit: ByteSize)
+    extends ContainerData(Instant.EPOCH, memoryLimit)
 case class WarmedData(container: Container,
                       invocationNamespace: EntityName,
                       action: ExecutableWhiskAction,
                       override val lastUsed: Instant)
-    extends ContainerData(lastUsed)
+    extends ContainerData(lastUsed, action.limits.memory.megabytes.MB)
 
 // Events received by the actor
 case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
@@ -120,7 +120,7 @@ class ContainerProxy(
         job.exec.image,
         job.exec.pull,
         job.memoryLimit,
-        poolConfig.cpuShare)
+        poolConfig.cpuShare(job.memoryLimit))
         .map(container => PreWarmedData(container, job.exec.kind, job.memoryLimit))
         .pipeTo(self)
 
@@ -137,7 +137,7 @@ class ContainerProxy(
         job.action.exec.image,
         job.action.exec.pull,
         job.action.limits.memory.megabytes.MB,
-        poolConfig.cpuShare)
+        poolConfig.cpuShare(job.action.limits.memory.megabytes.MB))
 
       // container factory will either yield a new container ready to execute the action, or
       // starting up the container failed; for the latter, it's either an internal error starting
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 634e416..5f4fd8d 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -25,23 +25,24 @@ import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
+import spray.json.DefaultJsonProtocol._
 import spray.json._
 import whisk.common.tracing.WhiskTracerProvider
 import whisk.common._
-import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector._
 import whisk.core.containerpool._
 import whisk.core.containerpool.logging.LogStoreProvider
 import whisk.core.database._
 import whisk.core.entity._
+import whisk.core.entity.size._
+import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.http.Messages
 import whisk.spi.SpiLoader
 import whisk.core.database.UserContext
 
-import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
-import DefaultJsonProtocol._
 
 class InvokerReactive(
   config: WhiskConfig,
@@ -99,7 +100,7 @@ class InvokerReactive(
 
   /** Initialize message consumers */
   private val topic = s"invoker${instance.toInt}"
-  private val maximumContainers = poolConfig.maxActiveContainers
+  private val maximumContainers = (poolConfig.userMemory / MemoryLimit.minMemory).toInt
   private val msgProvider = SpiLoader.get[MessagingProvider]
   private val consumer = msgProvider.getConsumer(
     config,
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
index e539eeb..8548cfc 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
@@ -26,8 +26,8 @@ import common.{StreamLogging, WskActorSystem}
 import org.junit.runner.RunWith
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-import spray.json._
 import spray.json.DefaultJsonProtocol._
+import spray.json._
 import whisk.common.TransactionId
 import whisk.core.containerpool.logging.{DockerToActivationFileLogStore, LogLine}
 import whisk.core.entity._
diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 73ca88e..d5187ed 100644
--- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -78,9 +78,10 @@ class MesosContainerFactoryTest
     lastTaskId
   }
 
-  val poolConfig = ContainerPoolConfig(8, 10, false)
-  val dockerCpuShares = poolConfig.cpuShare
-  val mesosCpus = poolConfig.cpuShare / 1024.0
+  // 80 slots, each 265MB
+  val poolConfig = ContainerPoolConfig(21200.MB, false)
+  val actionMemory = 265.MB
+  val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
 
   val containerArgsConfig =
     new ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
@@ -134,8 +135,8 @@ class MesosContainerFactoryTest
       "mesosContainer",
       ImageName("fakeImage"),
       false,
-      1.MB,
-      poolConfig.cpuShare)
+      actionMemory,
+      poolConfig.cpuShare(actionMemory))
 
     expectMsg(
       SubmitTask(TaskDef(
@@ -143,7 +144,7 @@ class MesosContainerFactoryTest
         "mesosContainer",
         "fakeImage",
         mesosCpus,
-        1,
+        actionMemory.toMB.toInt,
         List(8080),
         Some(0),
         false,
@@ -184,15 +185,15 @@ class MesosContainerFactoryTest
       "mesosContainer",
       ImageName("fakeImage"),
       false,
-      1.MB,
-      poolConfig.cpuShare)
+      actionMemory,
+      poolConfig.cpuShare(actionMemory))
     probe.expectMsg(
       SubmitTask(TaskDef(
         lastTaskId,
         "mesosContainer",
         "fakeImage",
         mesosCpus,
-        1,
+        actionMemory.toMB.toInt,
         List(8080),
         Some(0),
         false,
@@ -255,8 +256,8 @@ class MesosContainerFactoryTest
       "mesosContainer",
       ImageName("fakeImage"),
       false,
-      1.MB,
-      poolConfig.cpuShare)
+      actionMemory,
+      poolConfig.cpuShare(actionMemory))
 
     probe.expectMsg(
       SubmitTask(TaskDef(
@@ -264,7 +265,7 @@ class MesosContainerFactoryTest
         "mesosContainer",
         "fakeImage",
         mesosCpus,
-        1,
+        actionMemory.toMB.toInt,
         List(8080),
         Some(0),
         false,
@@ -293,7 +294,7 @@ class MesosContainerFactoryTest
     implicit val tid = TransactionId.testing
     implicit val m = ActorMaterializer()
     val logs = container
-      .logs(1.MB, false)
+      .logs(actionMemory, false)
       .via(DockerToActivationLogStore.toFormattedString)
       .runWith(Sink.seq)
     await(logs)(0) should endWith
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 20bd353..78317ad 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -88,8 +88,13 @@ class ContainerPoolTests
   val differentInvocationNamespace = EntityName("invocationSpace2")
   val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
   val differentAction = action.copy(name = EntityName("actionName2"))
+  val largeAction =
+    action.copy(
+      name = EntityName("largeAction"),
+      limits = ActionLimits(memory = MemoryLimit(MemoryLimit.stdMemory * 2)))
 
   val runMessage = createRunMessage(action, invocationNamespace)
+  val runMessageLarge = createRunMessage(largeAction, invocationNamespace)
   val runMessageDifferentAction = createRunMessage(differentAction, invocationNamespace)
   val runMessageDifferentVersion = createRunMessage(action.copy().revision(DocRevision("v2")), invocationNamespace)
   val runMessageDifferentNamespace = createRunMessage(action, differentInvocationNamespace)
@@ -113,7 +118,7 @@ class ContainerPoolTests
     (containers, factory)
   }
 
-  def poolConfig(numCore: Int, coreShare: Int) = ContainerPoolConfig(numCore, coreShare, false)
+  def poolConfig(userMemory: ByteSize) = ContainerPoolConfig(userMemory, false)
 
   behavior of "ContainerPool"
 
@@ -126,7 +131,8 @@ class ContainerPoolTests
   it should "reuse a warm container" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
+    // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
     pool ! runMessage
     containers(0).expectMsg(runMessage)
@@ -140,7 +146,8 @@ class ContainerPoolTests
   it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
+    // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
     pool ! runMessage
     containers(0).expectMsg(runMessage)
@@ -155,7 +162,8 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
 
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
+    // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     // Note that the container doesn't respond, thus it's not free to take work
@@ -169,7 +177,7 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     containers(0).send(pool, NeedWork(warmedData()))
@@ -179,12 +187,35 @@ class ContainerPoolTests
     containers(1).expectMsg(runMessageDifferentEverything)
   }
 
+  it should "remove several containers to make space in the pool if it is already full and a different large action arrives" in within(
+    timeout) {
+    val (containers, factory) = testContainers(3)
+    val feed = TestProbe()
+
+    // a pool with slots for 2 actions with default memory limit.
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(512.MB), feed.ref))
+    pool ! runMessage
+    containers(0).expectMsg(runMessage)
+    pool ! runMessageDifferentAction // 2 * stdMemory taken -> full
+    containers(1).expectMsg(runMessageDifferentAction)
+
+    containers(0).send(pool, NeedWork(warmedData())) // first action finished -> 1 * stdMemory taken
+    feed.expectMsg(MessageFeed.Processed)
+    containers(1).send(pool, NeedWork(warmedData())) // second action finished -> nothing busy, 2 * stdMemory held idle
+    feed.expectMsg(MessageFeed.Processed)
+
+    pool ! runMessageLarge // need to remove both actions to make space for the large action (needs 2 * stdMemory)
+    containers(0).expectMsg(Remove)
+    containers(1).expectMsg(Remove)
+    containers(2).expectMsg(runMessageLarge)
+  }
+
   it should "cache a container if there is still space in the pool" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
 
     // a pool with only 1 active slot but 2 slots in total
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref))
 
     // Run the first container
     pool ! runMessage
@@ -210,7 +241,7 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     containers(0).send(pool, NeedWork(warmedData()))
@@ -225,7 +256,7 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     containers(0).send(pool, RescheduleJob) // emulate container failure ...
@@ -234,6 +265,34 @@ class ContainerPoolTests
     containers(1).expectMsg(runMessage) // job resent to new actor
   }
 
+  it should "not start a new container if there is not enough space in the pool" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref))
+
+    // Start first action
+    pool ! runMessage // 1 * stdMemory taken
+    containers(0).expectMsg(runMessage)
+
+    // Send second action to the pool
+    pool ! runMessageLarge // message is too large to be processed immediately.
+    containers(1).expectNoMessage(100.milliseconds)
+
+    // First action is finished
+    containers(0).send(pool, NeedWork(warmedData())) // pool is empty again.
+    feed.expectMsg(MessageFeed.Processed)
+
+    // Second action should run now
+    containers(1).expectMsgPF() {
+      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
+      case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
+    }
+
+    containers(1).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+  }
+
   /*
    * CONTAINER PREWARMING
    */
@@ -244,7 +303,7 @@ class ContainerPoolTests
     val pool =
       system.actorOf(
         ContainerPool
-          .props(factory, poolConfig(0, 0), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
+          .props(factory, poolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
     containers(0).expectMsg(Start(exec, memoryLimit))
   }
 
@@ -255,7 +314,7 @@ class ContainerPoolTests
     val pool =
       system.actorOf(
         ContainerPool
-          .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
+          .props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
     containers(0).expectMsg(Start(exec, memoryLimit))
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
     pool ! runMessage
@@ -270,7 +329,11 @@ class ContainerPoolTests
 
     val pool = system.actorOf(
       ContainerPool
-        .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
+        .props(
+          factory,
+          poolConfig(MemoryLimit.stdMemory),
+          feed.ref,
+          List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
     containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
     pool ! runMessage
@@ -284,9 +347,8 @@ class ContainerPoolTests
     val alternativeLimit = 128.MB
 
     val pool =
-      system.actorOf(
-        ContainerPool
-          .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit))))
+      system.actorOf(ContainerPool
+        .props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit))))
     containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit)))
     pool ! runMessage
@@ -300,7 +362,7 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
 
-    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
     // container0 is created and used
     pool ! runMessage
@@ -319,6 +381,97 @@ class ContainerPoolTests
     pool ! runMessage
     containers(1).expectMsg(runMessage)
   }
+
+  /*
+   * Run buffer
+   */
+  it should "first put messages into the queue and retrying them and then put messages only into the queue" in within(
+    timeout) {
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    // Pool with 512 MB usermemory
+    val pool =
+      system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref))
+
+    // Send action that blocks the pool
+    pool ! runMessageLarge
+    containers(0).expectMsg(runMessageLarge)
+
+    // Send action that should be written to the queue and retried in invoker
+    pool ! runMessage
+    containers(1).expectNoMessage(100.milliseconds)
+
+    // Send another message that should not be retried, but put into the queue as well
+    pool ! runMessageDifferentAction
+    containers(2).expectNoMessage(100.milliseconds)
+
+    // Action with 512 MB is finished
+    containers(0).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+
+    // Action 1 should start immediately
+    containers(0).expectMsgPF() {
+      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
+      case Run(runMessage.action, runMessage.msg, Some(_)) => true
+    }
+    // Action 2 should start immediately as well (without any retries, as there is already enough space in the pool)
+    containers(1).expectMsg(runMessageDifferentAction)
+  }
+
+  it should "process activations in the order they are arriving" in within(timeout) {
+    val (containers, factory) = testContainers(4)
+    val feed = TestProbe()
+
+    // Pool with 512 MB usermemory
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref))
+
+    // Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with 256 MB each and Action 1 with 512 MB)
+    pool ! runMessage
+    containers(0).expectMsg(runMessage)
+    pool ! runMessageLarge
+    containers(1).expectNoMessage(100.milliseconds)
+    pool ! runMessageDifferentNamespace
+    containers(2).expectNoMessage(100.milliseconds)
+    pool ! runMessageDifferentAction
+    containers(3).expectNoMessage(100.milliseconds)
+
+    // Action 0 is finished -> Large action should be executed now
+    containers(0).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    containers(1).expectMsgPF() {
+      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
+      case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
+    }
+
+    // Send another action to the container pool, that would fit memory-wise
+    pool ! runMessageDifferentEverything
+    containers(4).expectNoMessage(100.milliseconds)
+
+    // Action 1 is finished -> Action 2 and Action 3 should be executed now
+    containers(1).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    containers(2).expectMsgPF() {
+      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
+      case Run(runMessageDifferentNamespace.action, runMessageDifferentNamespace.msg, Some(_)) => true
+    }
+    // Expect the unmodified run message (not a retried one) to check that this request was queued instead of retried
+    containers(3).expectMsg(runMessageDifferentAction)
+
+    // Action 3 is finished -> Action 4 should start
+    containers(3).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    containers(4).expectMsgPF() {
+      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
+      case Run(runMessageDifferentEverything.action, runMessageDifferentEverything.msg, Some(_)) => true
+    }
+
+    // Action 2 and 4 are finished
+    containers(2).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    containers(4).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+  }
 }
 
 /**
@@ -419,18 +572,24 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
   behavior of "ContainerPool remove()"
 
   it should "not provide a container if pool is empty" in {
-    ContainerPool.remove(Map.empty) shouldBe None
+    ContainerPool.remove(Map.empty, MemoryLimit.stdMemory) shouldBe List.empty
   }
 
   it should "not provide a container from busy pool with non-warm containers" in {
     val pool = Map('none -> noData(), 'pre -> preWarmedData())
-    ContainerPool.remove(pool) shouldBe None
+    ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List.empty
+  }
+
+  it should "not provide a container from pool if there is not enough capacity" in {
+    val pool = Map('first -> warmedData())
+
+    ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List.empty
   }
 
   it should "provide a container from pool with one single free container" in {
     val data = warmedData()
     val pool = Map('warm -> data)
-    ContainerPool.remove(pool) shouldBe Some('warm)
+    ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('warm)
   }
 
   it should "provide oldest container from busy pool with multiple containers" in {
@@ -441,6 +600,18 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
 
     val pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
 
-    ContainerPool.remove(pool) shouldBe Some('oldest)
+    ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('oldest)
+  }
+
+  it should "provide a list of the oldest containers from pool, if several containers have to be removed" in {
+    val namespace = differentNamespace.asString
+    val first = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(1))
+    val second = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(2))
+    val third = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(3))
+    val oldest = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(0))
+
+    val pool = Map('first -> first, 'second -> second, 'third -> third, 'oldest -> oldest)
+
+    ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List('oldest, 'first)
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index a46647a..4a2a133 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -168,7 +168,7 @@ class ContainerProxyTests
     Future.successful(())
   }
 
-  val poolConfig = ContainerPoolConfig(1, 2, false)
+  val poolConfig = ContainerPoolConfig(2.MB, false)
 
   behavior of "ContainerProxy"
 
diff --git a/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala b/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala
index c74f146..d800f14 100644
--- a/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala
@@ -93,6 +93,48 @@ class SizeTests extends FlatSpec with Matchers {
     }
   }
 
+  // Multiplication
+  it should "2 B * 10 = 20 B" in {
+    2.B * 10 should be(20.B)
+  }
+
+  it should "40 MB * 2 = 80 MB" in {
+    40.MB * 2 should be(80.MB)
+  }
+
+  // Division
+  it should "5 Byte / 2 Byte = 2.5" in {
+    5.B / 2.B should be(2.5)
+  }
+
+  it should "1 KB / 512 Byte = 2" in {
+    1.KB / 512.B should be(2)
+  }
+
+  it should "throw an exception if division is through 0 byte" in {
+    an[ArithmeticException] should be thrownBy {
+      1.MB / 0.B
+    }
+  }
+
+  it should "5 Byte / 2 = 2 Byte" in {
+    5.B / 2 should be(2.B)
+  }
+
+  it should "1 MB / 512 = 2 KB" in {
+    1.MB / 512 should be(2.KB)
+  }
+
+  it should "not go into integer overflow for a few GB" in {
+    4096.MB / 2 should be(2048.MB)
+  }
+
+  it should "throw an exception if division is through 0" in {
+    an[ArithmeticException] should be thrownBy {
+      1.MB / 0
+    }
+  }
+
   // Conversions
   it should "1024 B to KB = 1" in {
     (1024 B).toKB should be(1)
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 4e906d0..211268f 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -19,12 +19,12 @@ package whisk.core.loadBalancer.test
 
 import common.StreamLogging
 import org.junit.runner.RunWith
-import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
 import whisk.common.{ForcibleSemaphore, TransactionId}
-import whisk.core.entity.InvokerInstanceId
-import whisk.core.loadBalancer._
+import whisk.core.entity.{ByteSize, InvokerInstanceId, MemoryLimit}
 import whisk.core.loadBalancer.InvokerState._
+import whisk.core.loadBalancer._
 
 /**
  * Unit tests for the ContainerPool object.
@@ -43,13 +43,15 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
   def semaphores(count: Int, max: Int): IndexedSeq[ForcibleSemaphore] =
     IndexedSeq.fill(count)(new ForcibleSemaphore(max))
 
-  def lbConfig(blackboxFraction: Double, invokerBusyThreshold: Int) =
+  def lbConfig(blackboxFraction: Double, invokerBusyThreshold: ByteSize) = // busy threshold is now memory, not a slot count
     ShardingContainerPoolBalancerConfig(blackboxFraction, invokerBusyThreshold, 1)
 
   it should "update invoker's state, growing the slots data and keeping valid old data" in {
     // start empty
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.invokers shouldBe 'empty
     state.blackboxInvokers shouldBe 'empty
     state.managedInvokers shouldBe 'empty
@@ -65,13 +67,13 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.blackboxInvokers shouldBe update1 // fallback to at least one
     state.managedInvokers shouldBe update1 // fallback to at least one
     state.invokerSlots should have size update1.size
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB
     state.managedStepSizes shouldBe Seq(1)
     state.blackboxStepSizes shouldBe Seq(1)
 
-    // aquire a slot to alter invoker state
+    // acquire a slot to alter invoker state
-    state.invokerSlots.head.tryAcquire()
-    state.invokerSlots.head.availablePermits shouldBe slots - 1
+    state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt)
+    state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
 
     // apply second update, growing the state
     val update2 = IndexedSeq(healthy(0), healthy(1))
@@ -81,15 +83,15 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.managedInvokers shouldBe IndexedSeq(update2.head)
     state.blackboxInvokers shouldBe IndexedSeq(update2.last)
     state.invokerSlots should have size update2.size
-    state.invokerSlots.head.availablePermits shouldBe slots - 1
-    state.invokerSlots(1).availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
+    state.invokerSlots(1).availablePermits shouldBe memory.toMB
     state.managedStepSizes shouldBe Seq(1)
     state.blackboxStepSizes shouldBe Seq(1)
   }
 
   it should "allow managed partition to overlap with blackbox for small N" in {
     Seq(0.1, 0.2, 0.3, 0.4, 0.5).foreach { bf =>
-      val state = ShardingContainerPoolBalancerState()(lbConfig(bf, 1))
+      val state = ShardingContainerPoolBalancerState()(lbConfig(bf, MemoryLimit.stdMemory))
 
       (1 to 100).toSeq.foreach { i =>
         state.updateInvokers((1 to i).map(_ => healthy(1)))
@@ -116,43 +118,49 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
   it should "update the cluster size, adjusting the invoker slots accordingly" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots // one invoker's capacity: room for `slots` minimum-memory actions
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
-    state.invokerSlots.head.tryAcquire()
-    state.invokerSlots.head.availablePermits shouldBe slots - 1
+    state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt) // take one slot; permits are counted in MB
+    state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB
 
     state.updateCluster(2)
-    state.invokerSlots.head.availablePermits shouldBe slots / 2 // state reset + divided by 2
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2 // state reset + divided by 2
   }
 
   it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB // full capacity, counted in MB
 
     state.updateCluster(2)
-    state.invokerSlots.head.availablePermits shouldBe slots / 2
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2 // capacity divided by the cluster size
 
     state.updateCluster(0)
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB // size 0 is treated as 1
 
     state.updateCluster(-1)
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB // negative sizes are treated as 1
   }
 
   it should "set the threshold to 1 if the cluster is bigger than there are slots on 1 invoker" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB
 
     state.updateCluster(20)
 
-    state.invokerSlots.head.availablePermits shouldBe 1
+    state.invokerSlots.head.availablePermits shouldBe MemoryLimit.minMemory.toMB
   }
 
   behavior of "schedule"
@@ -160,7 +168,12 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
   implicit val transId = TransactionId.testing
 
   it should "return None on an empty invoker list" in {
-    ShardingContainerPoolBalancer.schedule(IndexedSeq.empty, IndexedSeq.empty, index = 0, step = 2) shouldBe None
+    ShardingContainerPoolBalancer.schedule(
+      IndexedSeq.empty,
+      IndexedSeq.empty,
+      MemoryLimit.minMemory.toMB.toInt, // permits requested, i.e. the action's memory in MB
+      index = 0,
+      step = 2) shouldBe None
   }
 
   it should "return None if no invokers are healthy" in {
@@ -168,7 +181,12 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     val invokerSlots = semaphores(invokerCount, 3)
     val invokers = (0 until invokerCount).map(unhealthy)
 
-    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2) shouldBe None
+    ShardingContainerPoolBalancer.schedule(
+      invokers,
+      invokerSlots,
+      MemoryLimit.minMemory.toMB.toInt,
+      index = 0,
+      step = 2) shouldBe None
   }
 
   it should "choose the first available invoker, jumping in stepSize steps, falling back to randomized scheduling once all invokers are full" in {
@@ -178,13 +196,19 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
     val expectedResult = Seq(3, 3, 3, 5, 5, 5, 4, 4, 4)
     val result = expectedResult.map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2).get.toInt
+      ShardingContainerPoolBalancer
+        .schedule(invokers, invokerSlots, 1, index = 0, step = 2)
+        .get
+        .toInt
     }
 
     result shouldBe expectedResult
 
     val bruteResult = (0 to 100).map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2).get.toInt
+      ShardingContainerPoolBalancer
+        .schedule(invokers, invokerSlots, 1, index = 0, step = 2)
+        .get
+        .toInt
     }
 
     bruteResult should contain allOf (3, 4, 5)
@@ -196,20 +220,43 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
     val expectedResult = Seq(0, 0, 0, 3, 3, 3)
     val result = expectedResult.map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 1).get.toInt
+      ShardingContainerPoolBalancer
+        .schedule(invokers, invokerSlots, 1, index = 0, step = 1)
+        .get
+        .toInt
     }
 
     result shouldBe expectedResult
 
     // more schedules will result in randomized invokers, but the unhealthy and offline invokers should not be part
     val bruteResult = (0 to 100).map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 1).get.toInt
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, 1, index = 0, step = 1).get.toInt
     }
 
     bruteResult should contain allOf (0, 3)
     bruteResult should contain noneOf (1, 2)
   }
 
+  it should "only take invokers that have enough free slots" in {
+    val invokerCount = 3
+    // Every invoker starts out with 4 free slots
+    val invokerSlots = semaphores(invokerCount, 4)
+    val invokers = (0 until invokerCount).map(healthy(_))
+
+    // Requests `slots` slots and returns the index of the invoker that was picked
+    def scheduleWith(slots: Int): Int =
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, slots, index = 0, step = 1).get.toInt
+
+    scheduleWith(3) shouldBe 0 // invoker 0 has room (4 -> 1 free)
+    scheduleWith(2) shouldBe 1 // invoker 0 is too full now, invoker 1 has room (4 -> 2 free)
+    scheduleWith(1) shouldBe 0 // the last free slot of invoker 0 is used (1 -> 0 free)
+    scheduleWith(4) shouldBe 2 // only the untouched invoker 2 has room (4 -> 0 free)
+    scheduleWith(2) shouldBe 1 // the remainder of invoker 1 is used (2 -> 0 free)
+
+    // all slots on all invokers are now taken
+    invokerSlots.foreach(_.availablePermits shouldBe 0)
+  }
+
   behavior of "pairwiseCoprimeNumbersUntil"
 
   it should "return an empty set for malformed inputs" in {


Mime
View raw message