aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Prevent Aurora from creating zero sized Executor tasks.
Date Sat, 22 Nov 2014 01:00:54 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 5116c2209 -> a431b1d6b


Prevent Aurora from creating zero sized Executor tasks.

Bugs closed: AURORA-928

Reviewed at https://reviews.apache.org/r/28193/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/a431b1d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/a431b1d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/a431b1d6

Branch: refs/heads/master
Commit: a431b1d6b98c514448346a9c6af0c90951db9b4d
Parents: 5116c22
Author: Zameer Manji <zmanji@twopensource.com>
Authored: Fri Nov 21 17:00:37 2014 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri Nov 21 17:00:37 2014 -0800

----------------------------------------------------------------------
 .../apache/aurora/scheduler/ResourceSlot.java   |  35 +++----
 .../aurora/scheduler/app/SchedulerMain.java     |  16 ++-
 .../scheduler/mesos/MesosTaskFactory.java       |  94 ++++++++++++++---
 .../aurora/scheduler/app/SchedulerIT.java       |  14 ++-
 .../mesos/MesosTaskFactoryImplTest.java         | 104 ++++++++++++++++---
 5 files changed, 210 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
index 46832d5..0b15834 100644
--- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -38,34 +38,32 @@ public final class ResourceSlot {
   private final Resources resources;
 
   /**
-   * CPU allocated for each executor.
+   * Extra CPU allocated for each executor.
    */
   @VisibleForTesting
   @CmdLine(name = "thermos_executor_cpu",
       help = "The number of CPU cores to allocate for each instance of the executor.")
-  public static final Arg<Double> EXECUTOR_CPUS = Arg.create(0.25);
+  public static final Arg<Double> EXECUTOR_OVERHEAD_CPUS = Arg.create(0.25);
 
   /**
-   * RAM required for the executor.  Executors in the wild have been observed using 48-54MB
RSS,
-   * setting to 128MB to be extra vigilant initially.
+   * Extra RAM allocated for the executor.
    */
   @VisibleForTesting
   @CmdLine(name = "thermos_executor_ram",
       help = "The amount of RAM to allocate for each instance of the executor.")
-  public static final Arg<Amount<Integer, Data>> EXECUTOR_RAM =
-  Arg.create(Amount.of(128, Data.MB));
+  public static final Arg<Amount<Long, Data>> EXECUTOR_OVERHEAD_RAM =
+      Arg.create(Amount.of(128L, Data.MB));
 
   private ResourceSlot(Resources r) {
     this.resources = r;
   }
 
   public static ResourceSlot from(ITaskConfig task) {
-    double totalCPU = task.getNumCpus() + EXECUTOR_CPUS.get();
-    Amount<Long, Data> totalRAM =
-        Amount.of(task.getRamMb() + EXECUTOR_RAM.get().as(Data.MB), Data.MB);
-    Amount<Long, Data> disk = Amount.of(task.getDiskMb(), Data.MB);
-    return new ResourceSlot(
-        new Resources(totalCPU, totalRAM, disk, task.getRequestedPorts().size()));
+    return from(
+        task.getNumCpus(),
+        Amount.of(task.getRamMb(), Data.MB),
+        Amount.of(task.getDiskMb(), Data.MB),
+        task.getRequestedPorts().size());
   }
 
   public static ResourceSlot from(Offer offer) {
@@ -89,13 +87,14 @@ public final class ResourceSlot {
   }
 
   @VisibleForTesting
-  public static ResourceSlot from(double cpu,
-                                  Amount<Long, Data> ram,
-                                  Amount<Long, Data> disk,
-                                  int ports) {
-    double totalCPU = cpu + EXECUTOR_CPUS.get();
+  public static ResourceSlot from(
+      double cpu,
+      Amount<Long, Data> ram,
+      Amount<Long, Data> disk,
+      int ports) {
+    double totalCPU = cpu + EXECUTOR_OVERHEAD_CPUS.get();
     Amount<Long, Data> totalRAM =
-        Amount.of(ram.as(Data.MB) + EXECUTOR_RAM.get().as(Data.MB), Data.MB);
+        Amount.of(ram.as(Data.MB) + EXECUTOR_OVERHEAD_RAM.get().as(Data.MB), Data.MB);
 
     return new ResourceSlot(new Resources(totalCPU, totalRAM, disk, ports));
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 288e7cb..72c7545 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -35,6 +35,8 @@ import com.twitter.common.args.constraints.NotEmpty;
 import com.twitter.common.args.constraints.NotNull;
 import com.twitter.common.inject.Bindings;
 import com.twitter.common.logging.RootLogConfig;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
 import com.twitter.common.zookeeper.Group;
 import com.twitter.common.zookeeper.SingletonService;
 import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
@@ -46,11 +48,12 @@ import org.apache.aurora.auth.CapabilityValidator;
 import org.apache.aurora.auth.SessionValidator;
 import org.apache.aurora.auth.UnsecureAuthModule;
 import org.apache.aurora.scheduler.SchedulerLifecycle;
+import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.cron.quartz.CronModule;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
 import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
 import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
 import org.apache.aurora.scheduler.storage.db.DbModule;
 import org.apache.aurora.scheduler.storage.db.MigrationModule;
@@ -64,6 +67,9 @@ import org.apache.aurora.scheduler.thrift.auth.ThriftAuthModule;
 
 import static com.twitter.common.logging.RootLogConfig.Configuration;
 
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_CPUS;
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_RAM;
+
 /**
  * Launcher for the aurora scheduler.
  */
@@ -178,7 +184,13 @@ public class SchedulerMain extends AbstractApplication {
         .add(new AbstractModule() {
           @Override
           protected void configure() {
-            bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
+            Resources executorOverhead = new Resources(
+                EXECUTOR_OVERHEAD_CPUS.get(),
+                EXECUTOR_OVERHEAD_RAM.get(),
+                Amount.of(0L, Data.MB),
+                0);
+            bind(ExecutorSettings.class)
+                .toInstance(new ExecutorSettings(THERMOS_EXECUTOR_PATH.get(), executorOverhead));
           }
         })
         .add(getMesosModules())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index bb227fd..e0332c0 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -23,11 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
+import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.Protobufs;
 import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.CommandUtil;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.SchedulerException;
@@ -62,16 +62,22 @@ public interface MesosTaskFactory {
    */
   TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
 
-  class ExecutorConfig {
+  class ExecutorSettings {
     private final String executorPath;
+    private final Resources executorOverhead;
 
-    public ExecutorConfig(String executorPath) {
+    public ExecutorSettings(String executorPath, Resources executorOverhead) {
       this.executorPath = checkNotBlank(executorPath);
+      this.executorOverhead = requireNonNull(executorOverhead);
     }
 
     String getExecutorPath() {
       return executorPath;
     }
+
+    Resources getExecutorOverhead() {
+      return executorOverhead;
+    }
   }
 
   // TODO(wfarner): Move this class to its own file to reduce visibility to package private.
@@ -80,16 +86,39 @@ public interface MesosTaskFactory {
     private static final String EXECUTOR_PREFIX = "thermos-";
 
     /**
+     * Minimum resources required to run Thermos. In the wild Thermos needs about 0.01 CPU
and
+     * about 100MB of RAM. The RAM requirement has been rounded up to a power of 2.
+     */
+    @VisibleForTesting
+    static final Resources MIN_THERMOS_RESOURCES = new Resources(
+        0.01,
+        Amount.of(128L, Data.MB),
+        Amount.of(0L, Data.MB),
+        0);
+
+    /**
+     * Minimum resources to allocate for a task. Mesos rejects tasks that have no CPU or
no RAM.
+     */
+    @VisibleForTesting
+    static final Resources MIN_TASK_RESOURCES = new Resources(
+        0.01,
+        Amount.of(1L, Data.MB),
+        Amount.of(0L, Data.MB),
+        0);
+
+    /**
      * Name to associate with task executors.
      */
     @VisibleForTesting
     static final String EXECUTOR_NAME = "aurora.task";
 
     private final String executorPath;
+    private final Resources executorOverhead;
 
     @Inject
-    MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
-      this.executorPath = executorConfig.getExecutorPath();
+    MesosTaskFactoryImpl(ExecutorSettings executorSettings) {
+      this.executorPath = executorSettings.getExecutorPath();
+      this.executorOverhead = executorSettings.getExecutorOverhead();
     }
 
     @VisibleForTesting
@@ -97,21 +126,46 @@ public interface MesosTaskFactory {
       return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
     }
 
-    public static String getJobSourceName(IJobKey jobkey) {
+    private static String getJobSourceName(IJobKey jobkey) {
       return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
     }
 
-    public static String getJobSourceName(ITaskConfig task) {
+    private static String getJobSourceName(ITaskConfig task) {
       return getJobSourceName(task.getJob());
     }
 
-    public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
+    @VisibleForTesting
+    static String getInstanceSourceName(ITaskConfig task, int instanceId) {
       return String.format("%s.%s", getJobSourceName(task), instanceId);
     }
 
+    /**
+     * Generates a Resource where each resource component is a max out of the two components.
+     *
+     * @param a A resource to compare.
+     * @param b A resource to compare.
+     *
+     * @return Returns a Resources instance where each component is a max of the two components.
+     */
+    @VisibleForTesting
+    static Resources maxElements(Resources a, Resources b) {
+      double maxCPU = Math.max(a.getNumCpus(), b.getNumCpus());
+      Amount<Long, Data> maxRAM = Amount.of(
+          Math.max(a.getRam().as(Data.MB), b.getRam().as(Data.MB)),
+          Data.MB);
+      Amount<Long, Data> maxDisk = Amount.of(
+          Math.max(a.getDisk().as(Data.MB), b.getDisk().as(Data.MB)),
+          Data.MB);
+      int maxPorts = Math.max(a.getNumPorts(), b.getNumPorts());
+
+      return new Resources(maxCPU, maxRAM, maxDisk, maxPorts);
+    }
+
     @Override
     public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException
{
       requireNonNull(task);
+      requireNonNull(slaveId);
+
       byte[] taskInBytes;
       try {
         taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
@@ -120,9 +174,25 @@ public interface MesosTaskFactory {
         throw new SchedulerException("Internal error.", e);
       }
 
+      // The objective of the below code is to allocate a task and executor that is in a
container
+      // of task + executor overhead size. Mesos stipulates that we cannot allocate 0 sized
tasks or
+      // executors and we should always ensure the ExecutorInfo has enough resources to launch
or
+      // run an executor. Therefore the total desired container size (task + executor overhead)
is
+      // partitioned to a small portion that is always allocated to the executor and the
rest to the
+      // task. If the remaining resources are not enough for the task a small epsilon is
allocated
+      // to the task.
+
       ITaskConfig config = task.getTask();
+      Resources taskResources = Resources.from(config);
+      Resources containerResources = Resources.sum(taskResources, executorOverhead);
+
+      taskResources = Resources.subtract(containerResources, MIN_THERMOS_RESOURCES);
+      // It is possible that the final task resources will be negative.
+      // This ensures the task resources are positive.
+      Resources finalTaskResources = maxElements(taskResources, MIN_TASK_RESOURCES);
+
       // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts
field.
-      List<Resource> resources = Resources.from(config)
+      List<Resource> resources = finalTaskResources
           .toResourceList(task.isSetAssignedPorts()
               ? ImmutableSet.copyOf(task.getAssignedPorts().values())
               : ImmutableSet.<Integer>of());
@@ -144,11 +214,7 @@ public interface MesosTaskFactory {
           .setExecutorId(getExecutorId(task.getTaskId()))
           .setName(EXECUTOR_NAME)
           .setSource(getInstanceSourceName(config, task.getInstanceId()))
-          .addResources(
-              Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
-          .addResources(Resources.makeMesosResource(
-              Resources.RAM_MB,
-              ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
+          .addAllResources(MIN_THERMOS_RESOURCES.toResourceList())
           .build();
       return taskBuilder
           .setExecutor(executor)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index bf6cfad..5e54364 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -49,6 +49,7 @@ import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
 import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
@@ -76,13 +77,14 @@ import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
 import org.apache.aurora.scheduler.log.Log.Stream;
 import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
 import org.apache.aurora.scheduler.storage.log.EntrySerializer;
 import org.apache.aurora.scheduler.storage.log.LogStorageModule;
@@ -104,6 +106,8 @@ import org.junit.Test;
 
 import static com.twitter.common.testing.easymock.EasyMockTest.createCapture;
 
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_CPUS;
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_RAM;
 import static org.apache.mesos.Protos.FrameworkInfo;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createControl;
@@ -194,7 +198,13 @@ public class SchedulerIT extends BaseZooKeeperTest {
         bind(DriverFactory.class).toInstance(driverFactory);
         bind(DriverSettings.class).toInstance(SETTINGS);
         bind(Log.class).toInstance(log);
-        bind(ExecutorConfig.class).toInstance(new ExecutorConfig("/executor/thermos"));
+        Resources executorOverhead = new Resources(
+            EXECUTOR_OVERHEAD_CPUS.get(),
+            EXECUTOR_OVERHEAD_RAM.get(),
+            Amount.of(0L, Data.MB),
+            0);
+        bind(ExecutorSettings.class)
+            .toInstance(new ExecutorSettings("/executor/thermos", executorOverhead));
         install(new BackupModule(backupDir, SnapshotStoreImpl.class));
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index 953c1ed..22fb991 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -15,15 +15,15 @@ package org.apache.aurora.scheduler.mesos;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorSettings;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.mesos.Protos.CommandInfo;
@@ -31,10 +31,12 @@ import org.apache.mesos.Protos.CommandInfo.URI;
 import org.apache.mesos.Protos.ExecutorInfo;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskInfo;
-import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl.MIN_TASK_RESOURCES;
+import static org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl.MIN_THERMOS_RESOURCES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class MesosTaskFactoryImplTest {
 
@@ -53,22 +55,26 @@ public class MesosTaskFactoryImplTest {
           .setNumCpus(5)
           .setRequestedPorts(ImmutableSet.of("http"))));
   private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build();
+  private static final Resources SOME_EXECUTOR_OVERHEAD = new Resources(
+      0.01,
+      Amount.of(128L, Data.MB),
+      Amount.of(0L, Data.MB),
+      0);
 
-  private MesosTaskFactory taskFactory;
+  private static final Resources NO_EXECUTOR_OVERHEAD = new Resources(
+      0,
+      Amount.of(0L, Data.MB),
+      Amount.of(0L, Data.MB),
+      0);
 
-  @Before
-  public void setUp() {
-    taskFactory = new MesosTaskFactoryImpl(new ExecutorConfig(EXECUTOR_PATH));
-  }
+  private MesosTaskFactory taskFactory;
+  private ExecutorSettings config;
 
   private static final ExecutorInfo DEFAULT_EXECUTOR = ExecutorInfo.newBuilder()
       .setExecutorId(MesosTaskFactoryImpl.getExecutorId(TASK.getTaskId()))
       .setName(MesosTaskFactoryImpl.EXECUTOR_NAME)
       .setSource(MesosTaskFactoryImpl.getInstanceSourceName(TASK.getTask(), TASK.getInstanceId()))
-      .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
-      .addResources(Resources.makeMesosResource(
-          Resources.RAM_MB,
-          ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
+      .addAllResources(MIN_THERMOS_RESOURCES.toResourceList())
       .setCommand(CommandInfo.newBuilder()
           .setValue("./executor.sh")
           .addUris(URI.newBuilder().setValue(EXECUTOR_PATH).setExecutable(true)))
@@ -76,12 +82,24 @@ public class MesosTaskFactoryImplTest {
 
   @Test
   public void testExecutorInfoUnchanged() {
+    config = new ExecutorSettings(EXECUTOR_PATH, SOME_EXECUTOR_OVERHEAD);
+    taskFactory = new MesosTaskFactoryImpl(config);
     TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
     assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
+    double taskCPU = config.getExecutorOverhead().getNumCpus()
+        + TASK.getTask().getNumCpus()
+        - MIN_THERMOS_RESOURCES.getNumCpus();
+    long taskRamMB = config.getExecutorOverhead().getRam().as(Data.MB)
+        + TASK.getTask().getRamMb()
+        - MIN_THERMOS_RESOURCES.getRam().as(Data.MB);
+
+    assertTrue(taskCPU > 0.0);
+    assertTrue(taskRamMB > 0);
+
     assertEquals(ImmutableSet.of(
-            Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
-            Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
+            Resources.makeMesosResource(Resources.CPUS, taskCPU),
             Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
+            Resources.makeMesosResource(Resources.RAM_MB, taskRamMB),
             Resources.makeMesosRangeResource(
                 Resources.PORTS,
                 ImmutableSet.copyOf(TASK.getAssignedPorts().values()))
@@ -91,15 +109,67 @@ public class MesosTaskFactoryImplTest {
 
   @Test
   public void testCreateFromPortsUnset() {
+    config = new ExecutorSettings(EXECUTOR_PATH, SOME_EXECUTOR_OVERHEAD);
+    taskFactory = new MesosTaskFactoryImpl(config);
     AssignedTask assignedTask = TASK.newBuilder();
     assignedTask.unsetAssignedPorts();
     TaskInfo task = taskFactory.createFrom(IAssignedTask.build(assignedTask), SLAVE);
+
+    double taskCPU = config.getExecutorOverhead().getNumCpus()
+        + TASK.getTask().getNumCpus()
+        - MIN_THERMOS_RESOURCES.getNumCpus();
+    long taskRamMB = config.getExecutorOverhead().getRam().as(Data.MB)
+        + TASK.getTask().getRamMb()
+        - MIN_THERMOS_RESOURCES.getRam().as(Data.MB);
+
+    assertTrue(taskCPU > 0.0);
+    assertTrue(taskRamMB > 0);
+
     assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
     assertEquals(ImmutableSet.of(
-            Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
-            Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
-            Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb())
+            Resources.makeMesosResource(Resources.CPUS, taskCPU),
+            Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
+            Resources.makeMesosResource(Resources.RAM_MB, taskRamMB)
         ),
         ImmutableSet.copyOf(task.getResourcesList()));
   }
+
+  @Test
+  public void testExecutorInfoNoOverhead() {
+    // Here the ram required for the executor is greater than the sum of task resources
+    // + executor overhead. We need to ensure we allocate a non-zero amount of ram in this
case.
+    config = new ExecutorSettings(EXECUTOR_PATH, NO_EXECUTOR_OVERHEAD);
+    taskFactory = new MesosTaskFactoryImpl(config);
+    TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+    assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
+
+    double taskCPU = config.getExecutorOverhead().getNumCpus()
+        + TASK.getTask().getNumCpus()
+        - MIN_THERMOS_RESOURCES.getNumCpus();
+
+    assertTrue(taskCPU > 0.0);
+
+    assertEquals(ImmutableSet.of(
+      Resources.makeMesosResource(Resources.CPUS, taskCPU),
+      Resources.makeMesosResource(Resources.RAM_MB, MIN_TASK_RESOURCES.getRam().as(Data.MB)),
+      Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
+      Resources.makeMesosRangeResource(
+          Resources.PORTS,
+          ImmutableSet.copyOf(TASK.getAssignedPorts().values()))
+    ),
+    ImmutableSet.copyOf(task.getResourcesList()));
+  }
+
+  @Test
+  public void testMaxElements() {
+    Resources highRAM = new Resources(1, Amount.of(8L, Data.GB), Amount.of(10L, Data.MB),
0);
+    Resources rest = new Resources(10, Amount.of(1L, Data.MB), Amount.of(10L, Data.GB), 1);
+
+    Resources result = MesosTaskFactoryImpl.maxElements(highRAM, rest);
+    assertEquals(result.getNumCpus(), 10, 0.001);
+    assertEquals(result.getRam(), Amount.of(8L, Data.GB));
+    assertEquals(result.getDisk(), Amount.of(10L, Data.GB));
+    assertEquals(result.getNumPorts(), 1);
+  }
+
 }


Mime
View raw message