hadoop-common-commits mailing list archives

From viraj...@apache.org
Subject [29/37] hadoop git commit: YARN-6363. Extending SLS: Synthetic Load Generator. (Carlo Curino via wangda)
Date Fri, 21 Apr 2017 21:36:13 GMT
YARN-6363. Extending SLS: Synthetic Load Generator. (Carlo Curino via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de69d6e8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de69d6e8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de69d6e8

Branch: refs/heads/HDFS-9806
Commit: de69d6e81128470dd5d2fd865d4b3a79188f740b
Parents: 667966c
Author: Wangda Tan <wangda@apache.org>
Authored: Thu Apr 20 21:54:18 2017 -0700
Committer: Wangda Tan <wangda@apache.org>
Committed: Thu Apr 20 21:54:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/tools/rumen/TaskInfo.java |  29 +-
 .../apache/hadoop/tools/rumen/ZombieJob.java    |   9 +-
 hadoop-tools/hadoop-sls/pom.xml                 |   3 +
 hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh  |  36 +-
 .../hadoop/yarn/sls/ReservationClientUtil.java  |  78 +++
 .../org/apache/hadoop/yarn/sls/SLSRunner.java   | 569 +++++++++++++------
 .../hadoop/yarn/sls/appmaster/AMSimulator.java  |  89 ++-
 .../yarn/sls/appmaster/MRAMSimulator.java       |   9 +-
 .../sls/resourcemanager/MockAMLauncher.java     |   5 +
 .../sls/scheduler/SLSCapacityScheduler.java     |  24 +-
 .../yarn/sls/scheduler/SLSFairScheduler.java    |  22 +-
 .../hadoop/yarn/sls/scheduler/TaskRunner.java   |   9 +-
 .../hadoop/yarn/sls/synthetic/SynthJob.java     | 306 ++++++++++
 .../yarn/sls/synthetic/SynthJobClass.java       | 180 ++++++
 .../sls/synthetic/SynthTraceJobProducer.java    | 316 ++++++++++
 .../hadoop/yarn/sls/synthetic/SynthUtils.java   | 101 ++++
 .../yarn/sls/synthetic/SynthWorkload.java       | 121 ++++
 .../hadoop/yarn/sls/synthetic/package-info.java |  22 +
 .../apache/hadoop/yarn/sls/utils/SLSUtils.java  |   9 +
 .../src/site/markdown/SchedulerLoadSimulator.md | 122 +++-
 .../hadoop/yarn/sls/BaseSLSRunnerTest.java      | 120 ++++
 .../apache/hadoop/yarn/sls/TestSLSRunner.java   |  90 +--
 .../hadoop/yarn/sls/TestSynthJobGeneration.java |  96 ++++
 .../yarn/sls/appmaster/TestAMSimulator.java     |   2 +-
 .../yarn/sls/scheduler/TestTaskRunner.java      |   2 +-
 .../src/test/resources/capacity-scheduler.xml   |  10 +
 .../src/test/resources/fair-scheduler.xml       |   8 +-
 .../hadoop-sls/src/test/resources/inputsls.json |  55 ++
 .../hadoop-sls/src/test/resources/nodes.json    |  84 +++
 .../src/test/resources/sls-runner.xml           |   6 +-
 .../hadoop-sls/src/test/resources/syn.json      |  53 ++
 .../hadoop-sls/src/test/resources/yarn-site.xml |  10 +-
 32 files changed, 2303 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
index 9aa6373..6159f85 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java
@@ -23,21 +23,37 @@ public class TaskInfo {
   private final long bytesOut;
   private final int recsOut;
   private final long maxMemory;
+  private final long maxVcores;
   private final ResourceUsageMetrics metrics;
 
+
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+                  long maxMemory) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1,
+        new ResourceUsageMetrics());
+  }
+
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
-      long maxMemory) {
-    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 
+                  long maxMemory, ResourceUsageMetrics
+                      metrics) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, metrics);
+  }
+
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      long maxMemory, long maxVcores) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores,
          new ResourceUsageMetrics());
   }
   
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
-                  long maxMemory, ResourceUsageMetrics metrics) {
+                  long maxMemory, long maxVcores, ResourceUsageMetrics
+                      metrics) {
     this.bytesIn = bytesIn;
     this.recsIn = recsIn;
     this.bytesOut = bytesOut;
     this.recsOut = recsOut;
     this.maxMemory = maxMemory;
+    this.maxVcores = maxVcores;
     this.metrics = metrics;
   }
 
@@ -79,6 +95,13 @@ public class TaskInfo {
   }
 
   /**
+   * @return Vcores used by the task.
+   */
+  public long getTaskVCores() {
+    return maxVcores;
+  }
+
+  /**
    * @return Resource usage metrics
    */
   public ResourceUsageMetrics getResourceUsageMetrics() {

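The two new overloads above keep existing five- and six-argument callers compiling, with vcores defaulting to 1. A minimal sketch of the resulting behavior (illustrative values, not part of this commit):

    // old signature still works; vcores default to 1
    TaskInfo legacy = new TaskInfo(1L << 20, 1000, 1L << 19, 500, 2048);
    assert legacy.getTaskVCores() == 1;

    // new signature states the vcore demand explicitly
    TaskInfo explicit = new TaskInfo(1L << 20, 1000, 1L << 19, 500, 2048, 2);
    assert explicit.getTaskVCores() == 2;
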
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java
index 3857e1f..6400840 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java
@@ -426,7 +426,7 @@ public class ZombieJob implements JobStory {
     LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
     if (loggedTask == null) {
       // TODO insert parameters
-      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
       return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
           taskNumber, locality);
     }
@@ -473,7 +473,7 @@ public class ZombieJob implements JobStory {
     LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
     if (loggedTask == null) {
       // TODO insert parameters
-      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
       return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
           taskNumber, locality);
     }
@@ -639,7 +639,7 @@ public class ZombieJob implements JobStory {
 
   private TaskInfo getTaskInfo(LoggedTask loggedTask) {
     if (loggedTask == null) {
-      return new TaskInfo(0, 0, 0, 0, 0);
+      return new TaskInfo(0, 0, 0, 0, 0, 0);
     }
     List<LoggedTaskAttempt> attempts = loggedTask.getAttempts();
 
@@ -688,9 +688,10 @@ public class ZombieJob implements JobStory {
       break;
     }
 
+    //note: hardcoding vCores, as they are not collected
     TaskInfo taskInfo =
         new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
-            (int) outputRecords, (int) heapMegabytes,
+            (int) outputRecords, (int) heapMegabytes, 1,
             metrics);
     return taskInfo;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 0d4ef58..8fb57f3 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -132,6 +132,9 @@
             <exclude>src/test/resources/simulate.html.template</exclude>
             <exclude>src/test/resources/simulate.info.html.template</exclude>
             <exclude>src/test/resources/track.html.template</exclude>
+            <exclude>src/test/resources/syn.json</exclude>
+            <exclude>src/test/resources/inputsls.json</exclude>
+            <exclude>src/test/resources/nodes.json</exclude>
           </excludes>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh b/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
index 5f8d9fc..cbc5bc9 100644
--- a/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
+++ b/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
@@ -16,7 +16,9 @@
 function hadoop_usage()
 {
   echo "Usage: slsrun.sh <OPTIONS> "
-  echo "                 --input-rumen=<FILE1,FILE2,...>  | --input-sls=<FILE1,FILE2,...>"
+  echo "                 --tracetype=<SYNTH | SLS | RUMEN>"
+  echo "                 --tracelocation=<FILE1,FILE2,...>"
+  echo "                 (deprecated --input-rumen=<FILE1,FILE2,...>  | --input-sls=<FILE1,FILE2,...>)"
   echo "                 --output-dir=<SLS_SIMULATION_OUTPUT_DIRECTORY>"
   echo "                 [--nodes=<SLS_NODES_FILE>]"
   echo "                 [--track-jobs=<JOBID1,JOBID2,...>]"
@@ -33,6 +35,12 @@ function parse_args()
       --input-sls=*)
         inputsls=${i#*=}
       ;;
+      --tracetype=*)
+        tracetype=${i#*=}
+      ;;
+      --tracelocation=*)
+        tracelocation=${i#*=}
+      ;;
       --output-dir=*)
         outputdir=${i#*=}
       ;;
@@ -52,14 +60,12 @@ function parse_args()
     esac
   done
 
-  if [[ -z "${inputrumen}" && -z "${inputsls}" ]] ; then
-    hadoop_error "ERROR: Either --input-rumen or --input-sls must be specified."
-    hadoop_exit_with_usage 1
+  if [[ -z "${inputrumen}" && -z "${inputsls}" && -z "${tracetype}" ]] ; then
+    hadoop_error "ERROR: Either --input-rumen, --input-sls, or --tracetype (with --tracelocation) must be specified."
   fi
 
-  if [[ -n "${inputrumen}" && -n "${inputsls}" ]] ; then
-    hadoop_error "ERROR: Only specify one of --input-rumen or --input-sls."
-    hadoop_exit_with_usage 1
+  if [[ -n "${inputrumen}" && -n "${inputsls}" && -n "${tracetype}" ]] ; then
+    hadoop_error "ERROR: Only specify one of --input-rumen, --input-sls, or --tracetype (with --tracelocation)"
   fi
 
   if [[ -z "${outputdir}" ]] ; then
@@ -74,11 +80,17 @@ function calculate_classpath
 }
 
 function run_simulation() {
-  if [[ "${inputsls}" == "" ]] ; then
-    hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}"
-  else
-    hadoop_add_param args -inputsls "-inputsls ${inputsls}"
-  fi
+
+   if [[ "${inputsls}" != "" ]] ; then
+        hadoop_add_param args -inputsls "-inputsls ${inputsls}"
+   fi
+   if [[ "${inputrumen}" != "" ]] ; then
+        hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}"
+   fi
+   if [[ "${tracetype}" != "" ]] ; then
+        hadoop_add_param args -tracetype "-tracetype ${tracetype}"
+        hadoop_add_param args -tracelocation "-tracelocation ${tracelocation}"
+   fi
 
   hadoop_add_param args -output "-output ${outputdir}"
 

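The script only assembles arguments; SLSRunner now runs under ToolRunner, so the same options can be passed programmatically. A minimal sketch (assumes the SLS jars and sls-runner.xml are on the classpath; the trace file name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.yarn.sls.SLSRunner;

    public class LaunchSls {
      public static void main(String[] args) throws Exception {
        // mirrors: slsrun.sh --tracetype=SYNTH --tracelocation=syn.json --output-dir=/tmp/sls
        String[] slsArgs = {"-tracetype", "SYNTH", "-tracelocation", "syn.json",
            "-output", "/tmp/sls", "-printsimulation"};
        ToolRunner.run(new Configuration(), new SLSRunner(), slsArgs);
      }
    }
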
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java
new file mode 100644
index 0000000..7c10a57
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.records.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple support class, used to create reservation requests.
+ */
+public final class ReservationClientUtil {
+
+  private ReservationClientUtil(){
+    //avoid instantiation
+  }
+
+  /**
+   * Creates a request that envelopes an MR job, picking max number of maps and
+   * reducers, max durations, and max resources per container.
+   *
+   * @param reservationId the id of the reservation
+   * @param name the name of a reservation
+   * @param maxMapRes maximum resources used by any mapper
+   * @param numberMaps number of mappers
+   * @param maxMapDur maximum duration of any mapper
+   * @param maxRedRes maximum resources used by any reducer
+   * @param numberReduces number of reducers
+   * @param maxRedDur maximum duration of any reducer
+   * @param arrival start time of valid range for reservation
+   * @param deadline deadline for this reservation
+   * @param queueName queue to submit to
+   * @return a submission request
+   */
+  @SuppressWarnings("checkstyle:parameternumber")
+  public static ReservationSubmissionRequest createMRReservation(
+      ReservationId reservationId, String name, Resource maxMapRes,
+      int numberMaps, long maxMapDur, Resource maxRedRes, int numberReduces,
+      long maxRedDur, long arrival, long deadline, String queueName) {
+
+    ReservationRequest mapRR = ReservationRequest.newInstance(maxMapRes,
+        numberMaps, numberMaps, maxMapDur);
+    ReservationRequest redRR = ReservationRequest.newInstance(maxRedRes,
+        numberReduces, numberReduces, maxRedDur);
+
+    List<ReservationRequest> listResReq = new ArrayList<ReservationRequest>();
+    listResReq.add(mapRR);
+    listResReq.add(redRR);
+
+    ReservationRequests reservationRequests = ReservationRequests
+        .newInstance(listResReq, ReservationRequestInterpreter.R_ORDER_NO_GAP);
+    ReservationDefinition resDef = ReservationDefinition.newInstance(arrival,
+        deadline, reservationRequests, name);
+
+    // outermost request
+    ReservationSubmissionRequest request = ReservationSubmissionRequest
+        .newInstance(resDef, queueName, reservationId);
+
+    return request;
+  }
+}

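A minimal usage sketch for the helper above (all values illustrative):

    ReservationId reservationId =
        ReservationId.newInstance(System.currentTimeMillis(), 1);
    Resource mapRes = Resource.newInstance(1024, 1);  // 1 GB, 1 vcore per map
    Resource redRes = Resource.newInstance(2048, 1);  // 2 GB, 1 vcore per reducer
    long now = System.currentTimeMillis();

    ReservationSubmissionRequest request =
        ReservationClientUtil.createMRReservation(reservationId,
            "reservation_demo", mapRes, 10, 60000L,   // 10 maps, up to 60s each
            redRes, 2, 120000L,                       // 2 reducers, up to 120s each
            now, now + 3600000L, "default");          // deadline one hour out
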
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index ba43816..523d22a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -41,17 +41,25 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -67,25 +75,27 @@ import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
 import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
 import org.apache.hadoop.yarn.sls.scheduler.*;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Logger;
 
 @Private
 @Unstable
-public class SLSRunner {
+public class SLSRunner extends Configured implements Tool {
   // RM, Runner
   private ResourceManager rm;
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
-  private Configuration conf;
   private Map<String, Integer> queueAppNumMap;
-  
+
   // NM simulator
   private HashMap<NodeId, NMSimulator> nmMap;
   private int nmMemoryMB, nmVCores;
   private String nodeFile;
-  
+
   // AM simulator
   private int AM_ID;
   private Map<String, AMSimulator> amMap;
@@ -106,43 +116,64 @@ public class SLSRunner {
   // logger
   public final static Logger LOG = Logger.getLogger(SLSRunner.class);
 
-  // input traces, input-rumen or input-sls
-  private boolean isSLS;
-  
-  public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile,
-                   String outputDir, Set<String> trackedApps,
-                   boolean printsimulation)
-          throws IOException, ClassNotFoundException {
-    this.isSLS = isSLS;
-    this.inputTraces = inputTraces.clone();
-    this.nodeFile = nodeFile;
-    this.trackedApps = trackedApps;
-    this.printSimulation = printsimulation;
-    metricsOutputDir = outputDir;
-    
+  /**
+   * The type of trace in input.
+   */
+  public enum TraceType {
+    SLS, RUMEN, SYNTH
+  }
+
+  private TraceType inputType;
+  private SynthTraceJobProducer stjp;
+
+  public SLSRunner() throws ClassNotFoundException {
+    Configuration tempConf = new Configuration(false);
+    init(tempConf);
+  }
+
+  public SLSRunner(Configuration tempConf) throws ClassNotFoundException {
+    init(tempConf);
+  }
+
+  private void init(Configuration tempConf) throws ClassNotFoundException {
     nmMap = new HashMap<>();
     queueAppNumMap = new HashMap<>();
     amMap = new ConcurrentHashMap<>();
     amClassMap = new HashMap<>();
-    
+
     // runner configuration
-    conf = new Configuration(false);
-    conf.addResource("sls-runner.xml");
+    tempConf.addResource("sls-runner.xml");
+    super.setConf(tempConf);
+
     // runner
-    int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, 
-                                SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); 
+    int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
+        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     SLSRunner.runner.setQueueSize(poolSize);
     // <AMType, Class> map
-    for (Map.Entry e : conf) {
+    for (Map.Entry e : tempConf) {
       String key = e.getKey().toString();
       if (key.startsWith(SLSConfiguration.AM_TYPE)) {
         String amType = key.substring(SLSConfiguration.AM_TYPE.length());
-        amClassMap.put(amType, Class.forName(conf.get(key)));
+        amClassMap.put(amType, Class.forName(tempConf.get(key)));
       }
     }
   }
-  
-  public void start() throws Exception {
+
+  public void setSimulationParams(TraceType inType, String[] inTraces,
+      String nodes, String outDir, Set<String> trackApps,
+      boolean printsimulation) throws IOException, ClassNotFoundException {
+
+    this.inputType = inType;
+    this.inputTraces = inTraces.clone();
+    this.nodeFile = nodes;
+    this.trackedApps = trackApps;
+    this.printSimulation = printsimulation;
+    metricsOutputDir = outDir;
+
+  }
+
+  public void start() throws IOException, ClassNotFoundException, YarnException,
+      InterruptedException {
     // start resource manager
     startRM();
     // start node managers
@@ -151,9 +182,9 @@ public class SLSRunner {
     startAM();
     // set queue & tracked apps information
     ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-                            .setQueueSet(this.queueAppNumMap.keySet());
+        .setQueueSet(this.queueAppNumMap.keySet());
     ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-                            .setTrackedAppSet(this.trackedApps);
+        .setTrackedAppSet(this.trackedApps);
     // print out simulation info
     printSimulationInfo();
     // blocked until all nodes RUNNING
@@ -162,23 +193,23 @@ public class SLSRunner {
     runner.start();
   }
 
-  private void startRM() throws Exception {
-    Configuration rmConf = new YarnConfiguration();
+  private void startRM() throws ClassNotFoundException, YarnException {
+    Configuration rmConf = new YarnConfiguration(getConf());
     String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
 
     // For CapacityScheduler we use a sub-classing instead of wrapping
     // to allow scheduler-specific invocations from monitors to work
     // this can be used for other schedulers as well if we care to
     // exercise/track behaviors that are not common to the scheduler api
-    if(Class.forName(schedulerClass) == CapacityScheduler.class) {
+    if (Class.forName(schedulerClass) == CapacityScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
           SLSCapacityScheduler.class.getName());
     } else if (Class.forName(schedulerClass) == FairScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
           SLSFairScheduler.class.getName());
-    } else if (Class.forName(schedulerClass) == FifoScheduler.class){
+    } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
       // TODO add support for FifoScheduler
-      throw new Exception("Fifo Scheduler is not supported yet.");
+      throw new YarnException("Fifo Scheduler is not supported yet.");
     }
 
     rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
@@ -196,37 +227,47 @@ public class SLSRunner {
 
   private void startNM() throws YarnException, IOException {
     // nm configuration
-    nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB,
-            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
-    nmVCores = conf.getInt(SLSConfiguration.NM_VCORES,
-            SLSConfiguration.NM_VCORES_DEFAULT);
-    int heartbeatInterval = conf.getInt(
-            SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
+    nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
+        SLSConfiguration.NM_MEMORY_MB_DEFAULT);
+    nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES,
+        SLSConfiguration.NM_VCORES_DEFAULT);
+    int heartbeatInterval =
+        getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
             SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
     // nm information (fetch from topology file, or from sls/rumen json file)
     Set<String> nodeSet = new HashSet<String>();
     if (nodeFile.isEmpty()) {
-      if (isSLS) {
-        for (String inputTrace : inputTraces) {
+      for (String inputTrace : inputTraces) {
+
+        switch (inputType) {
+        case SLS:
           nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace));
-        }
-      } else {
-        for (String inputTrace : inputTraces) {
+          break;
+        case RUMEN:
           nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace));
+          break;
+        case SYNTH:
+          stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+          nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(),
+              stjp.getNodesPerRack()));
+          break;
+        default:
+          throw new YarnException("Input configuration not recognized, "
+              + "trace type should be SLS, RUMEN, or SYNTH");
         }
       }
-
     } else {
       nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
     }
+
     // create NM simulators
     Random random = new Random();
     Set<String> rackSet = new HashSet<String>();
     for (String hostName : nodeSet) {
       // we randomize the heartbeat start time from zero to 1 interval
       NMSimulator nm = new NMSimulator();
-      nm.init(hostName, nmMemoryMB, nmVCores, 
-          random.nextInt(heartbeatInterval), heartbeatInterval, rm);
+      nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval),
+          heartbeatInterval, rm);
       nmMap.put(nm.getNode().getNodeID(), nm);
       runner.schedule(nm);
       rackSet.add(nm.getNode().getRackName());
@@ -241,39 +282,50 @@ public class SLSRunner {
       int numRunningNodes = 0;
       for (RMNode node : rm.getRMContext().getRMNodes().values()) {
         if (node.getState() == NodeState.RUNNING) {
-          numRunningNodes ++;
+          numRunningNodes++;
         }
       }
       if (numRunningNodes == numNMs) {
         break;
       }
-      LOG.info(MessageFormat.format("SLSRunner is waiting for all " +
-              "nodes RUNNING. {0} of {1} NMs initialized.",
-              numRunningNodes, numNMs));
+      LOG.info(MessageFormat.format(
+          "SLSRunner is waiting for all "
+              + "nodes RUNNING. {0} of {1} NMs initialized.",
+          numRunningNodes, numNMs));
       Thread.sleep(1000);
     }
     LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.",
-            (System.currentTimeMillis() - startTimeMS)));
+        (System.currentTimeMillis() - startTimeMS)));
   }
 
   @SuppressWarnings("unchecked")
   private void startAM() throws YarnException, IOException {
     // application/container configuration
-    int heartbeatInterval = conf.getInt(
-            SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+    int heartbeatInterval =
+        getConf().getInt(SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
             SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
+    int containerMemoryMB =
+        getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
             SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
-    int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES,
-            SLSConfiguration.CONTAINER_VCORES_DEFAULT);
+    int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
+        SLSConfiguration.CONTAINER_VCORES_DEFAULT);
     Resource containerResource =
-            BuilderUtils.newResource(containerMemoryMB, containerVCores);
+        BuilderUtils.newResource(containerMemoryMB, containerVCores);
 
     // application workload
-    if (isSLS) {
+    switch (inputType) {
+    case SLS:
       startAMFromSLSTraces(containerResource, heartbeatInterval);
-    } else {
+      break;
+    case RUMEN:
       startAMFromRumenTraces(containerResource, heartbeatInterval);
+      break;
+    case SYNTH:
+      startAMFromSynthGenerator(heartbeatInterval);
+      break;
+    default:
+      throw new YarnException("Input configuration not recognized, "
+          + "trace type should be SLS, RUMEN, or SYNTH");
     }
     numAMs = amMap.size();
     remainingApps = numAMs;
@@ -284,7 +336,7 @@ public class SLSRunner {
    */
   @SuppressWarnings("unchecked")
   private void startAMFromSLSTraces(Resource containerResource,
-                                    int heartbeatInterval) throws IOException {
+      int heartbeatInterval) throws IOException {
     // parse from sls traces
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
@@ -292,26 +344,28 @@ public class SLSRunner {
       Reader input =
           new InputStreamReader(new FileInputStream(inputTrace), "UTF-8");
       try {
-        Iterator<Map> i = mapper.readValues(jsonF.createParser(input),
-                Map.class);
+        Iterator<Map> i =
+            mapper.readValues(jsonF.createParser(input), Map.class);
         while (i.hasNext()) {
           Map jsonJob = i.next();
 
           // load job information
-          long jobStartTime = Long.parseLong(
-                  jsonJob.get("job.start.ms").toString());
-          long jobFinishTime = Long.parseLong(
-                  jsonJob.get("job.end.ms").toString());
+          long jobStartTime =
+              Long.parseLong(jsonJob.get("job.start.ms").toString());
+          long jobFinishTime =
+              Long.parseLong(jsonJob.get("job.end.ms").toString());
 
           String user = (String) jsonJob.get("job.user");
-          if (user == null)  user = "default";
+          if (user == null) {
+            user = "default";
+          }
           String queue = jsonJob.get("job.queue.name").toString();
 
           String oldAppId = jsonJob.get("job.id").toString();
           boolean isTracked = trackedApps.contains(oldAppId);
-          int queueSize = queueAppNumMap.containsKey(queue) ?
-                  queueAppNumMap.get(queue) : 0;
-          queueSize ++;
+          int queueSize =
+              queueAppNumMap.containsKey(queue) ? queueAppNumMap.get(queue) : 0;
+          queueSize++;
           queueAppNumMap.put(queue, queueSize);
           // tasks
           List tasks = (List) jsonJob.get("job.tasks");
@@ -319,45 +373,45 @@ public class SLSRunner {
             continue;
           }
           List<ContainerSimulator> containerList =
-                  new ArrayList<ContainerSimulator>();
+              new ArrayList<ContainerSimulator>();
           for (Object o : tasks) {
             Map jsonTask = (Map) o;
             String hostname = jsonTask.get("container.host").toString();
-            long taskStart = Long.parseLong(
-                    jsonTask.get("container.start.ms").toString());
-            long taskFinish = Long.parseLong(
-                    jsonTask.get("container.end.ms").toString());
+            long taskStart =
+                Long.parseLong(jsonTask.get("container.start.ms").toString());
+            long taskFinish =
+                Long.parseLong(jsonTask.get("container.end.ms").toString());
             long lifeTime = taskFinish - taskStart;
 
             // Set memory and vcores from job trace file
             Resource res = Resources.clone(containerResource);
             if (jsonTask.containsKey("container.memory")) {
-              int containerMemory = Integer.parseInt(
-                  jsonTask.get("container.memory").toString());
+              int containerMemory =
+                  Integer.parseInt(jsonTask.get("container.memory").toString());
               res.setMemorySize(containerMemory);
             }
 
             if (jsonTask.containsKey("container.vcores")) {
-              int containerVCores = Integer.parseInt(
-                  jsonTask.get("container.vcores").toString());
+              int containerVCores =
+                  Integer.parseInt(jsonTask.get("container.vcores").toString());
               res.setVirtualCores(containerVCores);
             }
 
-            int priority = Integer.parseInt(
-                    jsonTask.get("container.priority").toString());
+            int priority =
+                Integer.parseInt(jsonTask.get("container.priority").toString());
             String type = jsonTask.get("container.type").toString();
-            containerList.add(new ContainerSimulator(res,
-                    lifeTime, hostname, priority, type));
+            containerList.add(new ContainerSimulator(res, lifeTime, hostname,
+                priority, type));
           }
 
           // create a new AM
           String amType = jsonJob.get("am.type").toString();
-          AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
-                  amClassMap.get(amType), new Configuration());
+          AMSimulator amSim = (AMSimulator) ReflectionUtils
+              .newInstance(amClassMap.get(amType), new Configuration());
           if (amSim != null) {
-            amSim.init(AM_ID++, heartbeatInterval, containerList, rm,
-                    this, jobStartTime, jobFinishTime, user, queue,
-                    isTracked, oldAppId);
+            amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
+                jobStartTime, jobFinishTime, user, queue, isTracked, oldAppId,
+                null, runner.getStartTimeMS());
             runner.schedule(amSim);
             maxRuntime = Math.max(maxRuntime, jobFinishTime);
             numTasks += containerList.size();
@@ -375,22 +429,21 @@ public class SLSRunner {
    */
   @SuppressWarnings("unchecked")
   private void startAMFromRumenTraces(Resource containerResource,
-                                      int heartbeatInterval)
-          throws IOException {
+      int heartbeatInterval) throws IOException {
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", "file:///");
     long baselineTimeMS = 0;
     for (String inputTrace : inputTraces) {
       File fin = new File(inputTrace);
-      JobTraceReader reader = new JobTraceReader(
-              new Path(fin.getAbsolutePath()), conf);
+      JobTraceReader reader =
+          new JobTraceReader(new Path(fin.getAbsolutePath()), conf);
       try {
         LoggedJob job = null;
         while ((job = reader.getNext()) != null) {
           // only support MapReduce currently
           String jobType = "mapreduce";
-          String user = job.getUser() == null ?
-                  "default" : job.getUser().getValue();
+          String user =
+              job.getUser() == null ? "default" : job.getUser().getValue();
           String jobQueue = job.getQueue().getValue();
           String oldJobId = job.getJobID().toString();
           long jobStartTimeMS = job.getSubmitTime();
@@ -407,48 +460,48 @@ public class SLSRunner {
           }
 
           boolean isTracked = trackedApps.contains(oldJobId);
-          int queueSize = queueAppNumMap.containsKey(jobQueue) ?
-                  queueAppNumMap.get(jobQueue) : 0;
-          queueSize ++;
+          int queueSize = queueAppNumMap.containsKey(jobQueue)
+              ? queueAppNumMap.get(jobQueue) : 0;
+          queueSize++;
           queueAppNumMap.put(jobQueue, queueSize);
 
           List<ContainerSimulator> containerList =
-                  new ArrayList<ContainerSimulator>();
+              new ArrayList<ContainerSimulator>();
           // map tasks
-          for(LoggedTask mapTask : job.getMapTasks()) {
+          for (LoggedTask mapTask : job.getMapTasks()) {
             if (mapTask.getAttempts().size() == 0) {
               continue;
             }
-            LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
-                    .get(mapTask.getAttempts().size() - 1);
+            LoggedTaskAttempt taskAttempt =
+                mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
             String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime = taskAttempt.getFinishTime()
-                    - taskAttempt.getStartTime();
+            long containerLifeTime =
+                taskAttempt.getFinishTime() - taskAttempt.getStartTime();
             containerList.add(new ContainerSimulator(containerResource,
-                    containerLifeTime, hostname, 10, "map"));
+                containerLifeTime, hostname, 10, "map"));
           }
 
           // reduce tasks
-          for(LoggedTask reduceTask : job.getReduceTasks()) {
+          for (LoggedTask reduceTask : job.getReduceTasks()) {
             if (reduceTask.getAttempts().size() == 0) {
               continue;
             }
             LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
-                    .get(reduceTask.getAttempts().size() - 1);
+                .get(reduceTask.getAttempts().size() - 1);
             String hostname = taskAttempt.getHostName().getValue();
-            long containerLifeTime = taskAttempt.getFinishTime()
-                    - taskAttempt.getStartTime();
+            long containerLifeTime =
+                taskAttempt.getFinishTime() - taskAttempt.getStartTime();
             containerList.add(new ContainerSimulator(containerResource,
-                    containerLifeTime, hostname, 20, "reduce"));
+                containerLifeTime, hostname, 20, "reduce"));
           }
 
           // create a new AM
-          AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
-                  amClassMap.get(jobType), conf);
+          AMSimulator amSim = (AMSimulator) ReflectionUtils
+              .newInstance(amClassMap.get(jobType), conf);
           if (amSim != null) {
-            amSim.init(AM_ID ++, heartbeatInterval, containerList,
-                    rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
-                    isTracked, oldJobId);
+            amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
+                jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
+                oldJobId, null, runner.getStartTimeMS());
             runner.schedule(amSim);
             maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
             numTasks += containerList.size();
@@ -460,34 +513,168 @@ public class SLSRunner {
       }
     }
   }
-  
+
+  /**
+   * parse workload information from synth-generator trace files.
+   */
+  @SuppressWarnings("unchecked")
+  private void startAMFromSynthGenerator(int heartbeatInterval)
+      throws IOException {
+    Configuration localConf = new Configuration();
+    localConf.set("fs.defaultFS", "file:///");
+    long baselineTimeMS = 0;
+
+    // reservations use wall clock time, so need to have a reference for that
+    UTCClock clock = new UTCClock();
+    long now = clock.getTime();
+
+    try {
+
+      // if we use the nodeFile this may not have been initialized yet.
+      if (stjp == null) {
+        stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+      }
+
+      SynthJob job = null;
+      // we use stjp, a reference to the job producer instantiated during node
+      // creation
+      while ((job = (SynthJob) stjp.getNextJob()) != null) {
+        // only support MapReduce currently
+        String jobType = "mapreduce";
+        String user = job.getUser();
+        String jobQueue = job.getQueueName();
+        String oldJobId = job.getJobID().toString();
+        long jobStartTimeMS = job.getSubmissionTime();
+
+        // CARLO: Finish time is only used for logging, omit for now
+        long jobFinishTimeMS = -1L;
+
+        if (baselineTimeMS == 0) {
+          baselineTimeMS = jobStartTimeMS;
+        }
+        jobStartTimeMS -= baselineTimeMS;
+        jobFinishTimeMS -= baselineTimeMS;
+        if (jobStartTimeMS < 0) {
+          LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
+          jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
+          jobStartTimeMS = 0;
+        }
+
+        boolean isTracked = trackedApps.contains(oldJobId);
+        int queueSize = queueAppNumMap.containsKey(jobQueue)
+            ? queueAppNumMap.get(jobQueue) : 0;
+        queueSize++;
+        queueAppNumMap.put(jobQueue, queueSize);
+
+        List<ContainerSimulator> containerList =
+            new ArrayList<ContainerSimulator>();
+        ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
+        Random rand = new Random(stjp.getSeed());
+
+        Resource maxMapRes = Resource.newInstance(0, 0);
+        long maxMapDur = 0;
+        // map tasks
+        for (int i = 0; i < job.getNumberMaps(); i++) {
+          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0);
+          RMNode node = nmMap
+              .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
+          String hostname = "/" + node.getRackName() + "/" + node.getHostName();
+          long containerLifeTime = tai.getRuntime();
+          Resource containerResource =
+              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
+                  (int) tai.getTaskInfo().getTaskVCores());
+          containerList.add(new ContainerSimulator(containerResource,
+              containerLifeTime, hostname, 10, "map"));
+          maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
+          maxMapDur =
+              containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
+
+        }
+
+        Resource maxRedRes = Resource.newInstance(0, 0);
+        long maxRedDur = 0;
+        // reduce tasks
+        for (int i = 0; i < job.getNumberReduces(); i++) {
+          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
+          RMNode node = nmMap
+              .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
+          String hostname = "/" + node.getRackName() + "/" + node.getHostName();
+          long containerLifeTime = tai.getRuntime();
+          Resource containerResource =
+              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
+                  (int) tai.getTaskInfo().getTaskVCores());
+          containerList.add(new ContainerSimulator(containerResource,
+              containerLifeTime, hostname, 20, "reduce"));
+          maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
+          maxRedDur =
+              containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;
+
+        }
+
+        // generating reservations for the jobs that require them
+
+        ReservationSubmissionRequest rr = null;
+        if (job.hasDeadline()) {
+          ReservationId reservationId =
+              ReservationId.newInstance(this.rm.getStartTime(), AM_ID);
+
+          rr = ReservationClientUtil.createMRReservation(reservationId,
+              "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur,
+              maxRedRes, job.getNumberReduces(), maxRedDur,
+              now + jobStartTimeMS, now + job.getDeadline(),
+              job.getQueueName());
+
+        }
+        // create a new AM
+        AMSimulator amSim = (AMSimulator) ReflectionUtils
+            .newInstance(amClassMap.get(jobType), localConf);
+        if (amSim != null) {
+          amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
+              jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
+              oldJobId, rr, runner.getStartTimeMS());
+          runner.schedule(amSim);
+          maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
+          numTasks += containerList.size();
+          amMap.put(oldJobId, amSim);
+        }
+      }
+    } finally {
+      stjp.close();
+    }
+
+  }
+
   private void printSimulationInfo() {
     if (printSimulation) {
       // node
       LOG.info("------------------------------------");
-      LOG.info(MessageFormat.format("# nodes = {0}, # racks = {1}, capacity " +
-              "of each node {2} MB memory and {3} vcores.",
-              numNMs, numRacks, nmMemoryMB, nmVCores));
+      LOG.info(MessageFormat.format(
+          "# nodes = {0}, # racks = {1}, capacity "
+              + "of each node {2} MB memory and {3} vcores.",
+          numNMs, numRacks, nmMemoryMB, nmVCores));
       LOG.info("------------------------------------");
       // job
-      LOG.info(MessageFormat.format("# applications = {0}, # total " +
-              "tasks = {1}, average # tasks per application = {2}",
-              numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs))));
+      LOG.info(MessageFormat.format(
+          "# applications = {0}, # total "
+              + "tasks = {1}, average # tasks per application = {2}",
+          numAMs, numTasks, (int) (Math.ceil((numTasks + 0.0) / numAMs))));
       LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
       for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) {
         AMSimulator am = entry.getValue();
-        LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() 
+        LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType()
             + "\t" + am.getDuration() + "\t" + am.getNumTasks());
       }
       LOG.info("------------------------------------");
       // queue
-      LOG.info(MessageFormat.format("number of queues = {0}  average " +
-              "number of apps = {1}", queueAppNumMap.size(),
-              (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))));
+      LOG.info(MessageFormat.format(
+          "number of queues = {0}  average " + "number of apps = {1}",
+          queueAppNumMap.size(),
+          (int) (Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))));
       LOG.info("------------------------------------");
       // runtime
-      LOG.info(MessageFormat.format("estimated simulation time is {0}" +
-              " seconds", (long)(Math.ceil(maxRuntime / 1000.0))));
+      LOG.info(
+          MessageFormat.format("estimated simulation time is {0}" + " seconds",
+              (long) (Math.ceil(maxRuntime / 1000.0))));
       LOG.info("------------------------------------");
     }
     // package these information in the simulateInfoMap used by other places
@@ -510,69 +697,121 @@ public class SLSRunner {
     return nmMap;
   }
 
-  public static TaskRunner getRunner() {
-    return runner;
-  }
-
   public static void decreaseRemainingApps() {
-    remainingApps --;
+    remainingApps--;
 
     if (remainingApps == 0) {
       LOG.info("SLSRunner tears down.");
-      System.exit(0);
     }
   }
 
-  public static void main(String args[]) throws Exception {
+  public void stop() throws InterruptedException {
+    rm.stop();
+    runner.stop();
+  }
+
+  public int run(final String[] argv) throws IOException, InterruptedException,
+      ParseException, ClassNotFoundException, YarnException {
+
     Options options = new Options();
+
+    // Left for compatibility
     options.addOption("inputrumen", true, "input rumen files");
     options.addOption("inputsls", true, "input sls files");
+
+    // New more general format
+    options.addOption("tracetype", true, "the type of trace");
+    options.addOption("tracelocation", true, "input trace files");
+
     options.addOption("nodes", true, "input topology");
     options.addOption("output", true, "output directory");
     options.addOption("trackjobs", true,
-            "jobs to be tracked during simulating");
+        "jobs to be tracked during simulating");
     options.addOption("printsimulation", false,
-            "print out simulation information");
-    
+        "print out simulation information");
+
     CommandLineParser parser = new GnuParser();
-    CommandLine cmd = parser.parse(options, args);
+    CommandLine cmd = parser.parse(options, argv);
 
-    String inputRumen = cmd.getOptionValue("inputrumen");
-    String inputSLS = cmd.getOptionValue("inputsls");
-    String output = cmd.getOptionValue("output");
-    
-    if ((inputRumen == null && inputSLS == null) || output == null) {
-      System.err.println();
-      System.err.println("ERROR: Missing input or output file");
-      System.err.println();
-      System.err.println("Options: -inputrumen|-inputsls FILE,FILE... " +
-              "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " +
-              "[-printsimulation]");
-      System.err.println();
-      System.exit(1);
+    String traceType = null;
+    String traceLocation = null;
+
+    // compatibility with old commandline
+    if (cmd.hasOption("inputrumen")) {
+      traceType = "RUMEN";
+      traceLocation = cmd.getOptionValue("inputrumen");
+    }
+    if (cmd.hasOption("inputsls")) {
+      traceType = "SLS";
+      traceLocation = cmd.getOptionValue("inputsls");
+    }
+
+    if (cmd.hasOption("tracetype")) {
+      traceType = cmd.getOptionValue("tracetype");
+      traceLocation = cmd.getOptionValue("tracelocation");
     }
-    
+
+    String output = cmd.getOptionValue("output");
+
     File outputFile = new File(output);
-    if (! outputFile.exists()
-            && ! outputFile.mkdirs()) {
+    if (!outputFile.exists() && !outputFile.mkdirs()) {
       System.err.println("ERROR: Cannot create output directory "
-              + outputFile.getAbsolutePath());
-      System.exit(1);
+          + outputFile.getAbsolutePath());
+      throw new YarnException("Cannot create output directory");
     }
-    
+
     Set<String> trackedJobSet = new HashSet<String>();
     if (cmd.hasOption("trackjobs")) {
       String trackjobs = cmd.getOptionValue("trackjobs");
       String jobIds[] = trackjobs.split(",");
       trackedJobSet.addAll(Arrays.asList(jobIds));
     }
-    
-    String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
 
-    boolean isSLS = inputSLS != null;
-    String inputFiles[] = isSLS ? inputSLS.split(",") : inputRumen.split(",");
-    SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output,
+    String tempNodeFile =
+        cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
+
+    TraceType tempTraceType = TraceType.SLS;
+    switch (traceType) {
+    case "SLS":
+      tempTraceType = TraceType.SLS;
+      break;
+    case "RUMEN":
+      tempTraceType = TraceType.RUMEN;
+      break;
+
+    case "SYNTH":
+      tempTraceType = TraceType.SYNTH;
+      break;
+    default:
+      printUsage();
+      throw new YarnException("Misconfigured input");
+    }
+
+    String[] inputFiles = traceLocation.split(",");
+
+    setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
         trackedJobSet, cmd.hasOption("printsimulation"));
-    sls.start();
+
+    start();
+
+    return 0;
   }
+
+  public static void main(String[] argv) throws Exception {
+    ToolRunner.run(new Configuration(), new SLSRunner(), argv);
+  }
+
+  static void printUsage() {
+    System.err.println();
+    System.err.println("ERROR: Wrong tracetype");
+    System.err.println();
+    System.err.println(
+        "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... "
+            + "(deprecated alternative options --inputsls FILE, FILE,... "
+            + " | --inputrumen FILE,FILE,...)"
+            + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] "
+            + "[-printsimulation]");
+    System.err.println();
+  }
+
 }

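startAMFromSynthGenerator drives everything through SynthTraceJobProducer; a minimal fragment reading a synthetic trace directly, using only the producer and job calls exercised above (trace file name illustrative; runs inside a method that may throw IOException):

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");  // synthetic traces are read from the local FS
    SynthTraceJobProducer stjp =
        new SynthTraceJobProducer(conf, new Path("syn.json"));
    try {
      SynthJob job;
      while ((job = (SynthJob) stjp.getNextJob()) != null) {
        System.out.println(job.getJobID() + " queue=" + job.getQueueName()
            + " maps=" + job.getNumberMaps() + " reduces=" + job.getNumberReduces()
            + " deadline=" + job.hasDeadline());
      }
    } finally {
      stjp.close();
    }
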
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index a62f2b6..45a3c07 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.sls.appmaster;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.text.MessageFormat;
@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords
         .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
@@ -55,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -97,6 +100,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
   // am type
   protected String amtype;
   // job start/end time
+  private long baselineTimeMS;
   protected long traceStartTimeMS;
   protected long traceFinishTimeMS;
   protected long simulateStartTimeMS;
@@ -117,25 +121,30 @@ public abstract class AMSimulator extends TaskRunner.Task {
   private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
   private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
 
+  private ReservationSubmissionRequest reservationRequest;
+
   public AMSimulator() {
     this.responseQueue = new LinkedBlockingQueue<>();
   }
 
-  public void init(int id, int heartbeatInterval, 
-      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
-      long traceStartTime, long traceFinishTime, String user, String queue, 
-      boolean isTracked, String oldAppId) {
-    super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval,
-            heartbeatInterval);
-    this.user = user;
-    this.rm = rm;
-    this.se = se;
-    this.user = user;
-    this.queue = queue;
-    this.oldAppId = oldAppId;
-    this.isTracked = isTracked;
-    this.traceStartTimeMS = traceStartTime;
-    this.traceFinishTimeMS = traceFinishTime;
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int id, int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager resourceManager,
+      SLSRunner slsRunner, long startTime, long finishTime, String simUser,
+      String simQueue, boolean tracked, String oldApp,
+      ReservationSubmissionRequest rr, long baseTimeMS) {
+    super.init(startTime, startTime + 1000000L * heartbeatInterval,
+        heartbeatInterval);
+    this.user = simUser;
+    this.rm = resourceManager;
+    this.se = slsRunner;
+    this.queue = simQueue;
+    this.oldAppId = oldApp;
+    this.isTracked = tracked;
+    this.baselineTimeMS = baseTimeMS;
+    this.traceStartTimeMS = startTime;
+    this.traceFinishTimeMS = finishTime;
+    this.reservationRequest = rr;
   }
 
   /**
@@ -143,11 +152,21 @@ public abstract class AMSimulator extends TaskRunner.Task {
    */
   @Override
   public void firstStep() throws Exception {
-    simulateStartTimeMS = System.currentTimeMillis() - 
-                          SLSRunner.getRunner().getStartTimeMS();
+    simulateStartTimeMS = System.currentTimeMillis() - baselineTimeMS;
+
+    ReservationId reservationId = null;
+
+    // submit a reservation if one is required; exceptions naturally happen
+    // when the reservation does not fit. Catch, log, and move on, running the
+    // job without a reservation.
+    try {
+      reservationId = submitReservationWhenSpecified();
+    } catch (UndeclaredThrowableException y) {
+      LOG.warn("Unable to place reservation: " + y.getMessage());
+    }
 
     // submit application, waiting until ACCEPTED
-    submitApp();
+    submitApp(reservationId);
 
     // track app metrics
     trackApp();
@@ -161,6 +180,26 @@ public abstract class AMSimulator extends TaskRunner.Task {
     isAMContainerRunning = true;
   }
 
+  private ReservationId submitReservationWhenSpecified()
+      throws IOException, InterruptedException {
+    if (reservationRequest != null) {
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws YarnException, IOException {
+          rm.getClientRMService().submitReservation(reservationRequest);
+          LOG.info("RESERVATION SUCCESSFULLY SUBMITTED "
+              + reservationRequest.getReservationId());
+          return null;
+        }
+      });
+      return reservationRequest.getReservationId();
+    } else {
+      return null;
+    }
+  }
+
   @Override
   public void middleStep() throws Exception {
     if (isAMContainerRunning) {
@@ -217,14 +256,13 @@ public abstract class AMSimulator extends TaskRunner.Task {
       }
     });
 
-    simulateFinishTimeMS = System.currentTimeMillis() -
-        SLSRunner.getRunner().getStartTimeMS();
+    simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS;
     // record job running information
     SchedulerMetrics schedulerMetrics =
-        ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
+            ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
     if (schedulerMetrics != null) {
       schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS,
-          simulateStartTimeMS, simulateFinishTimeMS);
+              simulateStartTimeMS, simulateFinishTimeMS);
     }
   }
   
@@ -261,7 +299,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
   
   protected abstract void checkStop();
   
-  private void submitApp()
+  private void submitApp(ReservationId reservationId)
           throws YarnException, InterruptedException, IOException {
     // ask for new application
     GetNewApplicationRequest newAppRequest =
@@ -291,6 +329,11 @@ public abstract class AMSimulator extends TaskRunner.Task {
     appSubContext.setResource(Resources
         .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
             MR_AM_CONTAINER_RESOURCE_VCORES));
+
+    if (reservationId != null) {
+      appSubContext.setReservationID(reservationId);
+    }
+
     subAppRequest.setApplicationSubmissionContext(appSubContext);
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
     ugi.doAs(new PrivilegedExceptionAction<Object>() {

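For readers tracking the widened init() contract above, here is a minimal
sketch of a caller (variable names and values are hypothetical, assumed in
scope; they are not part of the patch). Passing null for the reservation
request preserves the old no-reservation behavior:

    // Hypothetical driver code: containerList, resourceManager and
    // slsRunner are assumed to be in scope.
    AMSimulator am = new MRAMSimulator();
    am.init(1 /* id */, 1000 /* heartbeatInterval, ms */, containerList,
        resourceManager, slsRunner, 0L /* traceStartTime */,
        600000L /* traceFinishTime */, "user1", "default",
        true /* isTracked */, "job_0001" /* oldAppId */,
        null /* no ReservationSubmissionRequest */,
        System.currentTimeMillis() /* baselineTimeMS */);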
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index e726b09..de6d19d 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -27,13 +27,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.avro.Protocol;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.SLSRunner;
@@ -114,13 +113,15 @@ public class MRAMSimulator extends AMSimulator {
 
   public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
 
+  @SuppressWarnings("checkstyle:parameternumber")
   public void init(int id, int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue, 
-      boolean isTracked, String oldAppId) {
+      boolean isTracked, String oldAppId, ReservationSubmissionRequest rr,
+      long baselineStartTimeMS) {
     super.init(id, heartbeatInterval, containerList, rm, se, 
               traceStartTime, traceFinishTime, user, queue,
-              isTracked, oldAppId);
+              isTracked, oldAppId, rr, baselineStartTimeMS);
     amtype = "mapreduce";
     
     // get map/reduce tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index 20cf3e5..b4ffb61 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -65,6 +65,11 @@ public class MockAMLauncher extends ApplicationMasterLauncher
     // Do nothing
   }
 
+  @Override
+  protected void serviceStop() throws Exception {
+    // Do nothing
+  }
+
   private void setupAMRMToken(RMAppAttempt appAttempt) {
     // Setup AMRMToken
     Token<AMRMTokenIdentifier> amrmToken =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 7c37465..56190df 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.sls.utils.SLSUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.codahale.metrics.Timer;
@@ -96,16 +94,6 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
       } catch (Exception e) {
         e.printStackTrace();
       }
-
-      ShutdownHookManager.get().addShutdownHook(new Runnable() {
-        @Override public void run() {
-          try {
-            schedulerMetrics.tearDown();
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-      }, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
     }
   }
 
@@ -344,7 +332,6 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
       initQueueMetrics(child);
     }
   }
-
   @Override
   public void serviceInit(Configuration configuration) throws Exception {
     super.serviceInit(configuration);
@@ -354,6 +341,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
     }
   }
 
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      schedulerMetrics.tearDown();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    super.serviceStop();
+  }
+
   public SchedulerMetrics getSchedulerMetrics() {
     return schedulerMetrics;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
index 572dacf..f740f5a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.sls.utils.SLSUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.IOException;
@@ -90,16 +88,6 @@ public class SLSFairScheduler extends FairScheduler
       } catch (Exception e) {
         e.printStackTrace();
       }
-
-      ShutdownHookManager.get().addShutdownHook(new Runnable() {
-        @Override public void run() {
-          try {
-            schedulerMetrics.tearDown();
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-      }, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
     }
   }
 
@@ -335,5 +323,15 @@ public class SLSFairScheduler extends FairScheduler
       initQueueMetrics(getQueueManager().getRootQueue());
     }
   }
+
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      schedulerMetrics.tearDown();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    super.serviceStop();
+  }
 }
 

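Note that both scheduler wrappers now tie metrics teardown to the service
lifecycle rather than a JVM shutdown hook, so stopping the ResourceManager
flushes metrics deterministically. A hedged sketch of the effect (rm is
assumed to be an initialized ResourceManager in scope):

    // Stopping the RM stops its composed services, which now invokes
    // SLSCapacityScheduler/SLSFairScheduler.serviceStop() and therefore
    // schedulerMetrics.tearDown() -- no shutdown hook involved.
    rm.stop();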
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
index d352904..19cfe88 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.Queue;
 import java.util.concurrent.DelayQueue;
@@ -27,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 
 @Private
 @Unstable
@@ -148,8 +146,8 @@ public class TaskRunner {
 
   @SuppressWarnings("unchecked")
   public void start() {
-    if (executor != null) {
-      throw new IllegalStateException("Already started");
+    if (executor != null && !executor.isTerminated()) {
+      throw new IllegalStateException("Executor already running");
     }
     DelayQueue preStartQueue = queue;
 
@@ -164,8 +162,9 @@ public class TaskRunner {
     }
   }
   
-  public void stop() {
+  public void stop() throws InterruptedException {
     executor.shutdownNow();
+    executor.awaitTermination(20, TimeUnit.SECONDS);
   }
 
   @SuppressWarnings("unchecked")

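With the revised guard in start() and the draining stop(), a TaskRunner can
be reused across runs. A small sketch (setQueueSize() is the class's existing
thread-pool setter; values illustrative):

    TaskRunner runner = new TaskRunner();
    runner.setQueueSize(10);   // size of the simulator thread pool
    runner.start();
    // ... schedule and run simulator tasks ...
    runner.stop();             // shutdownNow + awaitTermination, up to 20s;
                               // note stop() now throws InterruptedException
    runner.start();            // legal again once the executor terminated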
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
new file mode 100644
index 0000000..3ed81e1
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.distribution.LogNormalDistribution;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.*;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME;
+
+/**
+ * Generates random task data for a synthetic job.
+ */
+public class SynthJob implements JobStory {
+
+  @SuppressWarnings("StaticVariableName")
+  private static Log LOG = LogFactory.getLog(SynthJob.class);
+
+  private final Configuration conf;
+  private final int id;
+
+  @SuppressWarnings("ConstantName")
+  private static final AtomicInteger sequence = new AtomicInteger(0);
+  private final String name;
+  private final String queueName;
+  private final SynthJobClass jobClass;
+
+  // job timing
+  private final long submitTime;
+  private final long duration;
+  private final long deadline;
+
+  private final int numMapTasks;
+  private final int numRedTasks;
+  private final long mapMaxMemory;
+  private final long reduceMaxMemory;
+  private final long mapMaxVcores;
+  private final long reduceMaxVcores;
+  private final long[] mapRuntime;
+  private final float[] reduceRuntime;
+  private long totMapRuntime;
+  private long totRedRuntime;
+
+  public SynthJob(JDKRandomGenerator rand, Configuration conf,
+      SynthJobClass jobClass, long actualSubmissionTime) {
+
+    this.conf = conf;
+    this.jobClass = jobClass;
+
+    this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
+    this.numMapTasks = jobClass.getMtasks();
+    this.numRedTasks = jobClass.getRtasks();
+
+    // sample memory distributions, correct for sub-minAlloc sizes
+    long tempMapMaxMemory = jobClass.getMapMaxMemory();
+    this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB
+        ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory;
+    long tempReduceMaxMemory = jobClass.getReduceMaxMemory();
+    this.reduceMaxMemory =
+            tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB
+            ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory;
+
+    // sample vcores distributions, correct for sub-minAlloc sizes
+    long tempMapMaxVCores = jobClass.getMapMaxVcores();
+    this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES
+        ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores;
+    long tempReduceMaxVcores = jobClass.getReduceMaxVcores();
+    this.reduceMaxVcores =
+        tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES
+            ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores;
+
+    if (numMapTasks > 0) {
+      conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory);
+      conf.set(MRJobConfig.MAP_JAVA_OPTS,
+          "-Xmx" + (this.mapMaxMemory - 100) + "m");
+    }
+
+    if (numRedTasks > 0) {
+      conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory);
+      conf.set(MRJobConfig.REDUCE_JAVA_OPTS,
+          "-Xmx" + (this.reduceMaxMemory - 100) + "m");
+    }
+
+    boolean hasDeadline =
+        (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation);
+
+    LogNormalDistribution deadlineFactor =
+        SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg,
+            jobClass.jobClass.deadline_factor_stddev);
+
+    double deadlineFactorSample =
+        (deadlineFactor != null) ? deadlineFactor.sample() : -1;
+
+    this.queueName = jobClass.workload.getQueueName();
+
+    this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
+
+    this.deadline =
+        hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS)
+            + (long) Math.ceil(deadlineFactorSample * duration) : -1;
+
+    conf.set(QUEUE_NAME, queueName);
+
+    // name and initialize job randomness
+    final long seed = rand.nextLong();
+    rand.setSeed(seed);
+    id = sequence.getAndIncrement();
+
+    name = String.format(jobClass.getClassName() + "_%06d", id);
+    LOG.debug(name + " (" + seed + ")");
+
+    LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime
+        + " deadline:" + deadline + " duration:" + duration
+        + " deadline-submission: " + (deadline - submitTime));
+
+    // generate map and reduce runtimes
+    mapRuntime = new long[numMapTasks];
+    for (int i = 0; i < numMapTasks; i++) {
+      mapRuntime[i] = jobClass.getMapTimeSample();
+      totMapRuntime += mapRuntime[i];
+    }
+    reduceRuntime = new float[numRedTasks];
+    for (int i = 0; i < numRedTasks; i++) {
+      reduceRuntime[i] = jobClass.getReduceTimeSample();
+      totRedRuntime += (long) Math.ceil(reduceRuntime[i]);
+    }
+  }
+
+  public boolean hasDeadline() {
+    return deadline > 0;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getUser() {
+    return jobClass.getUserName();
+  }
+
+  @Override
+  public JobID getJobID() {
+    return new JobID("job_mock_" + name, id);
+  }
+
+  @Override
+  public Values getOutcome() {
+    return Values.SUCCESS;
+  }
+
+  @Override
+  public long getSubmissionTime() {
+    return submitTime;
+  }
+
+  @Override
+  public int getNumberMaps() {
+    return numMapTasks;
+  }
+
+  @Override
+  public int getNumberReduces() {
+    return numRedTasks;
+  }
+
+  @Override
+  public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+    switch (taskType) {
+    case MAP:
+      return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
+    case REDUCE:
+      return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
+    default:
+      throw new IllegalArgumentException("Not interested");
+    }
+  }
+
+  @Override
+  public InputSplit[] getInputSplits() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+      int taskAttemptNumber) {
+    switch (taskType) {
+    case MAP:
+      return new MapTaskAttemptInfo(State.SUCCEEDED,
+          getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null);
+
+    case REDUCE:
+      // We assume a uniform split between the pull/sort/reduce phases,
+      // consistent with the naive progress-reporting assumptions.
+      return new ReduceTaskAttemptInfo(State.SUCCEEDED,
+          getTaskInfo(taskType, taskNumber),
+          (long) Math.round((reduceRuntime[taskNumber] / 3)),
+          (long) Math.round((reduceRuntime[taskNumber] / 3)),
+          (long) Math.round((reduceRuntime[taskNumber] / 3)), null);
+
+    default:
+      break;
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+      int taskAttemptNumber, int locality) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.JobConf getJobConf() {
+    return new JobConf(conf);
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+
+  @Override
+  public String toString() {
+    return "SynthJob [\n" + "  workload=" + jobClass.getWorkload().getId()
+        + "\n" + "  jobClass="
+        + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n"
+        + "  conf=" + conf + ",\n" + "  id=" + id + ",\n" + "  name=" + name
+        + ",\n" + "  mapRuntime=" + Arrays.toString(mapRuntime) + ",\n"
+        + "  reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n"
+        + "  submitTime=" + submitTime + ",\n" + "  numMapTasks=" + numMapTasks
+        + ",\n" + "  numRedTasks=" + numRedTasks + ",\n" + "  mapMaxMemory="
+        + mapMaxMemory + ",\n" + "  reduceMaxMemory=" + reduceMaxMemory + ",\n"
+        + "  queueName=" + queueName + "\n" + "]";
+  }
+
+  public SynthJobClass getJobClass() {
+    return jobClass;
+  }
+
+  public long getTotalSlotTime() {
+    return totMapRuntime + totRedRuntime;
+  }
+
+  public long getDuration() {
+    return duration;
+  }
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SynthJob)) {
+      return false;
+    }
+    SynthJob o = (SynthJob) other;
+    return Arrays.equals(mapRuntime, o.mapRuntime)
+        && Arrays.equals(reduceRuntime, o.reduceRuntime)
+        && submitTime == o.submitTime && numMapTasks == o.numMapTasks
+        && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory
+        && reduceMaxMemory == o.reduceMaxMemory
+        && mapMaxVcores == o.mapMaxVcores
+        && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName)
+        && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime
+        && totRedRuntime == o.totRedRuntime;
+  }
+
+  @Override
+  public int hashCode() {
+    // May yield a poor distribution; revisit if a relevant use case arises.
+    return jobClass.hashCode() * (int) submitTime;
+  }
+}

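The deadline logic in the constructor above reduces to: with probability
chance_of_reservation a job gets a deadline, equal to its submission time
plus a log-normal multiple of its duration. A worked sketch with
illustrative numbers (not taken from the patch):

    // Mirrors SynthJob's deadline computation.
    long submitTimeMs = 300 * 1000L;              // submit at t = 300s
    long durationMs = 120 * 1000L;                // job runs for 120s
    double deadlineFactorSample = 1.5;            // log-normal draw
    long deadlineMs = submitTimeMs
        + (long) Math.ceil(deadlineFactorSample * durationMs);
    // deadlineMs == 480000: the reservation deadline falls 180s
    // (1.5 x the 120s duration) after submission.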
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
new file mode 100644
index 0000000..439698f
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+import org.apache.commons.math3.distribution.LogNormalDistribution;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
+
+/**
+ * This class represents a class of jobs. It is used to generate
+ * individual jobs by picking random durations, task counts, container
+ * sizes, etc.
+ */
+public class SynthJobClass {
+
+  private final JDKRandomGenerator rand;
+  private final LogNormalDistribution dur;
+  private final LogNormalDistribution mapRuntime;
+  private final LogNormalDistribution redRuntime;
+  private final LogNormalDistribution mtasks;
+  private final LogNormalDistribution rtasks;
+  private final LogNormalDistribution mapMem;
+  private final LogNormalDistribution redMem;
+  private final LogNormalDistribution mapVcores;
+  private final LogNormalDistribution redVcores;
+
+  private final Trace trace;
+  @SuppressWarnings("VisibilityModifier")
+  protected final SynthWorkload workload;
+  @SuppressWarnings("VisibilityModifier")
+  protected final JobClass jobClass;
+
+  public SynthJobClass(JDKRandomGenerator rand, Trace trace,
+      SynthWorkload workload, int classId) {
+
+    this.trace = trace;
+    this.workload = workload;
+    this.rand = new JDKRandomGenerator();
+    this.rand.setSeed(rand.nextLong());
+    jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId);
+
+    this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg,
+        jobClass.dur_stddev);
+    this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg,
+        jobClass.mtime_stddev);
+    this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg,
+        jobClass.rtime_stddev);
+    this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg,
+        jobClass.mtasks_stddev);
+    this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg,
+        jobClass.rtasks_stddev);
+
+    this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg,
+        jobClass.map_max_memory_stddev);
+    this.redMem = SynthUtils.getLogNormalDist(rand,
+        jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev);
+    this.mapVcores = SynthUtils.getLogNormalDist(rand,
+        jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev);
+    this.redVcores = SynthUtils.getLogNormalDist(rand,
+        jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev);
+  }
+
+  public JobStory getJobStory(Configuration conf, long actualSubmissionTime) {
+    return new SynthJob(rand, conf, this, actualSubmissionTime);
+  }
+
+  @Override
+  public String toString() {
+    return "SynthJobClass [workload=" + workload.getName() + ", class="
+        + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur="
+        + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime="
+        + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0)
+        + ", redRuntime="
+        + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0)
+        + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0)
+        + ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0)
+        + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n";
+  }
+
+  public double getClassWeight() {
+    return jobClass.class_weight;
+  }
+
+  public long getDur() {
+    return genLongSample(dur);
+  }
+
+  public int getMtasks() {
+    return genIntSample(mtasks);
+  }
+
+  public int getRtasks() {
+    return genIntSample(rtasks);
+  }
+
+  public long getMapMaxMemory() {
+    return genLongSample(mapMem);
+  }
+
+  public long getReduceMaxMemory() {
+    return genLongSample(redMem);
+  }
+
+  public long getMapMaxVcores() {
+    return genLongSample(mapVcores);
+  }
+
+  public long getReduceMaxVcores() {
+    return genLongSample(redVcores);
+  }
+
+  public SynthWorkload getWorkload() {
+    return workload;
+  }
+
+  public int genIntSample(AbstractRealDistribution dist) {
+    if (dist == null) {
+      return 0;
+    }
+    double baseSample = dist.sample();
+    if (baseSample < 0) {
+      baseSample = 0;
+    }
+    return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
+  }
+
+  public long genLongSample(AbstractRealDistribution dist) {
+    return dist != null ? (long) Math.ceil(dist.sample()) : 0;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SynthJobClass)) {
+      return false;
+    }
+    SynthJobClass o = (SynthJobClass) other;
+    return workload.equals(o.workload);
+  }
+
+  @Override
+  public int hashCode() {
+    return workload.hashCode() * workload.getId();
+  }
+
+  public String getClassName() {
+    return jobClass.class_name;
+  }
+
+  public long getMapTimeSample() {
+    return genLongSample(mapRuntime);
+  }
+
+  public long getReduceTimeSample() {
+    return genLongSample(redRuntime);
+  }
+
+  public String getUserName() {
+    return jobClass.user_name;
+  }
+}

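The sampling helpers above clamp negative log-normal draws to zero and
truncate the result to the int range. A self-contained sketch of the same
arithmetic using commons-math3 (class name, seed, and the scale/shape
parameters are illustrative; the patch derives its distributions through
SynthUtils.getLogNormalDist() instead):

    import org.apache.commons.math3.distribution.LogNormalDistribution;
    import org.apache.commons.math3.random.JDKRandomGenerator;

    public class SampleSketch {
      public static void main(String[] args) {
        JDKRandomGenerator rand = new JDKRandomGenerator();
        rand.setSeed(42);
        LogNormalDistribution dist =
            new LogNormalDistribution(rand, 3.0, 1.0);
        double sample = Math.max(0, dist.sample()); // clamp negatives to 0
        int intSample =
            (int) (Integer.MAX_VALUE & (long) Math.ceil(sample));
        System.out.println(sample + " -> " + intSample);
      }
    }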

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

