hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vrush...@apache.org
Subject [05/50] hadoop git commit: YARN-6608. Backport all SLS improvements from trunk to branch-2. (Carlo Curino via wangda)
Date Fri, 20 Oct 2017 18:23:31 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index f40f47d..f999dce 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -31,60 +31,77 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
-import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
-import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
+import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
 import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
 import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Private
 @Unstable
-public class SLSRunner {
+public class SLSRunner extends Configured implements Tool {
   // RM, Runner
   private ResourceManager rm;
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
-  private Configuration conf;
   private Map<String, Integer> queueAppNumMap;
-  
+
   // NM simulator
   private HashMap<NodeId, NMSimulator> nmMap;
   private int nmMemoryMB, nmVCores;
   private String nodeFile;
-  
+
   // AM simulator
   private int AM_ID;
   private Map<String, AMSimulator> amMap;
@@ -99,49 +116,92 @@ public class SLSRunner {
   // other simulation information
   private int numNMs, numRacks, numAMs, numTasks;
   private long maxRuntime;
-  public final static Map<String, Object> simulateInfoMap =
+
+  private final static Map<String, Object> simulateInfoMap =
           new HashMap<String, Object>();
 
   // logger
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
 
-  // input traces, input-rumen or input-sls
-  private boolean isSLS;
-  
-  public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile,
-                   String outputDir, Set<String> trackedApps,
-                   boolean printsimulation)
-          throws IOException, ClassNotFoundException {
-    this.isSLS = isSLS;
-    this.inputTraces = inputTraces.clone();
-    this.nodeFile = nodeFile;
-    this.trackedApps = trackedApps;
-    this.printSimulation = printsimulation;
-    metricsOutputDir = outputDir;
-    
-    nmMap = new HashMap<NodeId, NMSimulator>();
-    queueAppNumMap = new HashMap<String, Integer>();
-    amMap = new HashMap<String, AMSimulator>();
-    amClassMap = new HashMap<String, Class>();
-    
+  private final static int DEFAULT_MAPPER_PRIORITY = 20;
+  private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
+  private static boolean exitAtTheFinish = false;
+
+  /**
+   * The type of the input trace.
+   */
+  public enum TraceType {
+    SLS, RUMEN, SYNTH
+  }
+
+  private TraceType inputType;
+  private SynthTraceJobProducer stjp;
+
+  public SLSRunner() throws ClassNotFoundException {
+    Configuration tempConf = new Configuration(false);
+    init(tempConf);
+  }
+
+  public SLSRunner(Configuration tempConf) throws ClassNotFoundException {
+    init(tempConf);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (null != conf) {
+      // Override setConf so that every conf set here also loads
+      // sls-runner.xml; see YARN-6560
+      conf.addResource("sls-runner.xml");
+    }
+    super.setConf(conf);
+  }
+
+  private void init(Configuration tempConf) throws ClassNotFoundException {
+    nmMap = new HashMap<>();
+    queueAppNumMap = new HashMap<>();
+    amMap = new ConcurrentHashMap<>();
+    amClassMap = new HashMap<>();
+
     // runner configuration
-    conf = new Configuration(false);
-    conf.addResource("sls-runner.xml");
+    setConf(tempConf);
+
     // runner
-    int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
-                                SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
+    int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
+        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     SLSRunner.runner.setQueueSize(poolSize);
     // <AMType, Class> map
-    for (Map.Entry e : conf) {
+    for (Map.Entry e : tempConf) {
       String key = e.getKey().toString();
       if (key.startsWith(SLSConfiguration.AM_TYPE)) {
         String amType = key.substring(SLSConfiguration.AM_TYPE.length());
-        amClassMap.put(amType, Class.forName(conf.get(key)));
+        amClassMap.put(amType, Class.forName(tempConf.get(key)));
       }
     }
   }
-  
-  public void start() throws Exception {
+
+  /**
+   * @return an unmodifiable view of the simulated info map.
+   */
+  public static Map<String, Object> getSimulateInfoMap() {
+    return Collections.unmodifiableMap(simulateInfoMap);
+  }
+
+  public void setSimulationParams(TraceType inType, String[] inTraces,
+      String nodes, String outDir, Set<String> trackApps,
+      boolean printsimulation) throws IOException, ClassNotFoundException {
+
+    this.inputType = inType;
+    this.inputTraces = inTraces.clone();
+    this.nodeFile = nodes;
+    this.trackedApps = trackApps;
+    this.printSimulation = printsimulation;
+    metricsOutputDir = outDir;
+
+  }
+
+  public void start() throws IOException, ClassNotFoundException, YarnException,
+      InterruptedException {
     // start resource manager
     startRM();
     // start node managers
@@ -149,10 +209,10 @@ public class SLSRunner {
     // start application masters
     startAM();
     // set queue & tracked apps information
-    ((SchedulerWrapper) rm.getResourceScheduler())
-                            .setQueueSet(this.queueAppNumMap.keySet());
-    ((SchedulerWrapper) rm.getResourceScheduler())
-                            .setTrackedAppSet(this.trackedApps);
+    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
+        .setQueueSet(this.queueAppNumMap.keySet());
+    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
+        .setTrackedAppSet(this.trackedApps);
     // print out simulation info
     printSimulationInfo();
     // blocked until all nodes RUNNING
@@ -160,66 +220,92 @@ public class SLSRunner {
     // starting the runner once everything is ready to go,
     runner.start();
   }
-  
-  private void startRM() throws IOException, ClassNotFoundException {
-    Configuration rmConf = new YarnConfiguration();
+
+  private void startRM() throws ClassNotFoundException, YarnException {
+    Configuration rmConf = new YarnConfiguration(getConf());
     String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
 
-    // For CapacityScheduler we use a sub-classing instead of wrapping
-    // to allow scheduler-specific invocations from monitors to work
-    // this can be used for other schedulers as well if we care to
-    // exercise/track behaviors that are not common to the scheduler api
-    if(Class.forName(schedulerClass) == CapacityScheduler.class) {
+    if (Class.forName(schedulerClass) == CapacityScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
           SLSCapacityScheduler.class.getName());
       rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
       rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
           ProportionalCapacityPreemptionPolicy.class.getName());
-    } else {
+    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
-              ResourceSchedulerWrapper.class.getName());
-      rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass);
+          SLSFairScheduler.class.getName());
+    } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
+      // TODO add support for FifoScheduler
+      throw new YarnException("Fifo Scheduler is not supported yet.");
     }
 
     rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
-    rm = new ResourceManager();
+
+    final SLSRunner se = this;
+    rm = new ResourceManager() {
+      @Override
+      protected ApplicationMasterLauncher createAMLauncher() {
+        return new MockAMLauncher(se, this.rmContext, amMap);
+      }
+    };
+
+    // Across runs of parametrized tests, the JvmMetrics object is retained,
+    // but is not registered correctly
+    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
+    jvmMetrics.registerIfNeeded();
+
+    // Init and start the actual ResourceManager
     rm.init(rmConf);
     rm.start();
   }
 
   private void startNM() throws YarnException, IOException {
     // nm configuration
-    nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB,
-            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
-    nmVCores = conf.getInt(SLSConfiguration.NM_VCORES,
-            SLSConfiguration.NM_VCORES_DEFAULT);
-    int heartbeatInterval = conf.getInt(
-            SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
+    nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
+        SLSConfiguration.NM_MEMORY_MB_DEFAULT);
+    nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES,
+        SLSConfiguration.NM_VCORES_DEFAULT);
+    int heartbeatInterval =
+        getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
             SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
     // nm information (fetch from topology file, or from sls/rumen json file)
     Set<String> nodeSet = new HashSet<String>();
     if (nodeFile.isEmpty()) {
-      if (isSLS) {
-        for (String inputTrace : inputTraces) {
+      for (String inputTrace : inputTraces) {
+
+        switch (inputType) {
+        case SLS:
           nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace));
-        }
-      } else {
-        for (String inputTrace : inputTraces) {
+          break;
+        case RUMEN:
           nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace));
+          break;
+        case SYNTH:
+          stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+          nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(),
+              stjp.getNumNodes()/stjp.getNodesPerRack()));
+          break;
+        default:
+          throw new YarnException("Input configuration not recognized, "
+              + "trace type should be SLS, RUMEN, or SYNTH");
         }
       }
