hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r921670 - in /hadoop/mapreduce/trunk: ./ src/contrib/mumak/src/java/org/apache/hadoop/mapred/ src/contrib/mumak/src/test/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/tools/org/apache/hadoop/tools/rumen/
Date Thu, 11 Mar 2010 02:38:22 GMT
Author: cdouglas
Date: Thu Mar 11 02:38:21 2010
New Revision: 921670

URL: http://svn.apache.org/viewvc?rev=921670&view=rev
Log:
MAPREDUCE-1306. Randomize the arrival of heartbeat responses in Mumak.
Contributed by Tamas Sarlos

Added:
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/DeterministicCollectionAspects.aj
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorDeterministicReplay.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Mar 11 02:38:21 2010
@@ -440,6 +440,9 @@ Trunk (unreleased changes)
     with non-deletable permissions are created within it.
     (Amar Kamat via yhemanth)
 
+    MAPREDUCE-1306. Randomize the arrival of heartbeat responses in Mumak.
+    (Tamas Sarlos via cdouglas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/DeterministicCollectionAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/DeterministicCollectionAspects.aj?rev=921670&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/DeterministicCollectionAspects.aj
(added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/DeterministicCollectionAspects.aj
Thu Mar 11 02:38:21 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+
+// HashSet and HashMap do not guarantee that the order of iteration is 
+// deterministic. We need the latter for the deterministic replay of 
+// simulations. These iterations are heavily used in the JobTracker, e.g. when 
+// looking for non-local map tasks. Not all HashSet and HashMap instances
+// are iterated over, but to be safe and simple we replace all with 
+// LinkedHashSet and LinkedHashMap whose iteration order is deterministic.
+
+public privileged aspect DeterministicCollectionAspects {
+
+  // Fortunately the Java runtime type of generic classes do not contain
+  // the generic parameter. We can catch all with a single base pattern using
+  // the erasure of the generic type.
+
+  HashSet around() : call(HashSet.new()) {
+    return new LinkedHashSet();
+  }
+
+  HashMap around() : call(HashMap.new()) {
+    return new LinkedHashMap();
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
Thu Mar 11 02:38:21 2010
@@ -19,15 +19,17 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.List;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Random;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.SimulatorEvent;
 import org.apache.hadoop.mapred.SimulatorEventQueue;
 import org.apache.hadoop.mapred.JobCompleteEvent;
@@ -43,6 +45,7 @@ import org.apache.hadoop.tools.rumen.Log
 import org.apache.hadoop.tools.rumen.MachineNode;
 import org.apache.hadoop.tools.rumen.RackNode;
 import org.apache.hadoop.tools.rumen.ZombieCluster;
+import org.apache.hadoop.tools.rumen.RandomSeedGenerator;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -55,9 +58,8 @@ import org.apache.hadoop.util.ToolRunner
  */
 public class SimulatorEngine extends Configured implements Tool {
   public static final List<SimulatorEvent> EMPTY_EVENTS = new ArrayList<SimulatorEvent>();
-  private static final int DEFAULT_MAP_SLOTS_PER_NODE = 2;
-  private static final int DEFAULT_REDUCE_SLOTS_PER_NODE = 2;
-
+  /** Default number of milliseconds required to boot up the entire cluster. */
+  public static final int DEFAULT_CLUSTER_STARTUP_DURATION = 100*1000;
   protected final SimulatorEventQueue queue = new SimulatorEventQueue();
   String traceFile;
   String topologyFile;
@@ -66,30 +68,74 @@ public class SimulatorEngine extends Con
   boolean shutdown = false;
   long terminateTime = Long.MAX_VALUE;
   long currentTime;
-  
+  /** 
+   * Master random seed read from the configuration file, if present.
+   * It is (only) used for creating sub seeds for all the random number 
+   * generators.
+   */
+  long masterRandomSeed;
+                                                                                        
                                                   
   /**
    * Start simulated task trackers based on topology.
-   * @param clusterStory The cluster topology.
+   * @param clusterStory the cluster topology.
+   * @param jobConf configuration object.
    * @param now
    *    time stamp when the simulator is started, {@link SimulatorTaskTracker}s
-   *    are started shortly after this time stamp
+   *    are started uniformly randomly spread in [now,now+startDuration).
+   * @return time stamp by which the entire cluster is booted up and all task
+   *    trackers are sending heartbeats at their steady rate.
    */
-  void startTaskTrackers(ClusterStory clusterStory, long now) {
+  long startTaskTrackers(ClusterStory cluster, JobConf jobConf, long now) {
     /** port assigned to TTs, incremented by 1 for each TT */
     int port = 10000;
-    long ms = now + 100;
+    int numTaskTrackers = 0;
+
+    Random random = new Random(RandomSeedGenerator.getSeed(
+       "forStartTaskTrackers()", masterRandomSeed));
 
-    for (MachineNode node : clusterStory.getMachines()) {
-      String hostname = node.getName();
-      String taskTrackerName = "tracker_" + hostname + ":localhost/127.0.0.1:"
-          + port;
+    final int startDuration = jobConf.getInt("mumak.cluster.startup.duration",
+        DEFAULT_CLUSTER_STARTUP_DURATION);
+    
+    for (MachineNode node : cluster.getMachines()) {
+      jobConf.set("mumak.tasktracker.host.name", node.getName());
+      jobConf.set("mumak.tasktracker.tracker.name",
+          "tracker_" + node.getName() + ":localhost/127.0.0.1:" + port);
+      long subRandomSeed = RandomSeedGenerator.getSeed(
+         "forTaskTracker" + numTaskTrackers, masterRandomSeed);
+      jobConf.setLong("mumak.tasktracker.random.seed", subRandomSeed);
+      numTaskTrackers++;
       port++;
-      SimulatorTaskTracker tt = new SimulatorTaskTracker(jt, taskTrackerName,
-          hostname, node.getMapSlots(), node.getReduceSlots());
-      queue.addAll(tt.init(ms++));
+      SimulatorTaskTracker tt = new SimulatorTaskTracker(jt, jobConf);
+      long firstHeartbeat = now + random.nextInt(startDuration);
+      queue.addAll(tt.init(firstHeartbeat));
     }
+    
+    // In startDuration + heartbeat interval of the full cluster time each 
+    // TT is started up and told on its 2nd heartbeat to beat at a rate 
+    // corresponding to the steady state of the cluster    
+    long clusterSteady = now + startDuration + jt.getNextHeartbeatInterval();
+    return clusterSteady;
   }
-  
+
+  /**
+   * Reads a positive long integer from a configuration.
+   *
+   * @param Configuration conf configuration objects
+   * @param String propertyName name of the property
+   * @return time
+   */
+  long getTimeProperty(Configuration conf, String propertyName,
+                       long defaultValue) 
+      throws IllegalArgumentException {
+    // possible improvement: change date format to human readable ?
+    long time = conf.getLong(propertyName, defaultValue);
+    if (time <= 0) {
+      throw new IllegalArgumentException(propertyName + "time must be positive: "
+          + time);
+    }
+    return time;
+  }
+   
   /**
    * Initiate components in the simulation.
    * @throws InterruptedException
@@ -97,8 +143,7 @@ public class SimulatorEngine extends Con
    */
   @SuppressWarnings("deprecation")
   void init() throws InterruptedException, IOException {
-    long now = System.currentTimeMillis();
-
+    
     JobConf jobConf = new JobConf(getConf());
     jobConf.setClass("topology.node.switch.mapping.impl",
         StaticMapping.class, DNSToSwitchMapping.class);
@@ -121,19 +166,29 @@ public class SimulatorEngine extends Con
     jobConf.set("hadoop.job.history.location", (new Path(logPath, "history")
         .toString()));
     
+    // start time for virtual clock
+    // possible improvement: set default value to sth more meaningful based on
+    // the 1st job
+    long now = getTimeProperty(jobConf, "mumak.start.time", 
+                               System.currentTimeMillis());
+
     jt = SimulatorJobTracker.startTracker(jobConf, now, this);
     jt.offerService();
     
+    masterRandomSeed = jobConf.getLong("mumak.random.seed", System.nanoTime()); 
+    
     // max Map/Reduce tasks per node
-    int maxMaps = getConf().getInt("mapred.tasktracker.map.tasks.maximum",
-        DEFAULT_MAP_SLOTS_PER_NODE);
+    int maxMaps = getConf().getInt(
+        "mapred.tasktracker.map.tasks.maximum",
+        SimulatorTaskTracker.DEFAULT_MAP_SLOTS);
     int maxReduces = getConf().getInt(
         "mapred.tasktracker.reduce.tasks.maximum",
-        DEFAULT_REDUCE_SLOTS_PER_NODE);
+    
+      SimulatorTaskTracker.DEFAULT_REDUCE_SLOTS);
 
     MachineNode defaultNode = new MachineNode.Builder("default", 2)
         .setMapSlots(maxMaps).setReduceSlots(maxReduces).build();
-    
+            
     LoggedNetworkTopology topology = new ClusterTopologyReader(new Path(
         topologyFile), jobConf).get();
     // Setting the static mapping before removing numeric IP hosts.
@@ -142,24 +197,23 @@ public class SimulatorEngine extends Con
       removeIpHosts(topology);
     }
     ZombieCluster cluster = new ZombieCluster(topology, defaultNode);
-    long firstJobStartTime = now + 60000;
-    JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
-        new Path(traceFile), cluster, firstJobStartTime, jobConf);
     
+    // create TTs based on topology.json  
+    long firstJobStartTime = startTaskTrackers(cluster, jobConf, now);
+
+    long subRandomSeed = RandomSeedGenerator.getSeed("forSimulatorJobStoryProducer",
+                                                     masterRandomSeed);
+    JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
+        new Path(traceFile), cluster, firstJobStartTime, jobConf, subRandomSeed);
+
     final SimulatorJobSubmissionPolicy submissionPolicy = SimulatorJobSubmissionPolicy
         .getPolicy(jobConf);
-    
+
     jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
     queue.addAll(jc.init(firstJobStartTime));
 
-    // create TTs based on topology.json     
-    startTaskTrackers(cluster, now);
-    
-    terminateTime = getConf().getLong("mumak.terminate.time", Long.MAX_VALUE);
-    if (terminateTime <= 0) {
-      throw new IllegalArgumentException("Terminate time must be positive: "
-          + terminateTime);
-    }
+    terminateTime = getTimeProperty(jobConf, "mumak.terminate.time",
+                                    Long.MAX_VALUE);
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java
Thu Mar 11 02:38:21 2010
@@ -41,7 +41,12 @@ public class SimulatorJobStoryProducer i
 
   public SimulatorJobStoryProducer(Path path, ZombieCluster cluster,
       long firstJobStartTime, Configuration conf) throws IOException {
-    producer = new ZombieJobProducer(path, cluster, conf);
+    this(path, cluster, firstJobStartTime, conf, System.nanoTime());
+  }
+
+  public SimulatorJobStoryProducer(Path path, ZombieCluster cluster,
+      long firstJobStartTime, Configuration conf, long seed) throws IOException {
+    producer = new ZombieJobProducer(path, cluster, conf, seed);
     this.firstJobStartTime = firstJobStartTime;
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java
Thu Mar 11 02:38:21 2010
@@ -22,10 +22,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Set;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,7 @@ import org.apache.hadoop.tools.rumen.Tas
 import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
 // Explicitly use the new api, older o.a.h.mapred.TaskAttemptID is deprecated
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * This class simulates a {@link TaskTracker}. Its main purpose is to call heartbeat()
@@ -49,6 +51,17 @@ import org.apache.hadoop.mapreduce.TaskA
  * handle*Action() methods.
  */
 public class SimulatorTaskTracker implements SimulatorEventListener {
+  /** Default host name. */
+  public static String DEFAULT_HOST_NAME = "unknown";
+  /** Default task tracker name. */
+  public static String DEFAULT_TRACKER_NAME = 
+      "tracker_unknown:localhost/127.0.0.1:10000";
+  /** Default number of map slots per task tracker. */
+  public static final int DEFAULT_MAP_SLOTS = 2;
+  /** Default number of reduce slots per task tracker. */
+  public static final int DEFAULT_REDUCE_SLOTS = 2;
+  /** Default range of heartbeat response perturbations + 1 in milliseconds. */
+  public static final int DEFAULT_HEARTBEAT_FUZZ = 11;
   /** The name of the task tracker. */
   protected final String taskTrackerName;
   /** The name of the host the task tracker is running on. */
@@ -68,9 +81,11 @@ public class SimulatorTaskTracker implem
    * not yet reported tasks. We manage it in a mark & sweep garbage collector 
    * manner. We insert tasks on launch, mark them on completion, and remove
    * completed tasks at heartbeat() reports.
+   * We use LinkedHashMap instead of HashMap so that the order of iteration
+   * is deterministic.
    */
   protected Map<TaskAttemptID, SimulatorTaskInProgress> tasks = 
-      new HashMap<TaskAttemptID, SimulatorTaskInProgress>();
+      new LinkedHashMap<TaskAttemptID, SimulatorTaskInProgress>();
   /** 
    * Number of map slots allocated to tasks in RUNNING state on this task 
    * tracker. Must be in sync with the tasks map above. 
@@ -92,34 +107,68 @@ public class SimulatorTaskTracker implem
 
   /**
    * Task attempt ids for which TaskAttemptCompletionEvent was created but the 
-   * task attempt got killed. 
+   * task attempt got killed.
+   * We use LinkedHashSet to get deterministic iterators, should ever use one.
    */
   private Set<TaskAttemptID> orphanTaskCompletions = 
-    new HashSet<TaskAttemptID>();
+    new LinkedHashSet<TaskAttemptID>();
 
   /** The log object to send our messages to; only used for debugging. */
   private static final Log LOG = LogFactory.getLog(SimulatorTaskTracker.class);
   
+  /** 
+   * Number of milliseconds to perturb the requested heartbeat intervals with
+   * to simulate network latency, etc.
+   * If <= 1 then there is no perturbation. This option is also useful for 
+   * testing.
+   * If > 1 then heartbeats are perturbed with a uniformly random integer in 
+   * (-heartbeatIntervalFuzz,+heartbeatIntervalFuzz), not including 
+   * the bounds.
+   */
+  private int heartbeatIntervalFuzz = -1;
+  /** Used for randomly perturbing the heartbeat timings. */
+  private Random random;
+  
   /**
    * Constructs a task tracker. 
    *
    * @param jobTracker the SimulatorJobTracker we talk to
-   * @param taskTrackerName the task tracker name to report, otherwise unused
-   * @param hostName the host name to report, otherwise unused
-   * @param maxMapTasks the number of map slots
-   * @param maxReduceTasks the number of reduce slots
-   */
-  public SimulatorTaskTracker(InterTrackerProtocol jobTracker, 
-                              String taskTrackerName, String hostName, 
-                              int maxMapTasks, int maxReduceTasks) {
+   * @param conf Configuration object. Parameters read are:
+   * <dl>
+   * <dt> mumak.tasktracker.tracker.name <dd> 
+   *      the task tracker name to report, otherwise unused
+   * <dt> mumak.tasktracker.host.name <dd> 
+   *      the host name to report, otherwise unused
+   * <dt> mapred.tasktracker.map.tasks.maximum <dd> 
+   *      the number of map slots
+   * <dt> mapred.tasktracker.reduce.tasks.maximum <dd> 
+   *      the number of reduce slots
+   * <dt> mumak.tasktracker.heartbeat.fuzz <dd>
+   *      Perturbation for the heartbeats. 
+   *      None if <= 1 else perturbations are uniformly randomly generated 
+   *      in (-heartbeat.fuzz,+heartbeat.fuzz), not including the bounds.
+   * </dl>
+   */
+  public SimulatorTaskTracker(InterTrackerProtocol jobTracker,
+                              Configuration conf) {
+    this.taskTrackerName = conf.get(
+        "mumak.tasktracker.tracker.name", DEFAULT_TRACKER_NAME);
+
     LOG.debug("SimulatorTaskTracker constructor, taskTrackerName=" +
               taskTrackerName);
 
     this.jobTracker = jobTracker;    
-    this.taskTrackerName = taskTrackerName;
-    this.hostName = hostName;
-    this.maxMapSlots = maxMapTasks;
-    this.maxReduceSlots = maxReduceTasks;
+    this.hostName = conf.get(
+        "mumak.tasktracker.host.name", DEFAULT_HOST_NAME);
+    this.maxMapSlots = conf.getInt(
+        "mapred.tasktracker.map.tasks.maximum", DEFAULT_MAP_SLOTS);
+    this.maxReduceSlots = conf.getInt(
+        "mapred.tasktracker.reduce.tasks.maximum", DEFAULT_REDUCE_SLOTS);
+    this.heartbeatIntervalFuzz = conf.getInt(
+        "mumak.tasktracker.heartbeat.fuzz", DEFAULT_HEARTBEAT_FUZZ);
+    long seed = conf.getLong("mumak.tasktracker.random.seed", 
+        System.nanoTime());
+    this.random = new Random(seed);
   }
   
   /**
@@ -637,7 +686,18 @@ public class SimulatorTaskTracker implem
     List<SimulatorEvent> events = handleHeartbeatResponse(response, now);
     
     // Next heartbeat
-    events.add(new HeartbeatEvent(this, now + response.getHeartbeatInterval()));
+    int heartbeatInterval = response.getHeartbeatInterval();
+    if (heartbeatIntervalFuzz > 1) {
+      // Add some randomness to heartbeat timings to simulate network latency, 
+      // time spent servicing this heartbeat request, etc.
+      // randomFuzz is in (-heartbeatIntervalFuzz,+heartbeatIntervalFuzz)
+      int randomFuzz = random.nextInt(2*heartbeatIntervalFuzz-1) - 
+                       heartbeatIntervalFuzz;
+      heartbeatInterval += randomFuzz;
+      // make sure we never schedule a heartbeat in the past
+      heartbeatInterval = Math.max(1, heartbeatInterval); 
+    }
+    events.add(new HeartbeatEvent(this, now + heartbeatInterval));
 
     return events;
   }

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorDeterministicReplay.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorDeterministicReplay.java?rev=921670&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorDeterministicReplay.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorDeterministicReplay.java
Thu Mar 11 02:38:21 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Test;
+
+public class TestSimulatorDeterministicReplay {
+
+  public static final Log LOG = LogFactory.getLog(
+      TestSimulatorDeterministicReplay.class);
+  protected SimulatorJobSubmissionPolicy policy = SimulatorJobSubmissionPolicy.REPLAY;
+  
+  @Test
+  public void testMain() throws Exception {
+    Path hadoopLogDir = new Path(
+        System.getProperty("test.build.data"), "mumak-replay");
+    Path hadoopLogDir1 = new Path(hadoopLogDir, "run1");
+    Path hadoopLogDir2 = new Path(hadoopLogDir, "run2");
+    runMumak(hadoopLogDir1, 50031);
+    LOG.info("Run1 done");
+    runMumak(hadoopLogDir2, 50032);
+    compareLogDirs(hadoopLogDir1.toString(), hadoopLogDir2.toString());
+  }
+  
+  void compareLogDirs(String dir1, String dir2) {
+    try {
+      Runtime runtime = Runtime.getRuntime();
+      Process process = runtime.exec("diff -r /dev/null /dev/null");
+      process.waitFor();
+      // If there is no diff available, we skip the test and end up in 
+      // the catch block
+      // Make sure diff understands the -r option
+      if (process.exitValue() != 0) {
+        LOG.warn("diff -r is not working, skipping the test");
+        return;
+      }
+      // Run the real comparison
+      process = runtime.exec("diff -r " + dir1 + " " + dir2);
+      process.waitFor();
+      Assert.assertEquals("Job history logs differ, diff returned", 
+                          0, process.exitValue());
+    } catch (Exception e) {
+      LOG.warn("Exception while diffing: " + e);
+    }                        
+  }
+  
+  // We need a different http port parameter for each run as the socket
+  // is not closed properly in hadoop
+  void runMumak(Path hadoopLogDir, int jobTrackerHttpPort) 
+      throws Exception {
+    final Configuration conf = new Configuration();
+    conf.set(SimulatorJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
+    final FileSystem lfs = FileSystem.getLocal(conf);
+    final Path rootInputDir = new Path(
+        System.getProperty("src.test.data", "data")).makeQualified(lfs);
+    final Path traceFile = new Path(rootInputDir, "19-jobs.trace.json.gz");
+    final Path topologyFile = new Path(rootInputDir, "19-jobs.topology.json.gz");
+
+    LOG.info("traceFile = " + traceFile + " topology = " + topologyFile);
+    
+    conf.setLong("mumak.start.time", 10);
+    // Run 20 minutes of the simulation
+    conf.setLong("mumak.terminate.time", 10 + 20*60*1000);
+    conf.setLong("mumak.random.seed", 42);
+    // SimulatorEngine reads conf and the system property too (!)
+    System.setProperty("hadoop.log.dir", hadoopLogDir.toString());
+    conf.set("hadoop.log.dir", hadoopLogDir.toString());
+    conf.set("mapred.job.tracker.http.address",
+             "0.0.0.0:" + jobTrackerHttpPort);
+    String[] args = { traceFile.toString(), topologyFile.toString() };
+    int res = ToolRunner.run(conf, new SimulatorEngine(), args);
+    Assert.assertEquals(0, res);
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
Thu Mar 11 02:38:21 2010
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.mapred.FakeJobs;
 import org.junit.Test;
@@ -89,9 +90,8 @@ public class TestSimulatorJobTracker {
     short responseId = 0;
     int now = 0;
 
-    FakeTaskTracker(InterTrackerProtocol jobTracker, String taskTrackerName,
-        String hostName, int maxMapTasks, int maxReduceTasks) {
-      super(jobTracker, taskTrackerName, hostName, maxMapTasks, maxReduceTasks);
+    FakeTaskTracker(InterTrackerProtocol jobTracker, Configuration conf) {
+      super(jobTracker, conf);
 
       LOG.info("FakeTaskTracker constructor, taskTrackerName="
           + taskTrackerName);
@@ -199,9 +199,15 @@ public class TestSimulatorJobTracker {
       LOG.info("From JTQueue: job id = " + js.getJobID());
     }
 
-    FakeTaskTracker fakeTracker = new FakeTaskTracker(sjobTracker,
-        "tracker_host1.foo.com:localhost/127.0.0.1:9010", "host1.foo.com", 10,
-        10);
+    Configuration ttConf = new Configuration();
+    ttConf.set("mumak.tasktracker.tracker.name", 
+               "tracker_host1.foo.com:localhost/127.0.0.1:9010");
+    ttConf.set("mumak.tasktracker.host.name", "host1.foo.com"); 
+    ttConf.setInt("mapred.tasktracker.map.tasks.maximum", 10); 
+    ttConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 10);
+    ttConf.setInt("mumak.tasktracker.heartbeat.fuzz", -1); 
+    FakeTaskTracker fakeTracker = new FakeTaskTracker(sjobTracker, ttConf);
+    
     int numLaunchTaskActions = 0;
 
     for (int i = 0; i < NoMaps * 2; ++i) { // we should be able to assign all

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java
Thu Mar 11 02:38:21 2010
@@ -24,6 +24,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,8 +67,6 @@ public class TestSimulatorTaskTracker {
   
   final static String taskAttemptIdPrefix = "attempt_200907150128_0007_";
   final String taskTrackerName = "test_task_tracker";
-  final int maxMapSlots = 3;
-  final int maxReduceSlots = 3;
   
   @Before
   public void setUp() {
@@ -77,9 +76,13 @@ public class TestSimulatorTaskTracker {
     } catch (Exception e) {
       Assert.fail("Couldn't set up the mock job tracker: " + e);
     }
-    taskTracker = new SimulatorTaskTracker(jobTracker, taskTrackerName,
-                                           "test_host", 
-                                           maxMapSlots, maxReduceSlots);
+    Configuration ttConf = new Configuration();
+    ttConf.set("mumak.tasktracker.tracker.name", taskTrackerName);
+    ttConf.set("mumak.tasktracker.host.name", "test_host");
+    ttConf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    ttConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    ttConf.setInt("mumak.tasktracker.heartbeat.fuzz", -1);    
+    taskTracker = new SimulatorTaskTracker(jobTracker, ttConf);
     eventQueue = new CheckedEventQueue(simulationStartTime); 
   }
   

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java?rev=921670&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
Thu Mar 11 02:38:21 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.apache.hadoop.tools.rumen.RandomSeedGenerator.getSeed;
+
+public class TestRandomSeedGenerator {
+  @Test
+  public void testSeedGeneration() {
+    long masterSeed1 = 42;
+    long masterSeed2 = 43;
+    
+    assertTrue("Deterministic seeding",
+        getSeed("stream1", masterSeed1) == getSeed("stream1", masterSeed1));
+    assertTrue("Deterministic seeding",
+        getSeed("stream2", masterSeed2) == getSeed("stream2", masterSeed2));
+    assertTrue("Different streams", 
+        getSeed("stream1", masterSeed1) != getSeed("stream2", masterSeed1));
+    assertTrue("Different master seeds",
+        getSeed("stream1", masterSeed1) != getSeed("stream1", masterSeed2));
+  }
+}

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
(original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
Thu Mar 11 02:38:21 2010
@@ -38,7 +38,6 @@ public abstract class AbstractClusterSto
   protected Map<String, MachineNode> mNodeMap;
   protected Map<String, RackNode> rNodeMap;
   protected int maximumDistance = 0;
-  protected Random random;
   
   @Override
   public Set<MachineNode> getMachines() {
@@ -53,7 +52,8 @@ public abstract class AbstractClusterSto
   }
   
   @Override
-  public synchronized MachineNode[] getRandomMachines(int expected) {
+  public synchronized MachineNode[] getRandomMachines(int expected, 
+                                                      Random random) {
     if (expected == 0) {
       return new MachineNode[0];
     }
@@ -64,7 +64,6 @@ public abstract class AbstractClusterSto
 
     if (mNodesFlattened == null) {
       mNodesFlattened = machineNodes.toArray(new MachineNode[total]);
-      random = new Random();
     }
 
     MachineNode[] retval = new MachineNode[select];

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java Thu Mar
11 02:38:21 2010
@@ -18,6 +18,7 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.util.Set;
+import java.util.Random;
 
 /**
  * {@link ClusterStory} represents all configurations of a MapReduce cluster,
@@ -45,9 +46,10 @@ public interface ClusterStory {
   /**
    * Select a random set of machines.
    * @param expected The expected sample size.
+   * @param random Random number generator to use.
    * @return An array of up to expected number of {@link MachineNode}s.
    */
-  public MachineNode[] getRandomMachines(int expected);
+  public MachineNode[] getRandomMachines(int expected, Random random);
 
   /**
    * Get {@link MachineNode} by its host name.

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java?rev=921670&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
(added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
Thu Mar 11 02:38:21 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.nio.charset.Charset;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
 * The purpose of this class is to generate new random seeds from a master
 * seed. This is needed to make the Random().next*() calls in rumen and mumak
 * deterministic so that mumak simulations become deterministically replayable.
 *
 * In these tools we need many independent streams of random numbers, some of
 * which are created dynamically. We seed these streams with the sub-seeds 
 * returned by RandomSeedGenerator.
 * 
 * For a slightly more complicated approach to generating multiple streams of 
 * random numbers with better theoretical guarantees, see
 * P. L'Ecuyer, R. Simard, E. J. Chen, and W. D. Kelton, 
 * ``An Object-Oriented Random-Number Package with Many Long Streams and 
 * Substreams'', Operations Research, 50, 6 (2002), 1073--1075
 * http://www.iro.umontreal.ca/~lecuyer/papers.html
 * http://www.iro.umontreal.ca/~lecuyer/myftp/streams00/
 */
public class RandomSeedGenerator {
  /** Fixed charset so the generated seeds are identical on every platform. */
  private static final Charset UTF_8 = Charset.forName("UTF-8");

  /** Static utility methods only; not meant to be instantiated. */
  private RandomSeedGenerator() {
  }

  /**
   * Generates a new random seed.
   *
   * @param streamId a string identifying the stream of random numbers
   * @param masterSeed higher level master random seed
   * @return the random seed. Different (streamId, masterSeed) pairs result in
   *         (vastly) different random seeds.
   */   
  public static long getSeed(String streamId, long masterSeed) {
    final MessageDigest md5;
    try {
      md5 = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException nsae) {
      // Every compliant JCA provider must supply MD5, so this is unreachable
      // in practice; surface it loudly rather than swallowing it.
      throw new RuntimeException("Can't create MD5 digests", nsae);
    }
    // '/' : make sure that we don't get the same str from ('11',0) and ('1',10)
    // We could have fed the bytes of masterSeed one by one to md5.update()
    // instead
    String str = streamId + '/' + masterSeed;
    // Use an explicit charset: the no-arg String.getBytes() depends on the
    // platform default encoding, which would make seeds platform-dependent
    // and break deterministic replay across machines.
    byte[] digest = md5.digest(str.getBytes(UTF_8));
    // Create a long from the first 8 bytes of the digest
    // This is fine as MD5 has the avalanche property.
    // Paranoids could have XOR folded the other 8 bytes in too. 
    long seed = 0;
    for (int i = 0; i < 8; i++) {
      // digest[i]+128 maps the signed byte onto 0..255, same as the original.
      seed = (seed << 8) + (digest[i] + 128);
    }
    return seed;
  }
}

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Thu Mar
11 02:38:21 2010
@@ -57,6 +57,7 @@ public class ZombieJob implements JobSto
   private JobConf jobConf;
 
   private long seed;
+  private long numRandomSeeds = 0;
   private boolean hasRandomSeed = false;
 
   private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
@@ -194,7 +195,8 @@ public class ZombieJob implements JobSto
         if (cluster == null) {
           splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
         } else {
-          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit);
+          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit,
+                                                           random);
           String[] hosts = new String[mNodes.length];
           for (int j = 0; j < hosts.length; ++j) {
             hosts[j] = mNodes[j].getName();
@@ -794,7 +796,13 @@ public class ZombieJob implements JobSto
 
     return makeUpRuntimeCore(loggedDiscreteCDF);
   }
-
+  
+  private synchronized long getNextRandomSeed() {
+    numRandomSeeds++;
+    return RandomSeedGenerator.getSeed("forZombieJob" + job.getJobID(),
+                                       numRandomSeeds);
+  }
+   
   private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
     CDFRandomGenerator interpolator;
 
@@ -809,7 +817,7 @@ public class ZombieJob implements JobSto
 
       interpolator =
           hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
-              loggedDiscreteCDF, ++seed)
+              loggedDiscreteCDF, getNextRandomSeed())
               : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);
 
       /*

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java?rev=921670&r1=921669&r2=921670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
(original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
Thu Mar 11 02:38:21 2010
@@ -30,9 +30,15 @@ public class ZombieJobProducer implement
   private final JobTraceReader reader;
   private final ZombieCluster cluster;
 
-  private ZombieJobProducer(JobTraceReader reader, ZombieCluster cluster) {
+  private boolean hasRandomSeed = false;
+  private long randomSeed = 0;
+      
+  private ZombieJobProducer(JobTraceReader reader, ZombieCluster cluster,
+      boolean hasRandomSeed, long randomSeed) {
     this.reader = reader;
     this.cluster = cluster;
+    this.hasRandomSeed = hasRandomSeed;
+    this.randomSeed = (hasRandomSeed) ? randomSeed : System.nanoTime();
   }
 
   /**
@@ -49,9 +55,29 @@ public class ZombieJobProducer implement
    */
   public ZombieJobProducer(Path path, ZombieCluster cluster, Configuration conf)
       throws IOException {
-    this(new JobTraceReader(path, conf), cluster);
+    this(new JobTraceReader(path, conf), cluster, false, -1);
   }
 
+  
+  /**
+   * Constructor
+   * 
+   * @param path
+   *          Path to the JSON trace file, possibly compressed.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @param conf
+   * @param randomSeed
+   *          master random seed used to deterministically seed the produced
+   *          jobs, making simulation replays reproducible.
+   * @throws IOException
+   */
+  public ZombieJobProducer(Path path, ZombieCluster cluster,
+      Configuration conf, long randomSeed) throws IOException {
+    this(new JobTraceReader(path, conf), cluster, true, randomSeed);
+  }
+  
   /**
    * Constructor
    * 
@@ -65,13 +91,39 @@ public class ZombieJobProducer implement
    */
   public ZombieJobProducer(InputStream input, ZombieCluster cluster)
       throws IOException {
-    this(new JobTraceReader(input), cluster);
+    this(new JobTraceReader(input), cluster, false, -1);
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param input
+   *          The input stream for the JSON trace.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @param randomSeed
+   *          master random seed used to deterministically seed the produced
+   *          jobs, making simulation replays reproducible.
+   * @throws IOException
+   */
+  public ZombieJobProducer(InputStream input, ZombieCluster cluster,
+      long randomSeed) throws IOException {
+    this(new JobTraceReader(input), cluster, true, randomSeed);
   }
 
   @Override
   public ZombieJob getNextJob() throws IOException {
     LoggedJob job = reader.getNext();
-    return (job == null) ? null : new ZombieJob(job, cluster);
+    if (job == null) {
+      return null;
+    } else if (hasRandomSeed) {
+      long subRandomSeed = RandomSeedGenerator.getSeed(
+            "forZombieJob" + job.getJobID(), randomSeed);
+      return new ZombieJob(job, cluster, subRandomSeed);
+    } else {
+      return new ZombieJob(job, cluster);
+    }
   }
 
   @Override



Mime
View raw message