-
     } else {
       nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
     }
+
+    if (nodeSet.size() == 0) {
+      throw new YarnException("No node! Please configure nodes.");
+    }
+
     // create NM simulators
     Random random = new Random();
     Set<String> rackSet = new HashSet<String>();
     for (String hostName : nodeSet) {
       // we randomize the heartbeat start time from zero to 1 interval
       NMSimulator nm = new NMSimulator();
-      nm.init(hostName, nmMemoryMB, nmVCores, 
-          random.nextInt(heartbeatInterval), heartbeatInterval, rm);
+      nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval),
+          heartbeatInterval, rm);
       nmMap.put(nm.getNode().getNodeID(), nm);
       runner.schedule(nm);
       rackSet.add(nm.getNode().getRackName());
@@ -234,7 +320,7 @@ public class SLSRunner {
       int numRunningNodes = 0;
       for (RMNode node : rm.getRMContext().getRMNodes().values()) {
         if (node.getState() == NodeState.RUNNING) {
-          numRunningNodes ++;
+          numRunningNodes++;
         }
       }
       if (numRunningNodes == numNMs) {
@@ -250,209 +336,433 @@ public class SLSRunner {
 
   @SuppressWarnings("unchecked")
   private void startAM() throws YarnException, IOException {
-    // application/container configuration
-    int heartbeatInterval = conf.getInt(
-            SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
-            SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
-            SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
-    int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES,
-            SLSConfiguration.CONTAINER_VCORES_DEFAULT);
-    Resource containerResource =
-            BuilderUtils.newResource(containerMemoryMB, containerVCores);
-
-    // application workload
-    if (isSLS) {
-      startAMFromSLSTraces(containerResource, heartbeatInterval);
-    } else {
-      startAMFromRumenTraces(containerResource, heartbeatInterval);
+    switch (inputType) {
+    case SLS:
+      for (String inputTrace : inputTraces) {
+        startAMFromSLSTrace(inputTrace);
+      }
+      break;
+    case RUMEN:
+      long baselineTimeMS = 0;
+      for (String inputTrace : inputTraces) {
+        startAMFromRumenTrace(inputTrace, baselineTimeMS);
+      }
+      break;
+    case SYNTH:
+      startAMFromSynthGenerator();
+      break;
+    default:
+      throw new YarnException("Input configuration not recognized, "
+          + "trace type should be SLS, RUMEN, or SYNTH");
     }
+
     numAMs = amMap.size();
     remainingApps = numAMs;
   }
 
   /**
-   * parse workload information from sls trace files
+   * Parse workload from a SLS trace file.
    */
   @SuppressWarnings("unchecked")
-  private void startAMFromSLSTraces(Resource containerResource,
-                                    int heartbeatInterval) throws IOException {
-    // parse from sls traces
+  private void startAMFromSLSTrace(String inputTrace) throws IOException {
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
-    for (String inputTrace : inputTraces) {
-      Reader input =
-          new InputStreamReader(new FileInputStream(inputTrace), "UTF-8");
-      try {
-        Iterator<Map> i = mapper.readValues(jsonF.createJsonParser(input),
-                Map.class);
-        while (i.hasNext()) {
-          Map jsonJob = i.next();
-
-          // load job information
-          long jobStartTime = Long.parseLong(
-                  jsonJob.get("job.start.ms").toString());
-          long jobFinishTime = Long.parseLong(
-                  jsonJob.get("job.end.ms").toString());
-
-          String user = (String) jsonJob.get("job.user");
-          if (user == null)  user = "default";
-          String queue = jsonJob.get("job.queue.name").toString();
-
-          String oldAppId = jsonJob.get("job.id").toString();
-          boolean isTracked = trackedApps.contains(oldAppId);
-          int queueSize = queueAppNumMap.containsKey(queue) ?
-                  queueAppNumMap.get(queue) : 0;
-          queueSize ++;
-          queueAppNumMap.put(queue, queueSize);
-          // tasks
-          List tasks = (List) jsonJob.get("job.tasks");
-          if (tasks == null || tasks.size() == 0) {
-            continue;
-          }
-          List<ContainerSimulator> containerList =
-                  new ArrayList<ContainerSimulator>();
-          for (Object o : tasks) {
-            Map jsonTask = (Map) o;
-            String hostname = jsonTask.get("container.host").toString();
-            long taskStart = Long.parseLong(
-                    jsonTask.get("container.start.ms").toString());
-            long taskFinish = Long.parseLong(
-                    jsonTask.get("container.end.ms").toString());
-            long lifeTime = taskFinish - taskStart;
-
-            // Set memory and vcores from job trace file
-            Resource res = Resources.clone(containerResource);
-            if (jsonTask.containsKey("container.memory")) {
-              int containerMemory = Integer.parseInt(
-                  jsonTask.get("container.memory").toString());
-              res.setMemorySize(containerMemory);
-            }
-
-            if (jsonTask.containsKey("container.vcores")) {
-              int containerVCores = Integer.parseInt(
-                  jsonTask.get("container.vcores").toString());
-              res.setVirtualCores(containerVCores);
-            }
-
-            int priority = Integer.parseInt(
-                    jsonTask.get("container.priority").toString());
-            String type = jsonTask.get("container.type").toString();
-            containerList.add(new ContainerSimulator(res,
-                    lifeTime, hostname, priority, type));
-          }
-
-          // create a new AM
-          String amType = jsonJob.get("am.type").toString();
-          AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
-                  amClassMap.get(amType), new Configuration());
-          if (amSim != null) {
-            amSim.init(AM_ID++, heartbeatInterval, containerList, rm,
-                    this, jobStartTime, jobFinishTime, user, queue,
-                    isTracked, oldAppId);
-            runner.schedule(amSim);
-            maxRuntime = Math.max(maxRuntime, jobFinishTime);
-            numTasks += containerList.size();
-            amMap.put(oldAppId, amSim);
-          }
+
+    try (Reader input = new InputStreamReader(
+        new FileInputStream(inputTrace), "UTF-8")) {
+      Iterator<Map> jobIter = mapper.readValues(
+          jsonF.createParser(input), Map.class);
+
+      while (jobIter.hasNext()) {
+        try {
+          createAMForJob(jobIter.next());
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: {}", e.getMessage());
         }
-      } finally {
-        input.close();
       }
     }
   }
 
+  private void createAMForJob(Map jsonJob) throws YarnException {
+    long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
+
+    long jobFinishTime = 0;
+    if (jsonJob.containsKey("job.end.ms")) {
+      jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
+    }
+
+    String user = (String) jsonJob.get("job.user");
+    if (user == null) {
+      user = "default";
+    }
+
+    String queue = jsonJob.get("job.queue.name").toString();
+    increaseQueueAppNum(queue);
+
+    String amType = (String)jsonJob.get("am.type");
+    if (amType == null) {
+      amType = SLSUtils.DEFAULT_JOB_TYPE;
+    }
+
+    int jobCount = 1;
+    if (jsonJob.containsKey("job.count")) {
+      jobCount = Integer.parseInt(jsonJob.get("job.count").toString());
+    }
+    jobCount = Math.max(jobCount, 1);
+
+    String oldAppId = (String)jsonJob.get("job.id");
+    // Job id is generated automatically if this job configuration allows
+    // multiple job instances
+    if(jobCount > 1) {
+      oldAppId = null;
+    }
+
+    for (int i = 0; i < jobCount; i++) {
+      runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
+          getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob));
+    }
+  }
+
+  private List<ContainerSimulator> getTaskContainers(Map jsonJob)
+      throws YarnException {
+    List<ContainerSimulator> containers = new ArrayList<>();
+    List tasks = (List) jsonJob.get("job.tasks");
+    if (tasks == null || tasks.size() == 0) {
+      throw new YarnException("No task for the job!");
+    }
+
+    for (Object o : tasks) {
+      Map jsonTask = (Map) o;
+
+      String hostname = (String) jsonTask.get("container.host");
+
+      long duration = 0;
+      if (jsonTask.containsKey("duration.ms")) {
+        duration = Integer.parseInt(jsonTask.get("duration.ms").toString());
+      } else if (jsonTask.containsKey("container.start.ms") &&
+          jsonTask.containsKey("container.end.ms")) {
+        long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
+            .toString());
+        long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
+            .toString());
+        duration = taskFinish - taskStart;
+      }
+      if (duration <= 0) {
+        throw new YarnException("Duration of a task shouldn't be less or equal"
+            + " to 0!");
+      }
+
+      Resource res = getDefaultContainerResource();
+      if (jsonTask.containsKey("container.memory")) {
+        int containerMemory =
+            Integer.parseInt(jsonTask.get("container.memory").toString());
+        res.setMemorySize(containerMemory);
+      }
+
+      if (jsonTask.containsKey("container.vcores")) {
+        int containerVCores =
+            Integer.parseInt(jsonTask.get("container.vcores").toString());
+        res.setVirtualCores(containerVCores);
+      }
+
+      int priority = DEFAULT_MAPPER_PRIORITY;
+      if (jsonTask.containsKey("container.priority")) {
+        priority = Integer.parseInt(jsonTask.get("container.priority")
+            .toString());
+      }
+
+      String type = "map";
+      if (jsonTask.containsKey("container.type")) {
+        type = jsonTask.get("container.type").toString();
+      }
+
+      int count = 1;
+      if (jsonTask.containsKey("count")) {
+        count = Integer.parseInt(jsonTask.get("count").toString());
+      }
+      count = Math.max(count, 1);
+
+      for (int i = 0; i < count; i++) {
+        containers.add(
+            new ContainerSimulator(res, duration, hostname, priority, type));
+      }
+    }
+
+    return containers;
+  }
+
   /**
-   * parse workload information from rumen trace files
+   * Parse workload from a rumen trace file.
    */
   @SuppressWarnings("unchecked")
-  private void startAMFromRumenTraces(Resource containerResource,
-                                      int heartbeatInterval)
-          throws IOException {
+  private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
+      throws IOException {
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", "file:///");
+    File fin = new File(inputTrace);
+
+    try (JobTraceReader reader = new JobTraceReader(
+        new Path(fin.getAbsolutePath()), conf)) {
+      LoggedJob job = reader.getNext();
+
+      while (job != null) {
+        try {
+          createAMForJob(job, baselineTimeMS);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: {}", e.getMessage());
+        }
+
+        job = reader.getNext();
+      }
+    }
+  }
+
+  private void createAMForJob(LoggedJob job, long baselineTimeMs)
+      throws YarnException {
+    String user = job.getUser() == null ? "default" :
+        job.getUser().getValue();
+    String jobQueue = job.getQueue().getValue();
+    String oldJobId = job.getJobID().toString();
+    long jobStartTimeMS = job.getSubmitTime();
+    long jobFinishTimeMS = job.getFinishTime();
+    if (baselineTimeMs == 0) {
+      baselineTimeMs = job.getSubmitTime();
+    }
+    jobStartTimeMS -= baselineTimeMs;
+    jobFinishTimeMS -= baselineTimeMs;
+    if (jobStartTimeMS < 0) {
+      LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
+      jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
+      jobStartTimeMS = 0;
+    }
+
+    increaseQueueAppNum(jobQueue);
+
+    List<ContainerSimulator> containerList = new ArrayList<>();
+    // mapper
+    for (LoggedTask mapTask : job.getMapTasks()) {
+      if (mapTask.getAttempts().size() == 0) {
+        throw new YarnException("Invalid map task, no attempt for a mapper!");
+      }
+      LoggedTaskAttempt taskAttempt =
+          mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
+      String hostname = taskAttempt.getHostName().getValue();
+      long containerLifeTime = taskAttempt.getFinishTime() -
+          taskAttempt.getStartTime();
+      containerList.add(
+          new ContainerSimulator(getDefaultContainerResource(),
+              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
+    }
+
+    // reducer
+    for (LoggedTask reduceTask : job.getReduceTasks()) {
+      if (reduceTask.getAttempts().size() == 0) {
+        throw new YarnException(
+            "Invalid reduce task, no attempt for a reducer!");
+      }
+      LoggedTaskAttempt taskAttempt =
+          reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
+      String hostname = taskAttempt.getHostName().getValue();
+      long containerLifeTime = taskAttempt.getFinishTime() -
+          taskAttempt.getStartTime();
+      containerList.add(
+          new ContainerSimulator(getDefaultContainerResource(),
+              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
+    }
+
+    // Only supports the default job type currently
+    runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
+        jobStartTimeMS, jobFinishTimeMS, containerList, null,
+        getAMContainerResource(null));
+  }
+
+  private Resource getDefaultContainerResource() {
+    int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
+        SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
+    int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
+        SLSConfiguration.CONTAINER_VCORES_DEFAULT);
+    return Resources.createResource(containerMemory, containerVCores);
+  }
+
+  /**
+   * parse workload information from synth-generator trace files.
+   */
+  @SuppressWarnings("unchecked")
+  private void startAMFromSynthGenerator() throws YarnException, IOException {
+    Configuration localConf = new Configuration();
+    localConf.set("fs.defaultFS", "file:///");
     long baselineTimeMS = 0;
-    for (String inputTrace : inputTraces) {
-      File fin = new File(inputTrace);
-      JobTraceReader reader = new JobTraceReader(
-              new Path(fin.getAbsolutePath()), conf);
-      try {
-        LoggedJob job = null;
-        while ((job = reader.getNext()) != null) {
-          // only support MapReduce currently
-          String jobType = "mapreduce";
-          String user = job.getUser() == null ?
-                  "default" : job.getUser().getValue();
-          String jobQueue = job.getQueue().getValue();
-          String oldJobId = job.getJobID().toString();
-          long jobStartTimeMS = job.getSubmitTime();
-          long jobFinishTimeMS = job.getFinishTime();
-          if (baselineTimeMS == 0) {
-            baselineTimeMS = jobStartTimeMS;
-          }
-          jobStartTimeMS -= baselineTimeMS;
-          jobFinishTimeMS -= baselineTimeMS;
-          if (jobStartTimeMS < 0) {
-            LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
-            jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
-            jobStartTimeMS = 0;
-          }
-
-          boolean isTracked = trackedApps.contains(oldJobId);
-          int queueSize = queueAppNumMap.containsKey(jobQueue) ?
-                  queueAppNumMap.get(jobQueue) : 0;
-          queueSize ++;
-          queueAppNumMap.put(jobQueue, queueSize);
-
-          List<ContainerSimulator> containerList =
-                  new ArrayList<ContainerSimulator>();
-          // map tasks
-          for(LoggedTask mapTask : job.getMapTasks()) {
-            if (mapTask.getAttempts().size() == 0) {
-              continue;
-            }
-            LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
-                    .get(mapTask.getAttempts().size() - 1);
-            String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime = taskAttempt.getFinishTime()
-                    - taskAttempt.getStartTime();
-            containerList.add(new ContainerSimulator(containerResource,
-                    containerLifeTime, hostname, 10, "map"));
-          }
-
-          // reduce tasks
-          for(LoggedTask reduceTask : job.getReduceTasks()) {
-            if (reduceTask.getAttempts().size() == 0) {
-              continue;
-            }
-            LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
-                    .get(reduceTask.getAttempts().size() - 1);
-            String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime = taskAttempt.getFinishTime()
-                    - taskAttempt.getStartTime();
-            containerList.add(new ContainerSimulator(containerResource,
-                    containerLifeTime, hostname, 20, "reduce"));
-          }
-
-          // create a new AM
-          AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
-                  amClassMap.get(jobType), conf);
-          if (amSim != null) {
-            amSim.init(AM_ID ++, heartbeatInterval, containerList,
-                    rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
-                    isTracked, oldJobId);
-            runner.schedule(amSim);
-            maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
-            numTasks += containerList.size();
-            amMap.put(oldJobId, amSim);
-          }
+
+    // reservations use wall clock time, so need to have a reference for that
+    UTCClock clock = new UTCClock();
+    long now = clock.getTime();
+
+    try {
+
+      // if we use the nodeFile this could have been not initialized yet.
+      if (stjp == null) {
+        stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+      }
+
+      SynthJob job = null;
+      // we use stjp, a reference to the job producer instantiated during node
+      // creation
+      while ((job = (SynthJob) stjp.getNextJob()) != null) {
+        // only support MapReduce currently
+        String user = job.getUser();
+        String jobQueue = job.getQueueName();
+        String oldJobId = job.getJobID().toString();
+        long jobStartTimeMS = job.getSubmissionTime();
+
+        // CARLO: Finish time is only used for logging, omit for now
+        long jobFinishTimeMS = -1L;
+
+        if (baselineTimeMS == 0) {
+          baselineTimeMS = jobStartTimeMS;
+        }
+        jobStartTimeMS -= baselineTimeMS;
+        jobFinishTimeMS -= baselineTimeMS;
+        if (jobStartTimeMS < 0) {
+          LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
+          jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
+          jobStartTimeMS = 0;
+        }
+
+        increaseQueueAppNum(jobQueue);
+
+        List<ContainerSimulator> containerList =
+            new ArrayList<ContainerSimulator>();
+        ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
+        Random rand = new Random(stjp.getSeed());
+
+        Resource maxMapRes = Resource.newInstance(0, 0);
+        long maxMapDur = 0;
+        // map tasks
+        for (int i = 0; i < job.getNumberMaps(); i++) {
+          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0);
+          RMNode node = nmMap
+              .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
+          String hostname = "/" + node.getRackName() + "/" + node.getHostName();
+          long containerLifeTime = tai.getRuntime();
+          Resource containerResource =
+              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
+                  (int) tai.getTaskInfo().getTaskVCores());
+          containerList.add(new ContainerSimulator(containerResource,
+              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
+          maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
+          maxMapDur =
+              containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
+
+        }
+
+        Resource maxRedRes = Resource.newInstance(0, 0);
+        long maxRedDur = 0;
+        // reduce tasks
+        for (int i = 0; i < job.getNumberReduces(); i++) {
+          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
+          RMNode node = nmMap
+              .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
+          String hostname = "/" + node.getRackName() + "/" + node.getHostName();
+          long containerLifeTime = tai.getRuntime();
+          Resource containerResource =
+              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
+                  (int) tai.getTaskInfo().getTaskVCores());
+          containerList.add(new ContainerSimulator(containerResource,
+              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
+          maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
+          maxRedDur =
+              containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;
+
+        }
+
+        // generating reservations for the jobs that require them
+
+        ReservationSubmissionRequest rr = null;
+        if (job.hasDeadline()) {
+          ReservationId reservationId =
+              ReservationId.newInstance(this.rm.getStartTime(), AM_ID);
+
+          rr = ReservationClientUtil.createMRReservation(reservationId,
+              "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur,
+              maxRedRes, job.getNumberReduces(), maxRedDur,
+              now + jobStartTimeMS, now + job.getDeadline(),
+              job.getQueueName());
+
         }
-      } finally {
-        reader.close();
+
+        runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
+            jobStartTimeMS, jobFinishTimeMS, containerList, rr,
+            getAMContainerResource(null));
+      }
+    } finally {
+      stjp.close();
+    }
+
+  }
+
+  private Resource getAMContainerResource(Map jsonJob) {
+    Resource amContainerResource =
+        SLSConfiguration.getAMContainerResource(getConf());
+
+    if (jsonJob == null) {
+      return amContainerResource;
+    }
+
+    if (jsonJob.containsKey("am.memory")) {
+      amContainerResource.setMemorySize(
+          Long.parseLong(jsonJob.get("am.memory").toString()));
+    }
+
+    if (jsonJob.containsKey("am.vcores")) {
+      amContainerResource.setVirtualCores(
+          Integer.parseInt(jsonJob.get("am.vcores").toString()));
+    }
+    return amContainerResource;
+  }
+
+  private void increaseQueueAppNum(String queue) throws YarnException {
+    SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
+    String queueName = wrapper.getRealQueueName(queue);
+    Integer appNum = queueAppNumMap.get(queueName);
+    if (appNum == null) {
+      appNum = 1;
+    } else {
+      appNum++;
+    }
+
+    queueAppNumMap.put(queueName, appNum);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void runNewAM(String jobType, String user,
+      String jobQueue, String oldJobId, long jobStartTimeMS,
+      long jobFinishTimeMS, List<ContainerSimulator> containerList,
+      ReservationSubmissionRequest rr, Resource amContainerResource) {
+
+    AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
+        amClassMap.get(jobType), new Configuration());
+
+    if (amSim != null) {
+      int heartbeatInterval = getConf().getInt(
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+      boolean isTracked = trackedApps.contains(oldJobId);
+
+      if (oldJobId == null) {
+        oldJobId = Integer.toString(AM_ID);
       }
+      AM_ID++;
+
+      amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
+          jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr,
+          runner.getStartTimeMS(), amContainerResource);
+      runner.schedule(amSim);
+      maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
+      numTasks += containerList.size();
+      amMap.put(oldJobId, amSim);
     }
   }
-  
+
   private void printSimulationInfo() {
     if (printSimulation) {
       // node
@@ -468,7 +778,7 @@ public class SLSRunner {
       LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
       for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) {
         AMSimulator am = entry.getValue();
-        LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() 
+        LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType()
             + "\t" + am.getDuration() + "\t" + am.getNumTasks());
       }
       LOG.info("------------------------------------");
@@ -502,69 +812,125 @@ public class SLSRunner {
     return nmMap;
   }
 
-  public static TaskRunner getRunner() {
-    return runner;
-  }
-
   public static void decreaseRemainingApps() {
-    remainingApps --;
+    remainingApps--;
 
     if (remainingApps == 0) {
       LOG.info("SLSRunner tears down.");
-      System.exit(0);
+      if (exitAtTheFinish) {
+        System.exit(0);
+      }
     }
   }
 
-  public static void main(String args[]) throws Exception {
+  public void stop() throws InterruptedException {
+    rm.stop();
+    runner.stop();
+  }
+
+  public int run(final String[] argv) throws IOException, InterruptedException,
+      ParseException, ClassNotFoundException, YarnException {
+
     Options options = new Options();
+
+    // Left for compatibility
     options.addOption("inputrumen", true, "input rumen files");
     options.addOption("inputsls", true, "input sls files");
+
+    // New more general format
+    options.addOption("tracetype", true, "the type of trace");
+    options.addOption("tracelocation", true, "input trace files");
+
     options.addOption("nodes", true, "input topology");
     options.addOption("output", true, "output directory");
     options.addOption("trackjobs", true,
-            "jobs to be tracked during simulating");
+        "jobs to be tracked during simulating");
     options.addOption("printsimulation", false,
-            "print out simulation information");
-    
+        "print out simulation information");
+
     CommandLineParser parser = new GnuParser();
-    CommandLine cmd = parser.parse(options, args);
+    CommandLine cmd = parser.parse(options, argv);
+
+    String traceType = ""; // no trace option: hit the usage default below
+    String traceLocation = null;
+
+    // compatibility with old commandline
+    if (cmd.hasOption("inputrumen")) {
+      traceType = "RUMEN";
+      traceLocation = cmd.getOptionValue("inputrumen");
+    }
+    if (cmd.hasOption("inputsls")) {
+      traceType = "SLS";
+      traceLocation = cmd.getOptionValue("inputsls");
+    }
+
+    if (cmd.hasOption("tracetype")) {
+      traceType = cmd.getOptionValue("tracetype");
+      traceLocation = cmd.getOptionValue("tracelocation");
+    }
 
-    String inputRumen = cmd.getOptionValue("inputrumen");
-    String inputSLS = cmd.getOptionValue("inputsls");
     String output = cmd.getOptionValue("output");
-    
-    if ((inputRumen == null && inputSLS == null) || output == null) {
-      System.err.println();
-      System.err.println("ERROR: Missing input or output file");
-      System.err.println();
-      System.err.println("Options: -inputrumen|-inputsls FILE,FILE... " +
-              "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " +
-              "[-printsimulation]");
-      System.err.println();
-      System.exit(1);
-    }
-    
+
     File outputFile = new File(output);
-    if (! outputFile.exists()
-            && ! outputFile.mkdirs()) {
+    if (!outputFile.exists() && !outputFile.mkdirs()) {
       System.err.println("ERROR: Cannot create output directory "
-              + outputFile.getAbsolutePath());
-      System.exit(1);
+          + outputFile.getAbsolutePath());
+      throw new YarnException("Cannot create output directory");
     }
-    
+
     Set<String> trackedJobSet = new HashSet<String>();
     if (cmd.hasOption("trackjobs")) {
       String trackjobs = cmd.getOptionValue("trackjobs");
       String jobIds[] = trackjobs.split(",");
       trackedJobSet.addAll(Arrays.asList(jobIds));
     }
-    
-    String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
 
-    boolean isSLS = inputSLS != null;
-    String inputFiles[] = isSLS ? inputSLS.split(",") : inputRumen.split(",");
-    SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output,
+    String tempNodeFile =
+        cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
+
+    TraceType tempTraceType = TraceType.SLS;
+    switch (traceType) {
+    case "SLS":
+      tempTraceType = TraceType.SLS;
+      break;
+    case "RUMEN":
+      tempTraceType = TraceType.RUMEN;
+      break;
+
+    case "SYNTH":
+      tempTraceType = TraceType.SYNTH;
+      break;
+    default:
+      printUsage();
+      throw new YarnException("Misconfigured input");
+    }
+
+    String[] inputFiles = traceLocation.split(",");
+
+    setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
         trackedJobSet, cmd.hasOption("printsimulation"));
-    sls.start();
+
+    start();
+
+    return 0;
   }
+
+  public static void main(String[] argv) throws Exception {
+    exitAtTheFinish = true;
+    ToolRunner.run(new Configuration(), new SLSRunner(), argv);
+  }
+
+  static void printUsage() {
+    System.err.println();
+    System.err.println("ERROR: Wrong tracetype");
+    System.err.println();
+    System.err.println(
+        "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... "
+            + "(deprecated alternative options --inputsls FILE,FILE,... "
+            + "| --inputrumen FILE,FILE,...) "
+            + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] "
+            + "[-printsimulation]");
+    System.err.println();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index e536cb6..6171154 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.sls.appmaster;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -34,25 +35,24 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-        .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-
-import org.apache.hadoop.yarn.api.protocolrecords
-        .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-        .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -60,12 +60,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.util.Records;
-
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
 import org.apache.hadoop.yarn.sls.SLSRunner;
@@ -90,7 +86,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
           RecordFactoryProvider.getRecordFactory(null);
   // response queue
   protected final BlockingQueue<AllocateResponse> responseQueue;
-  protected int RESPONSE_ID = 1;
+  private int responseId = 0;
   // user name
   protected String user;  
   // queue name
@@ -98,6 +94,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
   // am type
   protected String amtype;
   // job start/end time
+  private long baselineTimeMS;
   protected long traceStartTimeMS;
   protected long traceFinishTimeMS;
   protected long simulateStartTimeMS;
@@ -107,28 +104,41 @@ public abstract class AMSimulator extends TaskRunner.Task {
   // progress
   protected int totalContainers;
   protected int finishedContainers;
+
+  // waiting for AM container
+  volatile boolean isAMContainerRunning = false;
+  volatile Container amContainer;
   
-  protected final Logger LOG = LoggerFactory.getLogger(AMSimulator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class);
+
+  private Resource amContainerResource;
+
+  private ReservationSubmissionRequest reservationRequest;
 
   public AMSimulator() {
-    this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+    this.responseQueue = new LinkedBlockingQueue<>();
   }
 
-  public void init(int id, int heartbeatInterval, 
-      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
-      long traceStartTime, long traceFinishTime, String user, String queue, 
-      boolean isTracked, String oldAppId) {
-    super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval,
-            heartbeatInterval);
-    this.user = user;
-    this.rm = rm;
-    this.se = se;
-    this.user = user;
-    this.queue = queue;
-    this.oldAppId = oldAppId;
-    this.isTracked = isTracked;
-    this.traceStartTimeMS = traceStartTime;
-    this.traceFinishTimeMS = traceFinishTime;
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager resourceManager,
+      SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
+      String simQueue, boolean tracked, String oldApp,
+      ReservationSubmissionRequest rr, long baseTimeMS,
+      Resource amContainerResource) {
+    super.init(startTime, startTime + 1000000L * heartbeatInterval,
+        heartbeatInterval);
+    this.user = simUser;
+    this.rm = resourceManager;
+    this.se = slsRunnner;
+    this.queue = simQueue;
+    this.oldAppId = oldApp;
+    this.isTracked = tracked;
+    this.baselineTimeMS = baseTimeMS;
+    this.traceStartTimeMS = startTime;
+    this.traceFinishTimeMS = finishTime;
+    this.reservationRequest = rr;
+    this.amContainerResource = amContainerResource;
   }
 
   /**
@@ -136,29 +146,66 @@ public abstract class AMSimulator extends TaskRunner.Task {
    */
   @Override
   public void firstStep() throws Exception {
-    simulateStartTimeMS = System.currentTimeMillis() - 
-                          SLSRunner.getRunner().getStartTimeMS();
+    simulateStartTimeMS = System.currentTimeMillis() - baselineTimeMS;
 
-    // submit application, waiting until ACCEPTED
-    submitApp();
+    ReservationId reservationId = null;
 
-    // register application master
-    registerAM();
+    // submit a reservation if one is required, exceptions naturally happen
+    // when the reservation does not fit, catch, log, and move on running job
+    // without reservation.
+    try {
+      reservationId = submitReservationWhenSpecified();
+    } catch (UndeclaredThrowableException y) {
+      LOG.warn("Unable to place reservation: " + y.getCause(), y);
+    }
+
+    // submit application; AM registers in notifyAMContainerLaunched
+    submitApp(reservationId);
 
     // track app metrics
     trackApp();
   }
 
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    this.amContainer = masterContainer;
+    this.appAttemptId = masterContainer.getId().getApplicationAttemptId();
+    registerAM();
+    isAMContainerRunning = true;
+  }
+
+  private ReservationId submitReservationWhenSpecified()
+      throws IOException, InterruptedException {
+    if (reservationRequest != null) {
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws YarnException, IOException {
+          rm.getClientRMService().submitReservation(reservationRequest);
+          LOG.info("RESERVATION SUCCESSFULLY SUBMITTED "
+              + reservationRequest.getReservationId());
+          return null;
+
+        }
+      });
+      return reservationRequest.getReservationId();
+    } else {
+      return null;
+    }
+  }
+
   @Override
   public void middleStep() throws Exception {
-    // process responses in the queue
-    processResponseQueue();
-    
-    // send out request
-    sendContainerRequest();
-    
-    // check whether finish
-    checkStop();
+    if (isAMContainerRunning) {
+      // process responses in the queue
+      processResponseQueue();
+
+      // send out request
+      sendContainerRequest();
+
+      // check whether finish
+      checkStop();
+    }
   }
 
   @Override
@@ -168,6 +215,22 @@ public abstract class AMSimulator extends TaskRunner.Task {
     if (isTracked) {
       untrackApp();
     }
+
+    // Finish AM container
+    if (amContainer != null) {
+      LOG.info("AM container = {} reported to finish", amContainer.getId());
+      se.getNmMap().get(amContainer.getNodeId()).cleanupContainer(
+          amContainer.getId());
+    } else {
+      LOG.info("AM container is null");
+    }
+
+    if (null == appAttemptId) {
+      // If appAttemptId == null, AM is not launched from RM's perspective, so
+      // it's unnecessary to finish am as well
+      return;
+    }
+
     // unregister application master
     final FinishApplicationMasterRequest finishAMRequest = recordFactory
                   .newRecordInstance(FinishApplicationMasterRequest.class);
@@ -187,13 +250,14 @@ public abstract class AMSimulator extends TaskRunner.Task {
       }
     });
 
-    simulateFinishTimeMS = System.currentTimeMillis() -
-        SLSRunner.getRunner().getStartTimeMS();
+    simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS;
     // record job running information
-    ((SchedulerWrapper)rm.getResourceScheduler())
-         .addAMRuntime(appId, 
-                      traceStartTimeMS, traceFinishTimeMS, 
-                      simulateStartTimeMS, simulateFinishTimeMS);
+    SchedulerMetrics schedulerMetrics =
+            ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
+    if (schedulerMetrics != null) {
+      schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS,
+              simulateStartTimeMS, simulateFinishTimeMS);
+    }
   }
   
   protected ResourceRequest createResourceRequest(
@@ -213,7 +277,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
       List<ContainerId> toRelease) {
     AllocateRequest allocateRequest =
             recordFactory.newRecordInstance(AllocateRequest.class);
-    allocateRequest.setResponseId(RESPONSE_ID ++);
+    allocateRequest.setResponseId(responseId++);
     allocateRequest.setAskList(ask);
     allocateRequest.setReleaseList(toRelease);
     return allocateRequest;
@@ -229,7 +293,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
   
   protected abstract void checkStop();
   
-  private void submitApp()
+  private void submitApp(ReservationId reservationId)
           throws YarnException, InterruptedException, IOException {
     // ask for new application
     GetNewApplicationRequest newAppRequest =
@@ -249,14 +313,19 @@ public abstract class AMSimulator extends TaskRunner.Task {
     appSubContext.setPriority(Priority.newInstance(0));
     ContainerLaunchContext conLauContext = 
         Records.newRecord(ContainerLaunchContext.class);
-    conLauContext.setApplicationACLs(
-        new HashMap<ApplicationAccessType, String>());
+    conLauContext
+        .setApplicationACLs(new HashMap<ApplicationAccessType, String>());
     conLauContext.setCommands(new ArrayList<String>());
     conLauContext.setEnvironment(new HashMap<String, String>());
     conLauContext.setLocalResources(new HashMap<String, LocalResource>());
     conLauContext.setServiceData(new HashMap<String, ByteBuffer>());
     appSubContext.setAMContainerSpec(conLauContext);
-    appSubContext.setUnmanagedAM(true);
+    appSubContext.setResource(amContainerResource);
+
+    if(reservationId != null) {
+      appSubContext.setReservationID(reservationId);
+    }
+
     subAppRequest.setApplicationSubmissionContext(appSubContext);
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
     ugi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -267,22 +336,6 @@ public abstract class AMSimulator extends TaskRunner.Task {
       }
     });
     LOG.info("Submit a new application {}", appId);
-    
-    // waiting until application ACCEPTED
-    RMApp app = rm.getRMContext().getRMApps().get(appId);
-    while(app.getState() != RMAppState.ACCEPTED) {
-      Thread.sleep(10);
-    }
-
-    // Waiting until application attempt reach LAUNCHED
-    // "Unmanaged AM must register after AM attempt reaches LAUNCHED state"
-    this.appAttemptId = rm.getRMContext().getRMApps().get(appId)
-        .getCurrentAppAttempt().getAppAttemptId();
-    RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId)
-        .getCurrentAppAttempt();
-    while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
-      Thread.sleep(10);
-    }
   }
 
   private void registerAM()
@@ -314,14 +367,20 @@ public abstract class AMSimulator extends TaskRunner.Task {
 
   private void trackApp() {
     if (isTracked) {
-      ((SchedulerWrapper) rm.getResourceScheduler())
-              .addTrackedApp(appAttemptId, oldAppId);
+      SchedulerMetrics schedulerMetrics =
+          ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
+      if (schedulerMetrics != null) {
+        schedulerMetrics.addTrackedApp(appId, oldAppId);
+      }
     }
   }
   public void untrackApp() {
     if (isTracked) {
-      ((SchedulerWrapper) rm.getResourceScheduler())
-              .removeTrackedApp(appAttemptId, oldAppId);
+      SchedulerMetrics schedulerMetrics =
+          ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
+      if (schedulerMetrics != null) {
+        schedulerMetrics.removeTrackedApp(oldAppId);
+      }
     }
   }
   
@@ -332,26 +391,28 @@ public abstract class AMSimulator extends TaskRunner.Task {
     Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
     ResourceRequest anyRequest = null;
     for (ContainerSimulator cs : csList) {
-      String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
-      // check rack local
-      String rackname = rackHostNames[0];
-      if (rackLocalRequestMap.containsKey(rackname)) {
-        rackLocalRequestMap.get(rackname).setNumContainers(
-            rackLocalRequestMap.get(rackname).getNumContainers() + 1);
-      } else {
-        ResourceRequest request = createResourceRequest(
-                cs.getResource(), rackname, priority, 1);
-        rackLocalRequestMap.put(rackname, request);
-      }
-      // check node local
-      String hostname = rackHostNames[1];
-      if (nodeLocalRequestMap.containsKey(hostname)) {
-        nodeLocalRequestMap.get(hostname).setNumContainers(
-            nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
-      } else {
-        ResourceRequest request = createResourceRequest(
-                cs.getResource(), hostname, priority, 1);
-        nodeLocalRequestMap.put(hostname, request);
+      if (cs.getHostname() != null) {
+        String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
+        // check rack local
+        String rackname = "/" + rackHostNames[0];
+        if (rackLocalRequestMap.containsKey(rackname)) {
+          rackLocalRequestMap.get(rackname).setNumContainers(
+              rackLocalRequestMap.get(rackname).getNumContainers() + 1);
+        } else {
+          ResourceRequest request =
+              createResourceRequest(cs.getResource(), rackname, priority, 1);
+          rackLocalRequestMap.put(rackname, request);
+        }
+        // check node local
+        String hostname = rackHostNames[1];
+        if (nodeLocalRequestMap.containsKey(hostname)) {
+          nodeLocalRequestMap.get(hostname).setNumContainers(
+              nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
+        } else {
+          ResourceRequest request =
+              createResourceRequest(cs.getResource(), hostname, priority, 1);
+          nodeLocalRequestMap.put(hostname, request);
+        }
       }
       // any
       if (anyRequest == null) {
@@ -382,4 +443,12 @@ public abstract class AMSimulator extends TaskRunner.Task {
   public int getNumTasks() {
     return totalContainers;
   }
+
+  public ApplicationId getApplicationId() {
+    return appId;
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appAttemptId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 5d005df..21bf054 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -40,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.slf4j.Logger;
@@ -63,10 +63,10 @@ public class MRAMSimulator extends AMSimulator {
   
   private static final int PRIORITY_REDUCE = 10;
   private static final int PRIORITY_MAP = 20;
-  
+
   // pending maps
   private LinkedList<ContainerSimulator> pendingMaps =
-          new LinkedList<ContainerSimulator>();
+          new LinkedList<>();
   
   // pending failed maps
   private LinkedList<ContainerSimulator> pendingFailedMaps =
@@ -107,106 +107,55 @@ public class MRAMSimulator extends AMSimulator {
   private int mapTotal = 0;
   private int reduceFinished = 0;
   private int reduceTotal = 0;
-  // waiting for AM container 
-  private boolean isAMContainerRunning = false;
-  private Container amContainer;
+
   // finished
   private boolean isFinished = false;
-  // resource for AM container
-  private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
-  private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
 
-  public final Logger LOG = LoggerFactory.getLogger(MRAMSimulator.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MRAMSimulator.class);
 
-  public void init(int id, int heartbeatInterval,
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue, 
-      boolean isTracked, String oldAppId) {
-    super.init(id, heartbeatInterval, containerList, rm, se, 
-              traceStartTime, traceFinishTime, user, queue,
-              isTracked, oldAppId);
+      boolean isTracked, String oldAppId, ReservationSubmissionRequest rr,
+      long baselineStartTimeMS, Resource amContainerResource) {
+    super.init(heartbeatInterval, containerList, rm, se,
+        traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
+        rr, baselineStartTimeMS, amContainerResource);
     amtype = "mapreduce";
     
     // get map/reduce tasks
     for (ContainerSimulator cs : containerList) {
       if (cs.getType().equals("map")) {
         cs.setPriority(PRIORITY_MAP);
-        pendingMaps.add(cs);
+        allMaps.add(cs);
       } else if (cs.getType().equals("reduce")) {
         cs.setPriority(PRIORITY_REDUCE);
-        pendingReduces.add(cs);
+        allReduces.add(cs);
       }
     }
-    allMaps.addAll(pendingMaps);
-    allReduces.addAll(pendingReduces);
-    mapTotal = pendingMaps.size();
-    reduceTotal = pendingReduces.size();
+
+    LOG.info("Added new job with {} mapper and {} reducers",
+        allMaps.size(), allReduces.size());
+
+    mapTotal = allMaps.size();
+    reduceTotal = allReduces.size();
     totalContainers = mapTotal + reduceTotal;
   }
 
   @Override
-  public void firstStep() throws Exception {
-    super.firstStep();
-    
-    requestAMContainer();
-  }
-
-  /**
-   * send out request for AM container
-   */
-  protected void requestAMContainer()
-          throws YarnException, IOException, InterruptedException {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest amRequest = createResourceRequest(
-            BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
-                    MR_AM_CONTAINER_RESOURCE_VCORES),
-            ResourceRequest.ANY, 1, 1);
-    ask.add(amRequest);
-    LOG.debug("Application {} sends out allocate request for its AM", appId);
-    final AllocateRequest request = this.createAllocateRequest(ask);
-
-    UserGroupInformation ugi =
-            UserGroupInformation.createRemoteUser(appAttemptId.toString());
-    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
-            .get(appAttemptId.getApplicationId())
-            .getRMAppAttempt(appAttemptId).getAMRMToken();
-    ugi.addTokenIdentifier(token.decodeIdentifier());
-    AllocateResponse response = ugi.doAs(
-            new PrivilegedExceptionAction<AllocateResponse>() {
-      @Override
-      public AllocateResponse run() throws Exception {
-        return rm.getApplicationMasterService().allocate(request);
-      }
-    });
-    if (response != null) {
-      responseQueue.put(response);
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    if (null != masterContainer) {
+      restart();
+      super.notifyAMContainerLaunched(masterContainer);
     }
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  protected void processResponseQueue()
-          throws InterruptedException, YarnException, IOException {
-    // Check whether receive the am container
-    if (!isAMContainerRunning) {
-      if (!responseQueue.isEmpty()) {
-        AllocateResponse response = responseQueue.take();
-        if (response != null
-            && !response.getAllocatedContainers().isEmpty()) {
-          // Get AM container
-          Container container = response.getAllocatedContainers().get(0);
-          se.getNmMap().get(container.getNodeId())
-              .addNewContainer(container, -1L);
-          // Start AM container
-          amContainer = container;
-          LOG.debug("Application {} starts its AM container ({}).", appId,
-              amContainer.getId());
-          isAMContainerRunning = true;
-        }
-      }
-      return;
-    }
-
+  protected void processResponseQueue() throws Exception {
     while (! responseQueue.isEmpty()) {
       AllocateResponse response = responseQueue.take();
 
@@ -227,11 +176,15 @@ public class MRAMSimulator extends AMSimulator {
               assignedReduces.remove(containerId);
               reduceFinished ++;
               finishedContainers ++;
-            } else {
+            } else if (amContainer.getId().equals(containerId)){
               // am container released event
               isFinished = true;
               LOG.info("Application {} goes to finish.", appId);
             }
+
+            if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) {
+              lastStep();
+            }
           } else {
             // container to be killed
             if (assignedMaps.containsKey(containerId)) {
@@ -242,10 +195,9 @@ public class MRAMSimulator extends AMSimulator {
               LOG.debug("Application {} has one reducer killed ({}).",
                   appId, containerId);
               pendingFailedReduces.add(assignedReduces.remove(containerId));
-            } else {
-              LOG.info("Application {}'s AM is going to be killed." +
-                  " Restarting...", appId);
-              restart();
+            } else if (amContainer.getId().equals(containerId)){
+              LOG.info("Application {}'s AM is " +
+                  "going to be killed. Waiting for rescheduling...", appId);
             }
           }
         }
@@ -253,11 +205,8 @@ public class MRAMSimulator extends AMSimulator {
       
       // check finished
       if (isAMContainerRunning &&
-              (mapFinished == mapTotal) &&
-              (reduceFinished == reduceTotal)) {
-        // to release the AM container
-        se.getNmMap().get(amContainer.getNodeId())
-                .cleanupContainer(amContainer.getId());
+              (mapFinished >= mapTotal) &&
+              (reduceFinished >= reduceTotal)) {
         isAMContainerRunning = false;
         LOG.debug("Application {} sends out event to clean up"
             + " its AM container.", appId);
@@ -291,21 +240,38 @@ public class MRAMSimulator extends AMSimulator {
    */
   private void restart()
           throws YarnException, IOException, InterruptedException {
-    // clear 
-    finishedContainers = 0;
+    // clear
     isFinished = false;
-    mapFinished = 0;
-    reduceFinished = 0;
     pendingFailedMaps.clear();
     pendingMaps.clear();
     pendingReduces.clear();
     pendingFailedReduces.clear();
-    pendingMaps.addAll(allMaps);
-    pendingReduces.addAll(pendingReduces);
-    isAMContainerRunning = false;
+
+    // Only re-add mapTotal - mapFinished maps (pendingMaps is empty here)
+    int mapsLeft = mapTotal - mapFinished;
+    for (ContainerSimulator cs : allMaps) {
+      if (pendingMaps.size() >= mapsLeft) {
+        break;
+      }
+      pendingMaps.add(cs);
+    }
+
+    // Same for reduces: only re-add reduceTotal - reduceFinished of them
+    int reducesLeft = reduceTotal - reduceFinished;
+    for (ContainerSimulator cs : allReduces) {
+      if (pendingReduces.size() >= reducesLeft) {
+        break;
+      }
+      pendingReduces.add(cs);
+    }
     amContainer = null;
-    // resent am container request
-    requestAMContainer();
+  }
+
+  private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left, List<ContainerSimulator> right) {
+    List<ContainerSimulator> list = new ArrayList<>();
+    list.addAll(left);
+    list.addAll(right);
+    return list;
   }
 
   @Override
@@ -317,42 +283,44 @@ public class MRAMSimulator extends AMSimulator {
 
     // send out request
     List<ResourceRequest> ask = null;
-    if (isAMContainerRunning) {
-      if (mapFinished != mapTotal) {
-        // map phase
-        if (! pendingMaps.isEmpty()) {
-          ask = packageRequests(pendingMaps, PRIORITY_MAP);
-          LOG.debug("Application {} sends out request for {} mappers.",
-              appId, pendingMaps.size());
-          scheduledMaps.addAll(pendingMaps);
-          pendingMaps.clear();
-        } else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) {
-          ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
-          LOG.debug("Application {} sends out requests for {} failed mappers.",
-              appId, pendingFailedMaps.size());
-          scheduledMaps.addAll(pendingFailedMaps);
-          pendingFailedMaps.clear();
-        }
-      } else if (reduceFinished != reduceTotal) {
-        // reduce phase
-        if (! pendingReduces.isEmpty()) {
-          ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
-          LOG.debug("Application {} sends out requests for {} reducers.",
-              appId, pendingReduces.size());
-          scheduledReduces.addAll(pendingReduces);
-          pendingReduces.clear();
-        } else if (! pendingFailedReduces.isEmpty()
-                && scheduledReduces.isEmpty()) {
-          ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
-          LOG.debug("Application {} sends out request for {} failed reducers.",
-              appId, pendingFailedReduces.size());
-          scheduledReduces.addAll(pendingFailedReduces);
-          pendingFailedReduces.clear();
-        }
+    if (mapFinished != mapTotal) {
+      // map phase
+      if (!pendingMaps.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
+            PRIORITY_MAP);
+        LOG.debug("Application {} sends out request for {} mappers.",
+            appId, pendingMaps.size());
+        scheduledMaps.addAll(pendingMaps);
+        pendingMaps.clear();
+      } else if (!pendingFailedMaps.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps),
+            PRIORITY_MAP);
+        LOG.debug("Application {} sends out requests for {} failed mappers.",
+            appId, pendingFailedMaps.size());
+        scheduledMaps.addAll(pendingFailedMaps);
+        pendingFailedMaps.clear();
+      }
+    } else if (reduceFinished != reduceTotal) {
+      // reduce phase
+      if (!pendingReduces.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingReduces, scheduledReduces),
+            PRIORITY_REDUCE);
+        LOG.debug("Application {} sends out requests for {} reducers.",
+                appId, pendingReduces.size());
+        scheduledReduces.addAll(pendingReduces);
+        pendingReduces.clear();
+      } else if (!pendingFailedReduces.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces),
+            PRIORITY_REDUCE);
+        LOG.debug("Application {} sends out request for {} failed reducers.",
+            appId, pendingFailedReduces.size());
+        scheduledReduces.addAll(pendingFailedReduces);
+        pendingFailedReduces.clear();
       }
     }
+
     if (ask == null) {
-      ask = new ArrayList<ResourceRequest>();
+      ask = new ArrayList<>();
     }
     
     final AllocateRequest request = createAllocateRequest(ask);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index 8fd5b3f..038f202 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.sls.conf;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
 
 @Private
 @Unstable
@@ -62,6 +64,14 @@ public class SLSConfiguration {
   public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
   public static final String AM_TYPE = AM_PREFIX + "type.";
 
+  public static final String AM_CONTAINER_MEMORY = AM_PREFIX +
+      "container.memory";
+  public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;
+
+  public static final String AM_CONTAINER_VCORES = AM_PREFIX +
+      "container.vcores";
+  public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
+
   // container
   public static final String CONTAINER_PREFIX = PREFIX + "container.";
   public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX
@@ -70,4 +80,9 @@ public class SLSConfiguration {
   public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores";
   public static final int CONTAINER_VCORES_DEFAULT = 1;
 
+  public static Resource getAMContainerResource(Configuration conf) {
+    return Resource.newInstance(
+        conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
+        conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index fb1c1f4..9197b1e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.sls.nodemanager;
 
 import java.io.IOException;
-import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
new file mode 100644
index 0000000..b4ffb61
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+
+import java.util.Map;
+
+public class MockAMLauncher extends ApplicationMasterLauncher
+    implements EventHandler<AMLauncherEvent> {
+  private static final Log LOG = LogFactory.getLog(
+      MockAMLauncher.class);
+
+  Map<String, AMSimulator> amMap;
+  SLSRunner se;
+
+  public MockAMLauncher(SLSRunner se, RMContext rmContext,
+      Map<String, AMSimulator> amMap) {
+    super(rmContext);
+    this.amMap = amMap;
+    this.se = se;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Do nothing
+  }
+
+  private void setupAMRMToken(RMAppAttempt appAttempt) {
+    // Setup AMRMToken
+    Token<AMRMTokenIdentifier> amrmToken =
+        super.context.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.getAppAttemptId());
+    ((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handle(AMLauncherEvent event) {
+    if (AMLauncherEventType.LAUNCH == event.getType()) {
+      ApplicationId appId =
+          event.getAppAttempt().getAppAttemptId().getApplicationId();
+
+      // find AMSimulator
+      for (AMSimulator ams : amMap.values()) {
+        if (ams.getApplicationId() != null && ams.getApplicationId().equals(
+            appId)) {
+          try {
+            Container amContainer = event.getAppAttempt().getMasterContainer();
+
+            setupAMRMToken(event.getAppAttempt());
+
+            // Notify RMAppAttempt to change state
+            super.context.getDispatcher().getEventHandler().handle(
+                new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
+                    RMAppAttemptEventType.LAUNCHED));
+
+            ams.notifyAMContainerLaunched(
+                event.getAppAttempt().getMasterContainer());
+            LOG.info("Notify AM launcher launched:" + amContainer.getId());
+
+            se.getNmMap().get(amContainer.getNodeId())
+                .addNewContainer(amContainer, 100000000L);
+
+            return;
+          } catch (Exception e) {
+            throw new YarnRuntimeException(e);
+          }
+        }
+      }
+
+      throw new YarnRuntimeException(
+          "Didn't find any AMSimulator for applicationId=" + appId);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message