hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r818674 [3/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/ src/contrib/mumak/ src/contrib/mumak/bin/ src/contrib/mumak/conf/ src/contrib/mumak/ivy/ src/contrib/mumak/src/ src/contrib/mumak/src/java/ src/contrib/mumak/src/java/org/ src/cont...
Date Fri, 25 Sep 2009 00:25:31 GMT
Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/CheckedEventQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/CheckedEventQueue.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/CheckedEventQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/CheckedEventQueue.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * An EventQueue that checks events against a list of expected events upon
+ * enqueueing. Also contains routines for creating expected HeartbeatEvents and
+ * all expected events related to running map or reduce tasks on a task tracker.
+ */
+class CheckedEventQueue extends SimulatorEventQueue {
+  /**
+   * expected list of events to be returned from all EventListener.accept()
+   * called at time t, t is the key if no events are generated an empty list
+   * needs to be put there
+   * 
+   * IMPORTANT: this is NOT the events to be delivered at time t from the event
+   * queue, it is the list events to be inserted into the event queue at time t
+   */
+  private SortedMap<Long, List<SimulatorEvent>> expectedEvents = 
+      new TreeMap<Long, List<SimulatorEvent>>();
+       
+  // current simulation time, advanced by get() as events are dequeued
+  private long now;
+  // simulation start time; also the base time for the heartbeat schedule
+  private long simulationStartTime;
+  
+  /**
+   * We need the simulation start time so that we know the time of the first
+   * add().
+   * 
+   * @param simulationStartTime
+   *          Simulation start time.
+   */
+  public CheckedEventQueue(long simulationStartTime) {
+    now = simulationStartTime;
+    this.simulationStartTime = simulationStartTime;
+  }
+  
+  /**
+   * Validates 'event' against the expectedEvents table. Entries with key
+   * strictly before 'now' must have empty lists (everything expected earlier
+   * must already have been enqueued), otherwise the test fails; such entries
+   * are dropped as we sweep. The event itself must match one of the events
+   * expected at exactly 'now' (per isSameEvent()); the match is consumed.
+   */
+  void check(SimulatorEvent event) {
+    for (Iterator<Map.Entry<Long, List<SimulatorEvent>>> it = expectedEvents.entrySet()
+        .iterator(); it.hasNext();) {
+      Map.Entry<Long, List<SimulatorEvent>> entry = it.next();
+      long insertTime = entry.getKey();
+      Assert.assertTrue(insertTime <= now);
+      if (insertTime < now) {
+        List<SimulatorEvent> events = entry.getValue();
+        if (!events.isEmpty()) {
+          Assert.fail("There are " + events.size() + " events at time "
+            + insertTime + " before " + now + ". First event: "+events.get(0));
+        }
+        it.remove();
+      } else { // insertTime == now
+        break;
+      }
+    }
+    
+    // NOTE(review): if no list was registered for 'now' this is null and the
+    // loop below dies with a NullPointerException rather than a clean
+    // assertion failure -- consider Assert.assertNotNull(expected) here.
+    List<SimulatorEvent> expected = expectedEvents.get(now);
+    boolean found = false;
+    for (SimulatorEvent ee : expected) {
+      if (isSameEvent(ee, event)) {
+        expected.remove(ee);
+        found = true;
+        break;
+      }
+    }
+
+    Assert.assertTrue("Unexpected event to enqueue, now=" + now  + ", event=" + 
+               event + ", expecting=" + expected, found);
+  }
+  
+  /**
+   * We intercept the main routine of the real EventQueue and check the new
+   * event returned by accept() against the expectedEvents table
+   */
+  @Override
+  public boolean add(SimulatorEvent event) {
+    check(event);
+    return super.add(event);
+  }
+  
+  // Bulk variant of add(): every event is checked before delegating.
+  @Override
+  public boolean addAll(Collection<? extends SimulatorEvent> events) {
+    for (SimulatorEvent event : events) {
+      check(event);
+    }
+    return super.addAll(events);
+  }
+
+  // We need to override get() to track the current simulation time
+  @Override
+  public SimulatorEvent get() {
+    SimulatorEvent ret = super.get();
+    if (ret != null) {
+      now = ret.getTimeStamp();
+    }
+    return ret;
+  }
+
+  /**
+   * Auxiliary function for populating the expectedEvents table: registers
+   * that 'event' is expected to be enqueued at simulation time 'when'.
+   * The event must not be null (asserted below); to mark an accept() that
+   * generates no events, put an empty list into expectedEvents instead.
+   */
+  public void addExpected(long when, SimulatorEvent event) {
+    Assert.assertNotNull(event);
+    List<SimulatorEvent> expected = expectedEvents.get(when);
+    if (expected == null) {
+      expected = new ArrayList<SimulatorEvent>();
+      expectedEvents.put(when, expected);
+    }
+    expected.add(event);
+  }
+  
+  // Returns the largest registered expectation time.
+  public long getLastCheckTime() {
+    return expectedEvents.lastKey();
+  }
+  
+  // there should be an empty expected event list left for the last
+  // time to check
+  public void checkMissingExpected() {
+    Assert.assertTrue(expectedEvents.size() <= 1);
+    for (List<SimulatorEvent> events : expectedEvents.values()) {
+      Assert.assertTrue(events.isEmpty());
+    }
+  }
+  
+  // fills in the expected events corresponding to the execution of a map task
+  public void expectMapTask(SimulatorTaskTracker taskTracker,
+                            TaskAttemptID taskId,
+                            long mapStart, long mapRuntime) {
+    long mapDone = mapStart + mapRuntime;
+    org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
+        org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
+    // SUCCEEDED status whose finish time is the computed map completion time
+    MapTaskStatus status = new MapTaskStatus(taskIdOldApi, 1.0f, 1,
+        State.SUCCEEDED, null, null, null, Phase.MAP, null);
+    status.setFinishTime(mapDone);
+    TaskAttemptCompletionEvent completionEvent = 
+        new TaskAttemptCompletionEvent(taskTracker, status);
+    // the completion event is expected to be *enqueued* when the map starts
+    addExpected(mapStart, completionEvent);
+  }
+
+  // fills in the expected events corresponding to the execution of a reduce 
+  // task
+  public void expectReduceTask(SimulatorTaskTracker taskTracker,
+                               TaskAttemptID taskId, long mapDone, 
+                               long reduceRuntime) {
+    long reduceDone = mapDone + reduceRuntime;
+    org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
+        org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
+    ReduceTaskStatus status = new ReduceTaskStatus(taskIdOldApi, 1.0f, 1,
+        State.SUCCEEDED, null, null, null, Phase.REDUCE, null);
+    status.setFinishTime(reduceDone);
+    TaskAttemptCompletionEvent completionEvent = 
+        new TaskAttemptCompletionEvent(taskTracker, status);
+    // mirrors expectMapTask: enqueued when the reduce starts (at mapDone)
+    addExpected(mapDone, completionEvent);
+
+  }
+  
+  /**
+   * Fills in the events corresponding to the self heartbeats numAccepts is the
+   * number of times accept() will be called, it must be >= 1
+   */
+  public void expectHeartbeats(SimulatorTaskTracker taskTracker,
+                               int numAccepts, int heartbeatInterval) {
+    // initial heartbeat
+    addExpected(simulationStartTime,
+        new HeartbeatEvent(taskTracker, simulationStartTime));
+    long simulationTime = simulationStartTime;
+    // each accept() at time t is expected to schedule the next heartbeat
+    // at t + heartbeatInterval
+    for(int i=0; i<numAccepts; i++) {
+      long heartbeatTime = simulationTime + heartbeatInterval;
+      HeartbeatEvent he = new HeartbeatEvent(taskTracker, heartbeatTime);
+      addExpected(simulationTime, he);
+      simulationTime = heartbeatTime;
+    }
+  }
+  
+  /**
+   * Returns true iff two events are the same. We did not use equals() because
+   * we may want to test for partial equality only, and we don't want to bother
+   * writing new hashCode()s either.
+   */
+  protected boolean isSameEvent(SimulatorEvent event, SimulatorEvent otherEvent) {
+    // check for null reference
+    Assert.assertNotNull(event);
+    Assert.assertNotNull(otherEvent);
+    // type check
+    if (!event.getClass().equals(otherEvent.getClass())) {
+      return false;
+    }
+    // compare significant fields
+    if (event.listener != otherEvent.listener || 
+        event.timestamp != otherEvent.timestamp) {
+      return false;
+    }
+    // for completion events, additionally require matching task ids
+    if (event instanceof TaskAttemptCompletionEvent) {
+      TaskStatus s = ((TaskAttemptCompletionEvent)event).getStatus();
+      TaskStatus os = ((TaskAttemptCompletionEvent)otherEvent).getStatus();
+      if (!s.getTaskID().equals(os.getTaskID())) {
+        return false;
+      }
+    } 
+    return true;
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/FakeJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/FakeJobs.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/FakeJobs.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/FakeJobs.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Random;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+/**
+ * {@link JobStory} represents the runtime information available for a
+ * completed Map-Reduce job.
+ */
+public class FakeJobs implements JobStory {
+  // Job name reported by getName() and set on the generated JobConf.
+  String jobName;
+  // Simulated submission time of the job.
+  long submissionTime = 0;
+  // Number of map and reduce tasks.
+  int maps = 0;
+  int reduces = 0;
+  // Source of synthetic task I/O statistics. NOTE: unseeded, so
+  // getTaskAttemptInfo() is not reproducible across runs.
+  Random random = new Random();
+
+  /**
+   * Creates a fake job story.
+   *
+   * @param name job name
+   * @param submissionTime simulated submission time
+   * @param nmaps number of map tasks
+   * @param nreduces number of reduce tasks
+   */
+  public FakeJobs(String name, long submissionTime, int nmaps, int nreduces) {
+    jobName = name;
+    this.submissionTime = submissionTime;
+    this.maps = nmaps;
+    this.reduces = nreduces;
+  }
+
+  /** @return the job name */
+  public String getName() {
+    return jobName;
+  }
+
+  /** @return always null; this fake job carries no real JobID */
+  public org.apache.hadoop.mapreduce.JobID getJobID() {
+    return null;
+  }
+
+  /**
+   * Get the user who ran the job.
+   * @return the user who ran the job
+   */
+  public String getUser() {
+    return "mumak";
+  }
+
+  /**
+   * Get the job submission time.
+   * @return the job submission time
+   */
+  public long getSubmissionTime() {
+    return submissionTime;
+  }
+
+  /**
+   * Get the number of maps in the {@link JobStory}.
+   * @return the number of maps in the <code>Job</code>
+   */
+  public int getNumberMaps() {
+    return maps;
+  }
+
+  /**
+   * Get the number of reduce in the {@link JobStory}.
+   * @return the number of reduces in the <code>Job</code>
+   */
+  public int getNumberReduces() {
+    return reduces;
+  }
+
+  /**
+   * Get the input splits for the job.
+   * @return one dummy zero-length FileSplit per map task
+   */
+  public InputSplit[] getInputSplits() {
+    InputSplit[] retval = new InputSplit[getNumberMaps()];
+    FileSplit tmp = new FileSplit(new Path("/"), 0, 0, new String[0]);
+
+    for (int i = 0; i < retval.length; ++i) {
+      retval[i] = tmp;
+    }
+
+    return retval;
+  }
+
+  /**
+   * Get {@link TaskInfo} for a given task.
+   * @param taskType {@link TaskType} of the task
+   * @param taskNumber Partition number of the task
+   * @return always null; per-task info is not modeled by this fake
+   */
+  public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+    return null;
+  }
+
+  /**
+   * Get {@link TaskAttemptInfo} for a given task-attempt. The byte/record
+   * counts are random; runtimes are fixed constants.
+   * @param taskType {@link TaskType} of the task-attempt
+   * @param taskNumber Partition number of the task-attempt
+   * @param taskAttemptNumber Attempt number of the task
+   * @return the <code>TaskAttemptInfo</code> for the given task-attempt
+   * @throws IllegalArgumentException for task types other than MAP or REDUCE
+   */
+  public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
+      int taskNumber,
+      int taskAttemptNumber) {
+    // Fixed: random.nextInt() % 10000 could yield a *negative* byte count
+    // (nextInt() may be negative); nextInt(10000) is always in [0, 10000).
+    int bytesin = random.nextInt(10000);
+    int recsin = bytesin / 10;
+    int bytesout = (int) (bytesin * 1.5);
+    int recsout = bytesout / 10;
+    int maxMem = 1000000;
+    long mapRunTime = 5678;
+    long reduceShuffleTime = 0;
+    long reduceSortTime = 0;
+    long reduceRunTime = 1234;
+    TaskInfo task = new TaskInfo(bytesin, recsin, bytesout, recsout, maxMem);
+    TaskAttemptInfo tAInfo = null;
+    if (taskType == TaskType.MAP) {
+      tAInfo = new MapTaskAttemptInfo(TaskStatus.State.SUCCEEDED,
+          task, mapRunTime);
+    } else if (taskType == TaskType.REDUCE) {
+      tAInfo = new ReduceTaskAttemptInfo(TaskStatus.State.SUCCEEDED,
+          task, reduceShuffleTime, reduceSortTime, reduceRunTime);
+    } else {
+      throw new IllegalArgumentException("Unsupported TaskType "+taskType);
+    }
+    return tAInfo;
+  }
+
+  /** @return a JobConf populated with this job's name, user and task counts */
+  public JobConf getJobConf() {
+    JobConf jobConf = new JobConf();
+    jobConf.setJobName(jobName);
+    jobConf.setUser("mumak");
+    jobConf.setNumMapTasks(maps);
+    jobConf.setNumReduceTasks(reduces);
+    return jobConf;
+  }
+
+  @Override
+  public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+      int taskAttemptNumber, int locality) {
+    // Locality is ignored by this fake; delegate to the unadjusted variant.
+    return getTaskAttemptInfo(TaskType.MAP, taskNumber, taskAttemptNumber);
+  }
+
+  @Override
+  public Pre21JobHistoryConstants.Values getOutcome() {
+    // Fake jobs always succeed.
+    return Pre21JobHistoryConstants.Values.SUCCESS;
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/HeartbeatHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/HeartbeatHelper.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/HeartbeatHelper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/HeartbeatHelper.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+//
+// collection of heartbeat() parameters whose correctness we care about
+// and the response we give if they are correct
+//
+public class HeartbeatHelper {
+  // Expected task tracker status the heartbeat should report.
+  public TaskTrackerStatus status = 
+      new TaskTrackerStatus("dummytracker", "dummyhost");
+  // Expected value of the acceptNewTasks heartbeat parameter.
+  public boolean acceptNewTasks = true;
+  // Actions to hand back in the heartbeat response when the checks pass.
+  public List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
+  
+  static final Log LOG = LogFactory.getLog(HeartbeatHelper.class);
+  
+  /** Queues an action to be returned to the task tracker. */
+  public void addTaskTrackerAction(TaskTrackerAction action) {
+    actions.add(action);
+  }
+  
+  /**
+   * Adds an expected TaskStatus report. There is no setTaskReports() in
+   * TaskTrackerStatus, so we rebuild the status object with a copy of all
+   * the other fields plus the extended report list.
+   */
+  public void addTaskReport(TaskStatus report) {
+    String trackerName = status.getTrackerName();
+    String host = status.getHost();
+    int httpPort = status.getHttpPort();
+    List<TaskStatus> taskReports = status.getTaskReports();
+    int failures = status.getFailures();
+    int maxMapTasks = status.getMaxMapSlots();
+    int maxReduceTasks = status.getMaxReduceSlots();
+    
+    taskReports.add(report);
+    status = new TaskTrackerStatus(trackerName, host, httpPort, taskReports,
+                                   failures, maxMapTasks, maxReduceTasks);
+  }
+  
+  /** @return the queued actions as an array, as heartbeat() expects */
+  public TaskTrackerAction[] getTaskTrackerActions() {
+    return actions.toArray(new TaskTrackerAction[0]);
+  }
+  
+  /**
+   * Checks most incoming heartbeat parameters we care about: the
+   * acceptNewTasks flag and, for every expected task report, that a matching
+   * report (same task id, map/reduce kind, run state and phase) was sent.
+   */
+  public void checkHeartbeatParameters(TaskTrackerStatus otherStatus, 
+                                       boolean otherAcceptNewTasks) {
+    Assert.assertEquals("Mismatch in acceptNewTask", 
+                 this.acceptNewTasks, otherAcceptNewTasks);
+
+    List<TaskStatus> taskReports = this.status.getTaskReports(); 
+    List<TaskStatus> otherTaskReports = otherStatus.getTaskReports(); 
+    
+    Assert.assertEquals("Mismatch in number of reported tasks",
+                 taskReports.size(), otherTaskReports.size());
+    for(TaskStatus report : taskReports) {
+      boolean found = false;
+      for(TaskStatus otherReport : otherTaskReports) {
+        // Fixed: compare task ids with equals(); '==' tested reference
+        // identity only and could miss logically equal id objects.
+        if(report.getTaskID().equals(otherReport.getTaskID())) {
+          Assert.assertEquals("Map/reduce task mismatch",
+                       report.getIsMap(), otherReport.getIsMap());
+          Assert.assertEquals("Mismatch in run state",
+                       report.getRunState(), otherReport.getRunState());
+          Assert.assertEquals("Mismatch in run phase",
+                       report.getPhase(), otherReport.getPhase());
+          found = true;
+          break;
+        }
+      }
+      Assert.assertTrue("Task status report not found, taskID=" + 
+                 report.getTaskID(), found);
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+
+public class MockSimulatorEngine extends SimulatorEngine {
+  HashSet<String> TRACKERS=null;
+  HashMap<JobID, JobStory> jobs;
+  HashSet<JobID> submittedJobs;
+  HashSet<JobID> completedJobs;
+  private int fixedJobs;
+  private long startTime;
+
+  public static final Log LOG = LogFactory.getLog(MockSimulatorEngine.class);
+  
+  public MockSimulatorEngine(int nJobs,
+      @SuppressWarnings("unused") int nTrackers) {
+    super();
+    fixedJobs = nJobs;
+    jobs = new HashMap<JobID, JobStory>();
+    submittedJobs = new HashSet<JobID>();
+    completedJobs = new HashSet<JobID>();
+  }
+
+  @Override
+  public void run() throws IOException, InterruptedException {
+    startTime = System.currentTimeMillis();
+    init();
+    validateInitialization();
+    SimulatorEvent nextEvent;
+    while ((nextEvent = queue.get()) != null && nextEvent.getTimeStamp() < terminateTime
+        && !shutdown) {
+      currentTime = nextEvent.getTimeStamp();
+      SimulatorEventListener listener = nextEvent.getListener();
+      if (nextEvent instanceof JobSubmissionEvent) {
+        validateJobSubmission((JobSubmissionEvent)nextEvent);
+      } else if (nextEvent instanceof JobCompleteEvent) {
+        validateJobComplete((JobCompleteEvent)nextEvent);
+      }
+      List<SimulatorEvent> response = listener.accept(nextEvent);
+      queue.addAll(response);
+    }
+    validateEnd();
+    summary(System.out);
+  }
+  
+  private void validateEnd() {
+    Assert.assertEquals("Number of submitted jobs does not match trace",
+        submittedJobs.size(), fixedJobs);
+    Assert.assertEquals("Number of submitted jobs does not match trace",
+        completedJobs.size(), fixedJobs);
+  }
+
+  private Pre21JobHistoryConstants.Values convertState (JobStatus status) {
+    int runState = status.getRunState();
+    if (runState == JobStatus.FAILED) {
+      return Pre21JobHistoryConstants.Values.FAILED;
+    } else if (runState == JobStatus.SUCCEEDED) {
+      return Pre21JobHistoryConstants.Values.SUCCESS;
+    } else {
+      throw new IllegalArgumentException("unknown status " + status);
+    }
+  }
+  
+  private void validateJobComplete(JobCompleteEvent completeEvent) {
+    JobID jobId = completeEvent.getJobStatus().getJobID();
+    JobStatus finalStatus = completeEvent.getJobStatus();
+
+    Assert.assertTrue("Job completed was not submitted:"+jobId, 
+               submittedJobs.contains(jobId));
+    Assert.assertFalse("Job completed more than once:" + jobId, 
+                completedJobs.contains(jobId));
+    completedJobs.add(jobId);
+   
+    Pre21JobHistoryConstants.Values finalValue = jobs.get(jobId).getOutcome();
+    Pre21JobHistoryConstants.Values obtainedStatus = convertState(finalStatus);
+    Assert.assertEquals("Job completion final status mismatch", obtainedStatus,
+        finalValue);
+  }
+
+  private void validateJobSubmission(JobSubmissionEvent submissionEvent) {
+    JobID jobId = submissionEvent.getJob().getJobID();
+    LOG.info("Job being submitted: " + jobId);
+    Assert.assertFalse("Job " + jobId + " is already submitted", submittedJobs
+        .contains(jobId));
+    LOG.info("Adding to submitted Jobs " + jobId);
+    submittedJobs.add(jobId); 
+    jobs.put(jobId, submissionEvent.getJob());
+    Pre21JobHistoryConstants.Values finalValue = submissionEvent.getJob().getOutcome();
+    Assert.assertTrue("Job has final state neither SUCCESS nor FAILED",
+        finalValue == Pre21JobHistoryConstants.Values.FAILED
+            || finalValue == Pre21JobHistoryConstants.Values.SUCCESS);
+  }
+
+  private void validateInitialization() {
+    // The JobTracker has been initialized.
+    Assert.assertTrue("SimulatorJobTracker is null", jt != null);
+    Assert.assertTrue("Clock of simulator is behind startTime",
+        SimulatorJobTracker.getClock().getTime() >= startTime);
+    // The JobClient has been initialized
+    Assert.assertTrue("SimulatorJobClient is null", jc != null);
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+/**
+ * Mock jobtracker used by the Mumak simulator tests. It checks the
+ * parameters of every heartbeat() call against a prepopulated expectation
+ * table and sends back canned responses. It also implements just enough of
+ * ClientProtocol to hand out job ids and accept job submissions; every
+ * other protocol method throws UnsupportedOperationException.
+ */
+public class MockSimulatorJobTracker implements InterTrackerProtocol,
+                                                ClientProtocol {
+  // Virtual time at which the simulation starts.
+  private final long simulationStartTime;
+  // Interval between two heartbeats of a task tracker, in simulated ms.
+  private final int heartbeatInterval;
+  
+  // Helper table, used iff checkHeartbeats == true.
+  // Maps simulation time -> (task tracker name -> the expected task tracker
+  // status report at that time and the heartbeat response to send back).
+  private SortedMap<Long, TreeMap<String, HeartbeatHelper>> heartbeats = 
+      new TreeMap<Long, TreeMap<String, HeartbeatHelper>>();
+  // Whether heartbeat() verifies its parameters against the table above.
+  private final boolean checkHeartbeats;
+  // Sequence number for the job ids handed out by getNewJobID().
+  private int jobId = 0;
+  
+  static final Log LOG = LogFactory.getLog(MockSimulatorJobTracker.class);
+
+  public MockSimulatorJobTracker(long simulationStartTime,
+                                 int heartbeatInterval,
+                                 boolean checkHeartbeats) {
+    this.simulationStartTime = simulationStartTime;
+    this.heartbeatInterval = heartbeatInterval;
+    this.checkHeartbeats = checkHeartbeats;
+  }
+  
+  @Override
+  public JobID getNewJobID() throws IOException {
+    return new JobID("mockJT", jobId++);
+  }
+
+  @Override
+  public JobStatus submitJob(JobID jobId) throws IOException {
+    // Accept every job and report it as RUNNING right away.
+    JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f,
+        JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", "");
+    return status;
+  }
+
+  /**
+   * Receives a heartbeat from a simulated task tracker. If checkHeartbeats
+   * is on, the report is verified against the expectation table and the
+   * canned task tracker actions recorded for this (time, tracker) pair are
+   * returned; otherwise an empty response is sent.
+   */
+  @Override
+  public HeartbeatResponse heartbeat(TaskTrackerStatus status,
+      boolean restarted, boolean initialContact, boolean acceptNewTasks,
+      short responseId) throws IOException {
+    if (!(status instanceof SimulatorTaskTrackerStatus)) {
+      throw new IllegalArgumentException(
+          "Expecting SimulatorTaskTrackerStatus, actual status type "
+              + status.getClass());
+    }
+    SimulatorTaskTrackerStatus trackerStatus =
+        (SimulatorTaskTrackerStatus)status;
+    long now = trackerStatus.getCurrentSimulationTime();
+    String trackerName = status.getTrackerName();
+    
+    LOG.debug("Received heartbeat() from trackerName=" + trackerName + 
+              ", now=" + now);
+
+    HeartbeatResponse response = new HeartbeatResponse();
+    response.setHeartbeatInterval(heartbeatInterval);
+    response.setActions(new TaskTrackerAction[0]);
+    
+    if (checkHeartbeats) {         
+      Assert.assertFalse("No more heartbeats were expected ", heartbeats.isEmpty());
+      long nextToCheck = heartbeats.firstKey();
+      // Missing heartbeat check
+      Assert.assertTrue(nextToCheck <= now);
+      if (nextToCheck < now) {
+        LOG.debug("Simulation time progressed, last checked heartbeat at=" + 
+                   nextToCheck + ", now=" + now + ". Checking if no " +
+                   "required heartbeats were missed in the past");
+        SortedMap<String, HeartbeatHelper> previousHeartbeats = 
+            heartbeats.get(nextToCheck);
+        Assert.assertNotNull(previousHeartbeats);
+        Assert.assertTrue(previousHeartbeats.isEmpty());
+        heartbeats.remove(nextToCheck);
+        // Fail with a clear message instead of a bare NoSuchElementException
+        // from firstKey() when the expectation table has run dry.
+        Assert.assertFalse("Heartbeat at time=" + now + " was not expected",
+            heartbeats.isEmpty());
+        nextToCheck = heartbeats.firstKey();
+      }
+      Assert.assertEquals("Heartbeat at the wrong time", nextToCheck, now);
+      
+      SortedMap<String, HeartbeatHelper> currentHeartbeats = 
+            heartbeats.get(now);
+      HeartbeatHelper currentHeartbeat = currentHeartbeats.get(trackerName);
+      Assert.assertNotNull("Unknown task tracker name=" + trackerName,
+                    currentHeartbeat);
+      // Consume this expectation so checkMissingHeartbeats() can verify that
+      // the table is fully drained at the end of the simulation.
+      currentHeartbeats.remove(trackerName);
+    
+      currentHeartbeat.checkHeartbeatParameters(status, acceptNewTasks);
+    
+      response.setActions(currentHeartbeat.getTaskTrackerActions());
+    }
+    return response;
+  }
+  
+  /**
+   * Populates the mock jobtracker's helper & checker table with expected
+   * empty reports from the given task tracker and empty task actions to
+   * perform, one per heartbeat interval starting at the simulation start.
+   */
+  public void expectEmptyHeartbeats(String taskTrackerName, 
+                                    int numHeartbeats) {
+    long simulationTime = simulationStartTime;
+    for (int i=0; i<numHeartbeats; i++) {
+      TreeMap<String, HeartbeatHelper> hb = heartbeats.get(simulationTime);
+      if (hb == null) {
+        hb = new TreeMap<String, HeartbeatHelper>();
+        heartbeats.put(simulationTime, hb);
+      }
+      hb.put(taskTrackerName, new HeartbeatHelper());
+      simulationTime += heartbeatInterval;
+    }
+  }
+
+  /**
+   * Fills in all the expected and returned heartbeat parameters
+   * corresponding to running a map task on a task tracker.
+   * Use killHeartbeat &lt; 0 if the task is not killed.
+   */
+  public void runMapTask(String taskTrackerName, TaskAttemptID taskId,
+                         long mapStart, long mapRuntime, long killHeartbeat) {
+    long mapDone = mapStart + mapRuntime;
+    long mapEndHeartbeat = nextHeartbeat(mapDone);
+    final boolean isKilled = (killHeartbeat>=0);
+    if (isKilled) {
+      // the task reports KILLED on the first heartbeat after the kill action
+      mapEndHeartbeat = nextHeartbeat(killHeartbeat + 1);
+    }
+
+    LOG.debug("mapStart=" + mapStart + ", mapDone=" + mapDone + 
+              ", mapEndHeartbeat=" + mapEndHeartbeat + 
+              ", killHeartbeat=" + killHeartbeat);
+    
+    final int numSlotsRequired = 1;
+    org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
+        org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);        
+    Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, "dummysplitclass",
+                            null, numSlotsRequired);
+    // all byte counters are 0
+    TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 
+    MapTaskAttemptInfo taskAttemptInfo = 
+        new MapTaskAttemptInfo(State.SUCCEEDED, taskInfo, mapRuntime);
+    TaskTrackerAction action = 
+        new SimulatorLaunchTaskAction(task, taskAttemptInfo);
+    heartbeats.get(mapStart).get(taskTrackerName).addTaskTrackerAction(action);
+    if (isKilled) {
+      action = new KillTaskAction(taskIdOldApi);
+      heartbeats.get(killHeartbeat).get(taskTrackerName).addTaskTrackerAction(
+         action);
+    }
+
+    // Expect RUNNING reports on every heartbeat while the task is in flight,
+    // then a single SUCCEEDED (or KILLED) report on the final heartbeat.
+    for(long simulationTime = mapStart + heartbeatInterval; 
+        simulationTime <= mapEndHeartbeat;
+        simulationTime += heartbeatInterval) {
+      State state = simulationTime < mapEndHeartbeat ? 
+          State.RUNNING : State.SUCCEEDED;
+      if (simulationTime == mapEndHeartbeat && isKilled) {
+        state = State.KILLED;
+      }
+      MapTaskStatus mapStatus = new MapTaskStatus(
+          task.getTaskID(), 0.0f, 0, state, "", "", null, Phase.MAP, null);
+      heartbeats.get(simulationTime).get(taskTrackerName).addTaskReport(
+         mapStatus);
+    }
+  }
+
+  /**
+   * Fills in all the expected and returned heartbeat parameters
+   * corresponding to running a reduce task on a task tracker.
+   * Use killHeartbeat &lt; 0 if the task is not killed.
+   */
+  public void runReduceTask(String taskTrackerName, TaskAttemptID taskId,
+                            long reduceStart, long mapDoneDelay, 
+                            long reduceRuntime, long killHeartbeat) {
+    long mapDone = nextHeartbeat(reduceStart + mapDoneDelay);
+    long reduceDone = mapDone + reduceRuntime;
+    long reduceEndHeartbeat = nextHeartbeat(reduceDone);
+    final boolean isKilled = (killHeartbeat>=0);
+    if (isKilled) {
+      reduceEndHeartbeat = nextHeartbeat(killHeartbeat + 1);
+    }
+
+    LOG.debug("reduceStart=" + reduceStart + ", mapDone=" + mapDone + 
+              ", reduceDone=" + reduceDone + 
+              ", reduceEndHeartbeat=" + reduceEndHeartbeat +
+              ", killHeartbeat=" + killHeartbeat);
+
+    final int numSlotsRequired = 1;
+    org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
+        org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);        
+    Task task = new ReduceTask("dummyjobfile", taskIdOldApi, 0, 0,
+                               numSlotsRequired);
+    // all byte counters are 0
+    TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 
+    ReduceTaskAttemptInfo taskAttemptInfo = 
+        new ReduceTaskAttemptInfo(State.SUCCEEDED, taskInfo, 0, 0, 
+                                  reduceRuntime);
+    TaskTrackerAction action = 
+        new SimulatorLaunchTaskAction(task, taskAttemptInfo);    
+    heartbeats.get(reduceStart).get(taskTrackerName).addTaskTrackerAction(
+        action);
+    // Deliver "all maps completed" only if the reduce is not killed before
+    // the maps would have finished.
+    if (!isKilled || mapDone < killHeartbeat) {
+      action = new AllMapsCompletedTaskAction(task.getTaskID());
+      heartbeats.get(mapDone).get(taskTrackerName).addTaskTrackerAction(
+          action);
+    }
+    if (isKilled) {
+      action = new KillTaskAction(taskIdOldApi);
+      heartbeats.get(killHeartbeat).get(taskTrackerName).addTaskTrackerAction(
+         action);
+    }
+
+    for(long simulationTime = reduceStart + heartbeatInterval; 
+        simulationTime <= reduceEndHeartbeat;
+        simulationTime += heartbeatInterval) {
+      State state = simulationTime < reduceEndHeartbeat ? 
+          State.RUNNING : State.SUCCEEDED;
+      if (simulationTime == reduceEndHeartbeat && isKilled) {
+        state = State.KILLED;
+      }
+      // mapDone is when the all maps done event delivered
+      Phase phase = simulationTime <= mapDone ? Phase.SHUFFLE : Phase.REDUCE; 
+      ReduceTaskStatus reduceStatus = new ReduceTaskStatus(
+          task.getTaskID(), 0.0f, 0, state, "", "", null, phase, null);
+      heartbeats.get(simulationTime).get(taskTrackerName).addTaskReport(
+          reduceStatus);
+    }
+  }
+  
+  // Should be called at the end of the simulation: Mock JT should have 
+  // consumed all entries from the heartbeats table by that time
+  public void checkMissingHeartbeats() {
+    Assert.assertEquals(1, heartbeats.size());
+    long lastHeartbeat = heartbeats.firstKey();
+    Assert.assertTrue("Missing heartbeats, last heartbeat=" + lastHeartbeat,
+               heartbeats.get(lastHeartbeat).isEmpty());
+  }
+          
+  /**
+   * Rounds up to the next heartbeat time, i.e. the smallest
+   * simulationStartTime + k*heartbeatInterval that is &gt;= time.
+   * Uses exact integer ceiling division; the previous double-based
+   * Math.ceil() could lose precision for large simulation times.
+   */
+  public long nextHeartbeat(long time) {
+    long elapsed = time - simulationStartTime;
+    // ceil(elapsed / heartbeatInterval) with integer arithmetic; Java's
+    // division truncates towards zero, so negative elapsed needs no bias
+    long numHeartbeats = (elapsed <= 0) ? -((-elapsed) / heartbeatInterval)
+        : (elapsed + heartbeatInterval - 1) / heartbeatInterval;
+    return simulationStartTime + numHeartbeats * heartbeatInterval;
+  }
+  
+  // Rest of InterTrackerProtocol follows, unused in simulation
+  @Override
+  public String getFilesystemName() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void reportTaskTrackerError(String taskTracker,
+                                     String errorClass,
+                                     String errorMessage) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid,
+      int fromEventId, int maxEvents) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getSystemDir() {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public String getBuildVersion() {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+      org.apache.hadoop.mapred.JobID jobid, int fromEventId, int maxEvents)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String queueName) throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Counters getJobCounters(JobID jobid) throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getJobHistoryDir() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public JobStatus getJobStatus(JobID jobid) throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public org.apache.hadoop.mapreduce.server.jobtracker.State getJobTrackerState()
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public QueueInfo getQueue(String queueName) throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskReport[] getTaskReports(JobID jobid, TaskType type)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void killJob(JobID jobid) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setJobPriority(JobID jobid, String priority) throws IOException,
+      InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MockSimulatorEngine;
+import org.apache.hadoop.tools.rumen.ZombieCluster;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Test;
+
+/**
+ * End-to-end test for the Mumak simulator: runs a MockSimulatorEngine over
+ * a bundled 19-job trace and topology and checks that the simulation
+ * completes successfully.
+ */
+public class TestSimulatorEndToEnd {
+
+  // Log under this test's own class (was mistakenly MockSimulatorEngine.class)
+  public static final Log LOG = LogFactory.getLog(TestSimulatorEndToEnd.class);
+  
+  @Test
+  public void testMain() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+    final Path rootInputDir = new Path(
+        System.getProperty("src.test.data", "data")).makeQualified(lfs);
+    final Path traceFile = new Path(rootInputDir, "19-jobs.trace.json.gz");
+    final Path topologyFile = new Path(rootInputDir, "19-jobs.topology.json.gz");
+
+    LOG.info("traceFile = " + traceFile.toString() + " topology = "
+        + topologyFile.toString());
+   
+    // The mock engine verifies these counts against what it actually sees.
+    int numJobs = getNumberJobs(traceFile, conf);
+    int nTrackers = getNumberTaskTrackers(topologyFile, conf);
+    
+    MockSimulatorEngine mockMumak = new MockSimulatorEngine(numJobs, nTrackers);
+
+    String[] args = { traceFile.toString(), topologyFile.toString() };
+    int res = ToolRunner.run(new Configuration(), mockMumak, args);
+    // junit convention: expected value first, actual second
+    Assert.assertEquals(0, res);
+  }
+  
+  // Counts the jobs in the given Rumen trace file.
+  private int getNumberJobs(Path inputFile, Configuration conf)
+      throws IOException {
+    ZombieJobProducer jobProducer = new ZombieJobProducer(inputFile, null, conf);
+    try {
+      int numJobs = 0;
+      while (jobProducer.getNextJob() != null) {
+        ++numJobs;
+      }
+      return numJobs;
+    } finally {
+      // always release the producer, even if reading the trace fails
+      jobProducer.close();
+    }
+  }
+  
+  // Counts the task trackers (machines) in the given topology file.
+  private int getNumberTaskTrackers(Path inputFile, Configuration conf)
+      throws IOException {
+    return new ZombieCluster(inputFile, null, conf).getMachines().size();
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEngine.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEngine.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEngine.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+// Tests the SimulatorEngine event loop by seeding it with self-replicating
+// events and checking the total number of events processed at the end.
+public class TestSimulatorEngine {
+  // Upper bound on how far into the future a newly generated event may be
+  private static final int TIME_WARP = 1000;
+  Random random = new Random();
+  
+  // Leaf event: handling it produces no further events.
+  public static class TestSimpleEvent extends SimulatorEvent {
+    public TestSimpleEvent(SimulatorEventListener listener, long timestamp) {
+      super(listener, timestamp);
+    }
+  }
+  
+  /**
+   * Handling each {@link TestComplexEvent1} of level n will produce another
+   * {@link TestComplexEvent1} of level n-1 and 4 {@link TestSimpleEvent}s when
+   * n>0, produce no event if n=0. All events are created with a random time
+   * stamp within 1000 units into the future.
+   */
+  public static class TestComplexEvent1 extends SimulatorEvent {
+    private int level;
+    
+    public TestComplexEvent1(SimulatorEventListener listener, long timeStamp, 
+                             int level) {
+      super(listener, timeStamp);
+      this.level = level;
+    }
+    
+    public int getLevel() {
+      return level;
+    }
+  }
+  
+  /**
+   * Handling each {@link TestComplexEvent2} of level n will produce 2
+   * {@link TestComplexEvent2}s of level n-1 and 1 {@link TestSimpleEvent} when
+   * n>0, produce no event if n=0. All events are created with a random time
+   * stamp within 1000 units into the future.
+   */
+  public class TestComplexEvent2 extends TestComplexEvent1 {
+    public TestComplexEvent2(SimulatorEventListener listener, long timeStamp, 
+                             int level) {
+      super(listener, timeStamp, level);
+    }
+  }
+  
+  // Implements the production rules described on the event classes above.
+  // Note: TestComplexEvent2 must be tested before TestComplexEvent1 since
+  // it is a subclass of TestComplexEvent1.
+  class TestListener implements SimulatorEventListener {
+
+    @Override
+    public List<SimulatorEvent> accept(SimulatorEvent event) {
+      SimulatorEventListener listener = event.getListener();
+      long now = event.getTimeStamp();
+      if (event instanceof TestComplexEvent2) {
+        // ce2(n) -> 2*ce2(n-1) + se
+        int level = ((TestComplexEvent2) event).getLevel();
+        if (level == 0)
+          return SimulatorEventQueue.EMPTY_EVENTS;
+        List<SimulatorEvent> response = new ArrayList<SimulatorEvent>();
+        for (int i = 0; i < 2; i++)
+          response.add(new TestComplexEvent2(listener,
+                                             now + random.nextInt(TIME_WARP),
+                                             level-1));
+        response.add(new TestSimpleEvent(listener, now + random.nextInt(TIME_WARP)));
+        return response;
+      } else if (event instanceof TestComplexEvent1) {
+        TestComplexEvent1 e = (TestComplexEvent1)event;
+        // ce1(n) -> ce1(n-1) + 4*se
+        if (e.getLevel() == 0)
+          return SimulatorEventQueue.EMPTY_EVENTS;
+        List<SimulatorEvent> response = new ArrayList<SimulatorEvent>();
+        response.add(new TestComplexEvent1(listener,
+                                           now + random.nextInt(TIME_WARP),
+                                           e.getLevel()-1));
+        for (int i = 0; i < 4; i++)
+          response.add(new TestSimpleEvent(listener, 
+                                           now + random.nextInt(TIME_WARP)));
+        return response;
+      } else if (event instanceof TestSimpleEvent) {
+        return SimulatorEventQueue.EMPTY_EVENTS;
+      } else {
+        throw new IllegalArgumentException("unknown event type: "
+            + event.getClass());
+      }
+    }
+
+    @Override
+    public List<SimulatorEvent> init(long when) {
+      return null;
+    }
+  }
+
+  // Engine seeded with one TestComplexEvent1; total events = 5*level+1
+  // (the seed plus 1 ce1 and 4 se per level).
+  public class TestSimulator1 extends SimulatorEngine {
+    
+    private int level = 10;
+    
+    @Override
+    protected void init() {
+      this.queue.add(new TestComplexEvent1(new TestListener(), 
+          random.nextInt(TIME_WARP), level));
+    }
+    
+    @Override
+    protected void summary(PrintStream out) {
+      out.println(queue.getCurrentTime() + ", " + queue.getEventCount() + 
+                         ", " + queue.getSize());
+      Assert.assertEquals(5*level+1, queue.getEventCount());
+    }
+  }
+  
+  // Engine seeded with one TestComplexEvent2; total events = 3*2^level-2
+  // (a full binary tree of ce2 events plus one se per internal node).
+  public class TestSimulator2 extends SimulatorEngine {
+    
+    private int level = 10;
+    
+    @Override
+    protected void init() {
+      this.queue.add(new TestComplexEvent2(new TestListener(), 
+          random.nextInt(TIME_WARP), level));
+    }
+    
+    @Override
+    protected void summary(PrintStream out) {
+      out.println(queue.getCurrentTime() + ", " + queue.getEventCount() + 
+                         ", " + queue.getSize());
+      Assert.assertEquals(3*(1<<level)-2, queue.getEventCount());
+    }
+  }
+  
+  /**
+   * Test {@link SimulatorEngine} using {@link TestSimulator1}. Insert a 
+   * {@link TestComplexEvent1} in the beginning. The simulation stops when the
+   * {@link SimulatorEventQueue} is empty. Total number of events processed is checked
+   * against expected number (5*level+1).
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testComplex1() throws IOException, InterruptedException {
+    SimulatorEngine simulation = new TestSimulator1();
+    simulation.run();
+  }
+  
+  /**
+   * Test {@link SimulatorEngine} using {@link TestSimulator2}. Insert a 
+   * {@link TestComplexEvent2} in the beginning. The simulation stops when the
+   * {@link SimulatorEventQueue} is empty. Total number of events processed is checked
+   * against expected number (3 * 2^level - 2).
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testComplex2() throws IOException, InterruptedException {
+    SimulatorEngine simulation = new TestSimulator2();
+    simulation.run();
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEventQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEventQueue.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEventQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEventQueue.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+// Unit tests for SimulatorEventQueue: ordering, bulk insertion, and
+// rejection of events scheduled in the past.
+public class TestSimulatorEventQueue {
+  private Random random = new Random();
+
+  // Minimal concrete event for exercising the queue.
+  public class TestEvent extends SimulatorEvent {
+    
+    public TestEvent(SimulatorEventListener listener, long timeStamp) {
+      super(listener, timeStamp);
+    }
+
+  }
+  
+  // Event carrying an insertion sequence number, used to verify FIFO order
+  // among events that share the same time stamp.
+  public class TestEventWithCount extends TestEvent {
+    private final int count;
+    
+    public TestEventWithCount(SimulatorEventListener listener, long timeStamp, 
+                              int count) {
+      super(listener, timeStamp);
+      this.count = count;
+    }
+    
+    public int getCount() {
+      return count;
+    }
+  }
+
+  public static class TestListener implements SimulatorEventListener {
+
+    @Override
+    public List<SimulatorEvent> accept(SimulatorEvent event) {
+      if (event instanceof TestEvent) {
+        return SimulatorEventQueue.EMPTY_EVENTS;
+      }
+      return null;
+    }
+
+    @Override
+    public List<SimulatorEvent> init(long when) {
+      return null;
+    }
+  }
+  
+  @Test
+  public void testSimpleGetPut() {
+    SimulatorEventQueue queue = new SimulatorEventQueue();
+    SimulatorEventListener listener = new TestListener();
+    SimulatorEvent event = new TestEvent(listener, 10);
+    
+    queue.add(event);
+    SimulatorEvent first = queue.get();
+    
+    // junit convention: expected value first, actual second
+    Assert.assertEquals(event.getTimeStamp(), first.getTimeStamp());
+    Assert.assertEquals(event.getListener(), first.getListener());
+  }
+
+  @Test
+  public void testListPut() {
+    SimulatorEventQueue queue = new SimulatorEventQueue();
+    SimulatorEventListener listener = new TestListener();
+    List<SimulatorEvent> listEvent = new ArrayList<SimulatorEvent>();
+    
+    listEvent.add(new TestEvent(listener, 10));
+    listEvent.add(new TestEvent(listener, 11));
+    
+    queue.addAll(listEvent);
+    SimulatorEvent first = queue.get();
+    Assert.assertEquals(10, first.getTimeStamp());
+    Assert.assertEquals(listener, first.getListener());
+    
+    SimulatorEvent second = queue.get();
+    Assert.assertEquals(11, second.getTimeStamp());
+    // was re-checking 'first' here due to a copy-paste slip; check 'second'
+    Assert.assertEquals(listener, second.getListener());
+  }
+
+  @Test  
+  public void testKeepOrder() {
+    SimulatorEventQueue queue = new SimulatorEventQueue();
+    SimulatorEventListener listener = new TestListener();
+    List<SimulatorEvent> listEvent = new ArrayList<SimulatorEvent>();
+    int count = 0;
+    
+    // Draw the loop bounds once up front: calling random.nextInt() in the
+    // loop condition re-draws a new bound on every iteration, which skews
+    // the iteration count and frequently runs zero iterations.
+    int numBatches = random.nextInt(100);
+    for (int i = 0; i < numBatches; i++) {
+      listEvent.clear();
+      int batchSize = random.nextInt(5);
+      for (int j = 0; j < batchSize; j++) {
+        listEvent.add(new TestEventWithCount(listener, random.nextInt(10), count++));
+      }
+      queue.addAll(listEvent);
+    }
+    
+    // Events with identical time stamps must come out in insertion order.
+    TestEventWithCount next;
+    TestEventWithCount last = null;
+    while((next = (TestEventWithCount) queue.get()) != null) {
+      if (last != null && last.getTimeStamp() == next.getTimeStamp()) {
+        Assert.assertTrue(last.getCount() < next.getCount());
+      }
+      last = next;
+    }
+  }
+  
+  // Debugging aid: prints an event's time stamp and sequence number.
+  public void dump(TestEventWithCount event) {
+    System.out.println("timestamp: " + event.getTimeStamp()
+        + ", count: " + event.getCount());
+  }
+  
+  @Test
+  public void testInsertEventIntoPast() {
+    SimulatorEventQueue queue = new SimulatorEventQueue();
+    SimulatorEventListener listener = new TestListener();
+    
+    queue.add(new TestEvent(listener, 10));
+    queue.get();
+    // current time is 10.
+    try {
+      // the event to insert happened at 5. It happens in the past because
+      // current time is 10.
+      queue.add(new TestEvent(listener, 5));
+      Assert.fail("Added Event occurred in the past");
+    } catch (Exception expected) {
+      // expected: the queue must reject events scheduled before current time
+      // (Assert.fail throws an Error, so it is not swallowed by this catch)
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+/**
+ * Unit test for SimulatorJobClient: replays a fixed list of trace job
+ * submission times through a mock job tracker and a checked event queue,
+ * and verifies that the client emits exactly the expected
+ * JobSubmissionEvents with timestamps shifted into simulation time.
+ */
+public class TestSimulatorJobClient {
+  // Collaborators; (re)created inside the test method.
+  MockSimulatorJobTracker jobTracker = null;
+  CheckedEventQueue eventQueue = null;
+  SimulatorJobClient jobClient = null;
+
+  static final Log LOG = LogFactory.getLog(TestSimulatorJobClient.class);
+    
+  // Simulation clock value at which the run starts.
+  long simulationStartTime = 100000;
+  final int heartbeatInterval = 5000; // not used other than initializing SimJT
+  // Absolute job submission times taken from a trace (ms since the epoch).
+  final long[] jobSubmissionTimes = new long[] {
+      1240335960685L,
+      1240335962848L,
+      1240336843916L,
+      1240336853354L,
+      1240336893801L,
+      1240337079617L,
+  };
+  
+  // assume reading from trace is correct
+  @Test
+  public final void testRelativeStartTime() throws IOException {
+    // Offset mapping trace time onto simulation time: the first job is
+    // submitted exactly at simulationStartTime.
+    long relativeStartTime = jobSubmissionTimes[0] - simulationStartTime;
+    MockJobStoryProducer jobStoryProducer =
+      new MockJobStoryProducer(jobSubmissionTimes, relativeStartTime);
+    
+    try {
+      jobTracker = new MockSimulatorJobTracker(simulationStartTime,
+                                               heartbeatInterval, true);
+    } catch (Exception e) {
+      Assert.fail("Couldn't set up the mock job tracker: " + e);
+    }
+    eventQueue = new CheckedEventQueue(simulationStartTime);
+    jobClient = new SimulatorJobClient(jobTracker, jobStoryProducer);
+
+    // add all expected events
+    eventQueue.addExpected(simulationStartTime,
+                           new JobSubmissionEvent(jobClient,
+                                                  simulationStartTime,
+                                                  jobStoryProducer.getJob(0)));
+    // NOTE(review): the check time below uses index i-1 while the event's
+    // timestamp uses i — presumably event i is enqueued while processing the
+    // submission of job i-1; confirm against SimulatorJobClient.accept().
+    for (int i = 1; i < jobSubmissionTimes.length; i++) {
+      eventQueue.addExpected(jobSubmissionTimes[i-1] - relativeStartTime,
+                             new JobSubmissionEvent(jobClient, 
+                                                    jobSubmissionTimes[i] - relativeStartTime,
+                                                    jobStoryProducer.getJob(i)));
+    }
+
+    long runUntil = eventQueue.getLastCheckTime();
+    LOG.debug("Running until simulation time=" + runUntil);
+
+    List<SimulatorEvent> events = jobClient.init(simulationStartTime);
+    eventQueue.addAll(events);
+
+    // Event loop: pull events, hand them to the job client, feed back the
+    // new events it produces; the CheckedEventQueue verifies each enqueued
+    // event against the expected table.
+    while (true) {
+      // can't be empty as it must go past runUntil for verifiability
+      // besides it is never empty because of HeartbeatEvent            
+      SimulatorEvent currentEvent = eventQueue.get();
+      // copy time, make sure TT does not modify it
+      long now = currentEvent.getTimeStamp();
+      LOG.debug("Number of events to deliver=" + (eventQueue.getSize()+1) +
+                ", now=" + now);
+      LOG.debug("Calling accept(), event=" + currentEvent + ", now=" + now);
+      events = jobClient.accept(currentEvent);
+      if (now > runUntil) {
+        break;
+      }                             
+      LOG.debug("Accept() returned " + events.size() + " new event(s)");
+      for (SimulatorEvent newEvent: events) {
+        LOG.debug("New event " + newEvent);
+      }
+      eventQueue.addAll(events);
+      LOG.debug("Done checking and enqueuing new events");
+    }
+    
+    // make sure we have seen all expected events, even for the last 
+    // time checked
+    LOG.debug("going to check if all expected events have been processed");
+    eventQueue.checkMissingExpected();
+    // TODO: Mock JT should have consumed all entries from its job submission table
+    //jobTracker.checkMissingJobSubmission();
+  }
+
+  /**
+   * JobStoryProducer stub serving MockJobStory instances whose submission
+   * times are the given trace times shifted by relativeStartTime.
+   */
+  static class MockJobStoryProducer implements JobStoryProducer {
+    private long[] times;
+    // Cursor of the next job to hand out from getNextJob().
+    private int index = 0;
+    private List<MockJobStory> jobs = new ArrayList<MockJobStory>();
+    
+    public MockJobStoryProducer(long[] times, long relativeStartTime) {
+      super();
+      Assert.assertTrue(times.length > 0);
+      this.times = times;
+      index = 0;
+      
+      for (long time: times) {
+        jobs.add(new MockJobStory(time - relativeStartTime));
+      }
+    }
+    
+    @Override
+    public JobStory getNextJob() {
+      if (index >= times.length) {
+        return null;
+      }
+      return jobs.get(index++);
+    }
+    
+    // Random access for the test's expected-event setup.
+    public JobStory getJob(int i) {
+      return jobs.get(i);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+  
+  /**
+   * Minimal JobStory stub: only getSubmissionTime(), getJobID() (null) and
+   * getOutcome() are meaningful; every other accessor throws because this
+   * test never reads them.
+   */
+  static class MockJobStory implements JobStory {
+    private long submissionTime;
+    
+    public MockJobStory(long submissionTime) {
+      this.submissionTime = submissionTime;
+    }
+    
+    @Override
+    public InputSplit[] getInputSplits() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public JobConf getJobConf() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+        int taskAttemptNumber, int locality) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getName() {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override 
+    public JobID getJobID() {
+      return null;
+    }
+
+    @Override
+    public int getNumberMaps() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberReduces() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getSubmissionTime() {
+      return submissionTime;
+    }
+
+    @Override
+    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
+        int taskNumber, int taskAttemptNumber) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getUser() {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public Pre21JobHistoryConstants.Values getOutcome() {
+      return Pre21JobHistoryConstants.Values.SUCCESS;
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import java.util.HashSet;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+
+import org.apache.hadoop.mapred.FakeJobs;
+import org.junit.Test;
+
+/**
+ * Unit test for SimulatorJobTracker: submits a fake job, then drives the
+ * tracker with fabricated heartbeats from a FakeTaskTracker and checks that
+ * all map and reduce launch actions are eventually handed out.
+ */
+public class TestSimulatorJobTracker {
+
+  SimulatorTaskTracker taskTracker;
+
+  public static final Log LOG = LogFactory
+      .getLog(TestSimulatorJobTracker.class);
+
+  /** Builds a JobConf with the minimal settings the simulator JT needs. */
+  @SuppressWarnings("deprecation")
+  public JobConf createJobConf() {
+    JobConf jtConf = new JobConf();
+    jtConf.set("mapred.job.tracker", "localhost:8012");
+    jtConf.set("mapred.jobtracker.job.history.block.size", "512");
+    jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
+    jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
+    jtConf.setInt("mapred.reduce.copy.backoff", 4);
+    jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
+    jtConf.setUser("mumak");
+    jtConf.set("mapred.system.dir", jtConf.get("hadoop.tmp.dir", "/tmp/hadoop-"
+        + jtConf.getUser())
+        + "/mapred/system");
+    System.out.println("Created JobConf");
+    return jtConf;
+  }
+
+  /**
+   * Submits FakeJobs to the job tracker via the client protocol, routing the
+   * job story through SimulatorJobCache so the JT can pick it up.
+   */
+  public static class FakeJobClient {
+
+    ClientProtocol jobTracker;
+    int numMaps;
+    int numReduces;
+
+    public FakeJobClient(ClientProtocol jobTracker, int numMaps,
+        int numReduces) {
+      this.jobTracker = jobTracker;
+      this.numMaps = numMaps;
+      this.numReduces = numReduces;
+    }
+
+    /** Registers a new FakeJobs story in the cache and submits its job id. */
+    public void submitNewJob() throws IOException, InterruptedException {
+      org.apache.hadoop.mapreduce.JobID jobId = jobTracker.getNewJobID();
+      LOG.info("Obtained from Jobtracker jobid = " + jobId);
+      FakeJobs job = new FakeJobs("job1", 0, numMaps, numReduces);
+
+      SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
+      jobTracker.submitJob(jobId);
+    }
+  }
+
+  /**
+   * Task tracker stub that fabricates heartbeats by hand: every launched
+   * task is immediately reported as SUCCEEDED on the next heartbeat.
+   */
+  public static class FakeTaskTracker extends SimulatorTaskTracker {
+
+    boolean firstHeartbeat = true;
+    short responseId = 0;
+    // Simulation time of the most recent fake heartbeat.
+    int now = 0;
+
+    FakeTaskTracker(InterTrackerProtocol jobTracker, String taskTrackerName,
+        String hostName, int maxMapTasks, int maxReduceTasks) {
+      super(jobTracker, taskTrackerName, hostName, maxMapTasks, maxReduceTasks);
+
+      LOG.info("FakeTaskTracker constructor, taskTrackerName="
+          + taskTrackerName);
+    }
+
+    /**
+     * Clones the status of every tracked task for the heartbeat payload and
+     * drops tasks already reported SUCCEEDED so they are reported only once.
+     */
+    private List<TaskStatus> collectAndCloneTaskStatuses() {
+      ArrayList<TaskStatus> statuses = new ArrayList<TaskStatus>();
+      Set<TaskAttemptID> mark = new HashSet<TaskAttemptID>();
+      for (SimulatorTaskInProgress tip : tasks.values()) {
+        statuses.add((TaskStatus) tip.getTaskStatus().clone());
+        if (tip.getFinalRunState() == State.SUCCEEDED) {
+          mark.add(tip.getTaskStatus().getTaskID());
+        }
+      }
+
+      // Remove after iteration to avoid mutating tasks while iterating it.
+      for (TaskAttemptID taskId : mark) {
+        tasks.remove(taskId);
+      }
+
+      return statuses;
+    }
+
+    /**
+     * Sends one fabricated heartbeat at simulation time 'current' and
+     * returns the number of launch-task actions found in the response.
+     */
+    public int sendFakeHeartbeat(int current) throws IOException {
+
+      int numLaunchTaskActions = 0;
+      this.now = current;
+      List<TaskStatus> taskStatuses = collectAndCloneTaskStatuses();
+      TaskTrackerStatus taskTrackerStatus = new SimulatorTaskTrackerStatus(
+          taskTrackerName, hostName, httpPort, taskStatuses, 0, maxMapSlots,
+          maxReduceSlots, this.now);
+      // Transmit the heartbeat
+      HeartbeatResponse response = null;
+      LOG.debug("sending heartbeat at time = " + this.now + " responseId = "
+          + responseId);
+      response = jobTracker.heartbeat(taskTrackerStatus, false, firstHeartbeat,
+          true, responseId);
+
+      // Acknowledge: subsequent heartbeats echo the JT's response id.
+      firstHeartbeat = false;
+      responseId = response.getResponseId();
+      numLaunchTaskActions = findLaunchTaskActions(response);
+
+      return numLaunchTaskActions;
+    }
+
+    /**
+     * Counts launch actions in the response and records each newly launched
+     * task as RUNNING-then-SUCCEEDED so the next heartbeat reports it done.
+     */
+    int findLaunchTaskActions(HeartbeatResponse response) {
+      TaskTrackerAction[] actions = response.getActions();
+      int numLaunchTaskActions = 0;
+      // HashSet<> numLaunchTaskActions
+      for (TaskTrackerAction action : actions) {
+        if (action instanceof SimulatorLaunchTaskAction) {
+          Task task = ((SimulatorLaunchTaskAction) action).getTask();
+
+          numLaunchTaskActions++;
+          TaskAttemptID taskId = task.getTaskID();
+          if (tasks.containsKey(taskId)) {
+            // already have this task..do not need to generate new status
+            continue;
+          }
+          TaskStatus status;
+          if (task.isMapTask()) {
+            status = new MapTaskStatus(taskId, 0f, 1, State.RUNNING, "", "",
+                taskTrackerName, Phase.MAP, new Counters());
+          } else {
+            status = new ReduceTaskStatus(taskId, 0f, 1, State.RUNNING, "", "",
+                taskTrackerName, Phase.SHUFFLE, new Counters());
+          }
+          // Mark SUCCEEDED immediately: this fake tracker "completes" every
+          // task as soon as it is launched.
+          status.setRunState(State.SUCCEEDED);
+          status.setStartTime(this.now);
+          SimulatorTaskInProgress tip = new SimulatorTaskInProgress(
+              (SimulatorLaunchTaskAction) action, status, this.now);
+          tasks.put(taskId, tip);
+        }
+      }
+      return numLaunchTaskActions;
+    }
+
+  }
+  
+  /**
+   * End-to-end check of the inter-tracker protocol: submit one job, then
+   * heartbeat until all maps launch, report completion, then heartbeat until
+   * all reduces launch, and report their completion.
+   */
+  @Test
+  public void testTrackerInteraction() throws IOException, InterruptedException {
+    LOG.info("Testing Inter Tracker protocols");
+    int now = 0;
+    JobConf jtConf = createJobConf();
+    int NoMaps = 2;
+    int NoReduces = 10;
+
+    // jtConf.set("mapred.jobtracker.taskScheduler",
+    // DummyTaskScheduler.class.getName());
+    jtConf.set("fs.default.name", "file:///");
+    jtConf.set("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class
+        .getName());
+    SimulatorJobTracker sjobTracker = SimulatorJobTracker.startTracker(jtConf,
+        0);
+    System.out.println("Created the SimulatorJobTracker successfully");
+    sjobTracker.offerService();
+
+    FakeJobClient jbc = new FakeJobClient(sjobTracker, NoMaps, NoReduces);
+    int NoJobs = 1;
+    for (int i = 0; i < NoJobs; i++) {
+      jbc.submitNewJob();
+    }
+    org.apache.hadoop.mapreduce.JobStatus[] allJobs = sjobTracker.getAllJobs();
+    Assert.assertTrue("allJobs queue length is " + allJobs.length, allJobs.length >= 1);
+    for (org.apache.hadoop.mapreduce.JobStatus js : allJobs) {
+      LOG.info("From JTQueue: job id = " + js.getJobID());
+    }
+
+    FakeTaskTracker fakeTracker = new FakeTaskTracker(sjobTracker,
+        "tracker_host1.foo.com:localhost/127.0.0.1:9010", "host1.foo.com", 10,
+        10);
+    int numLaunchTaskActions = 0;
+
+    for (int i = 0; i < NoMaps * 2; ++i) { // we should be able to assign all
+                                           // tasks within 2X of NoMaps
+                                           // heartbeats
+      numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
+      if (numLaunchTaskActions >= NoMaps) {
+        break;
+      }
+      now += 5;
+      LOG.debug("Number of MapLaunchTasks=" + numLaunchTaskActions + " now = "
+          + now);
+    }
+
+    Assert.assertTrue("Failed to launch all maps: " + numLaunchTaskActions,
+        numLaunchTaskActions >= NoMaps);
+
+    // sending the completed status
+    LOG.info("Sending task completed status");
+    numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
+    // now for the reduce tasks
+    for (int i = 0; i < NoReduces * 2; ++i) { // we should be able to assign all
+                                              // tasks within 2X of NoReduces
+                                              // heartbeats
+      if (numLaunchTaskActions >= NoMaps + NoReduces) {
+        break;
+      }
+      numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
+      now += 5;
+      LOG.debug("Number of ReduceLaunchTasks=" + numLaunchTaskActions
+          + " now = " + now);
+    }
+    Assert.assertTrue("Failed to launch all reduces: " + numLaunchTaskActions,
+        numLaunchTaskActions >= NoMaps + NoReduces);
+
+    // sending the reduce completion
+    numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.junit.Before;
+import org.junit.Test;
+
+//
+// Test case for SimulatorTaskTracker.
+// We create a table of the expected list of new events generated and
+// another table of the expected heartbeat() in-parameters and the task
+// actions to perform for each timestamp t. We then run the task tracker
+// with our own little event queue and check that exactly those things
+// happen that are listed in the two tables.
+//
+public class TestSimulatorTaskTracker {
+  MockSimulatorJobTracker jobTracker;
+  SimulatorTaskTracker taskTracker;
+
+  static final Log LOG = LogFactory.getLog(TestSimulatorTaskTracker.class);
+  
+  // Our own little event queue; checks the events against the expected
+  // before enqueueing them
+  CheckedEventQueue eventQueue;
+  
+  // Global test parameters
+  final int heartbeatInterval = 10;
+  final long simulationStartTime = 100;
+  
+  // specify number of heartbeats since simulation start for mapStarts
+  final long[] mapStarts = {2, 3, 4};
+  // specify number of heartbeats since mapStarts[i] for mapKills[i], 
+  // use -1 for no kill
+  final long[] mapKills = {1, -1, 2};
+  final long[] mapRuntimes = {53, 17, 42};
+  
+  // specify number of heartbeats since start for reduceStarts
+  final long[] reduceStarts = {3, 4, 6};
+  // specify number of heartbeats since reduceStarts[i] for reduceKills[i],
+  // use -1 for no kill
+  final long[] reduceKills = {1, -1, 6};  
+  // delay (in time units) after reduceStarts[i] until all maps are done
+  final long[] mapDoneDelays = {11, 0, 33};
+  final long[] reduceRuntimes = {49, 25, 64};
+  
+  final static String taskAttemptIdPrefix = "attempt_200907150128_0007_";
+  final String taskTrackerName = "test_task_tracker";
+  final int maxMapSlots = 3;
+  final int maxReduceSlots = 3;
+  
+  /** Creates a fresh mock JT, task tracker and checked queue per test. */
+  @Before
+  public void setUp() {
+    try {
+      jobTracker = new MockSimulatorJobTracker(simulationStartTime,
+                                               heartbeatInterval, true);
+    } catch (Exception e) {
+      Assert.fail("Couldn't set up the mock job tracker: " + e);
+    }
+    taskTracker = new SimulatorTaskTracker(jobTracker, taskTrackerName,
+                                           "test_host", 
+                                           maxMapSlots, maxReduceSlots);
+    eventQueue = new CheckedEventQueue(simulationStartTime); 
+  }
+  
+  @Test
+  public void testInitAndHeartbeat() {
+    LOG.debug("Testing init and hearbeat mechanism");
+    genericTest(5, 0, 0, false);
+  }
+
+  // All further tests assume that testInitAndHeartbeat passed
+  @Test
+  public void testSingleMapTask() {
+    LOG.debug("Testing with a single map task");
+    genericTest(20, 1, 0, false);
+  }
+
+  @Test
+  public void testSingleReduceTask() {
+    LOG.debug("Testing with a single reduce task");
+    genericTest(20, 0, 1, false);
+  }
+  
+  @Test
+  public void testMultipleMapTasks() {
+    LOG.debug("Testing with multiple map tasks");
+    genericTest(20, mapStarts.length, 0, false);
+  }
+  
+  @Test
+  public void testMultipleReduceTasks() {
+    LOG.debug("Testing with multiple reduce tasks");
+    genericTest(20, 0, reduceStarts.length, false);
+  }
+
+  @Test
+  public void testMultipleMapAndReduceTasks() {
+    LOG.debug("Testing with multiple map and reduce tasks");
+    genericTest(20, mapStarts.length, reduceStarts.length, false);
+  }
+
+  @Test
+  public void testKillSingleMapTask() {
+    LOG.debug("Testing killing a single map task");
+    genericTest(20, 1, 0, true);
+  }
+
+  @Test
+  public void testKillSingleReduceTask() {
+    LOG.debug("Testing killing a single reduce task");
+    genericTest(20, 0, 1, true);
+  }
+  
+  @Test
+  public void testKillMultipleMapTasks() {
+    LOG.debug("Testing killing multiple map tasks");
+    genericTest(20, mapStarts.length, 0, true);
+  }
+
+  @Test
+  public void testKillMultipleReduceTasks() {
+    LOG.debug("Testing killing multiple reduce tasks");
+    genericTest(20, 0, reduceStarts.length, true);
+  }
+  
+  @Test
+  public void testKillMultipleMapAndReduceTasks() {
+    LOG.debug("Testing killing multiple map and reduce tasks");
+    genericTest(20, mapStarts.length, reduceStarts.length, true);
+  }
+  
+  /**
+   * Common driver: sets up numAccepts heartbeats plus the requested number
+   * of map/reduce tasks (optionally killed mid-run) and runs the tracker.
+   */
+  protected void genericTest(int numAccepts, int numMaps, int numReduces, 
+                             boolean testKill) {
+    LOG.debug("Generic test with numAccepts=" + numAccepts +
+              ", numMaps=" + numMaps + ", numReduces=" + numReduces +
+              ", testKill=" + testKill);
+    
+    setUpHeartbeats(numAccepts);
+    for(int i=0; i<numMaps; i++) {
+      setUpMapTask(i, testKill);
+    }
+    for(int i=0; i<numReduces; i++) {
+      setUpReduceTask(i, testKill);
+    }    
+    runTaskTracker();
+  }
+  
+  // numAccepts must be at least 1
+  private void setUpHeartbeats(int numAccepts) {
+    eventQueue.expectHeartbeats(taskTracker, numAccepts, heartbeatInterval);
+    jobTracker.expectEmptyHeartbeats(taskTrackerName, numAccepts);
+  }
+  
+  // Registers a map task with both the mock JT (to serve the launch action)
+  // and the checked queue (to expect the task's events).
+  private void setUpMapTask(TaskAttemptID mapTaskId, long mapStart, 
+                            long mapRuntime, long mapKill) {
+    jobTracker.runMapTask(taskTrackerName, mapTaskId, mapStart, mapRuntime, 
+                          mapKill);
+    eventQueue.expectMapTask(taskTracker, mapTaskId, mapStart, mapRuntime);
+  }
+
+  // Converts the idx-th entry of the heartbeat-count tables into absolute
+  // simulation times and registers the map task.
+  private void setUpMapTask(int idx, boolean testKill) {
+    TaskAttemptID mapTaskId = createTaskAttemptID(true, idx);
+    long mapStart = simulationStartTime + heartbeatInterval*mapStarts[idx];
+    long mapKill = -1;
+    if (testKill && 0 <= mapKills[idx]) {
+      mapKill = mapStart + heartbeatInterval*mapKills[idx];
+    }
+    setUpMapTask(mapTaskId, mapStart, mapRuntimes[idx], mapKill);
+  }
+
+  // Registers a reduce task with the mock JT and, if it survives long
+  // enough to see all maps complete, with the checked queue too.
+  private void setUpReduceTask(TaskAttemptID reduceTaskId, long reduceStart, 
+                               long mapDoneDelay, long reduceRuntime,
+                               long reduceKill) {
+    jobTracker.runReduceTask(taskTrackerName, reduceTaskId, reduceStart,
+                             mapDoneDelay, reduceRuntime, reduceKill);
+    long mapDone = jobTracker.nextHeartbeat(reduceStart + mapDoneDelay);
+    if (reduceKill < 0 ||  mapDone < reduceKill) {
+      // it generates completion events iff it survives mapDone 
+      eventQueue.expectReduceTask(taskTracker, reduceTaskId, 
+                                  mapDone, reduceRuntime);
+    }
+  }
+
+  // Converts the idx-th entry of the heartbeat-count tables into absolute
+  // simulation times and registers the reduce task.
+  private void setUpReduceTask(int idx, boolean testKill) {
+    TaskAttemptID reduceTaskId = createTaskAttemptID(false, idx);
+    long reduceStart = simulationStartTime + 
+                       heartbeatInterval*reduceStarts[idx];
+    long reduceKill = -1;
+    if (testKill && 0 <= reduceKills[idx]) {
+      reduceKill = reduceStart + heartbeatInterval*reduceKills[idx];
+    }
+    setUpReduceTask(reduceTaskId, reduceStart, mapDoneDelays[idx],
+                    reduceRuntimes[idx], reduceKill);
+  }
+   
+  //
+  // runs a single task tracker
+  // checks that generated events conform to expectedEvents
+  // and the mock jobtracker checks that the heartbeats() sent to it are right
+  //
+  private void runTaskTracker() {
+    long runUntil = eventQueue.getLastCheckTime();
+    LOG.debug("Running task tracker until simulation time=" + runUntil);
+
+    List<SimulatorEvent> events = taskTracker.init(simulationStartTime);
+    eventQueue.addAll(events);
+    while (true) {
+      // can't be empty as it must go past runUntil for verifiability
+      // besides it is never empty because of HeartbeatEvent            
+      SimulatorEvent currentEvent = eventQueue.get();
+      // copy time, make sure TT does not modify it
+      long now = currentEvent.getTimeStamp();
+      LOG.debug("Number of events to deliver=" + (eventQueue.getSize()+1) +
+                ", now=" + now);
+      if (now > runUntil) {
+        break;
+      }                             
+      LOG.debug("Calling accept(), event=" + currentEvent + ", now=" + now);
+      events = taskTracker.accept(currentEvent);
+      LOG.debug("Accept() returned " + events.size() + " new event(s)");
+      for (SimulatorEvent newEvent: events) {
+        LOG.debug("New event " + newEvent);
+      }
+      eventQueue.addAll(events);
+      LOG.debug("Done checking and enqueuing new events");
+    }
+    
+    // make sure we have seen all expected events, even for the last 
+    // time checked
+    eventQueue.checkMissingExpected();
+    // Mock JT should have consumed all entries from its heartbeat table
+    jobTracker.checkMissingHeartbeats();
+  }
+  
+  // taskNumber should be < 10
+  static private TaskAttemptID createTaskAttemptID(boolean isMap, 
+                                                   int taskNumber) {
+    String attempt = taskAttemptIdPrefix + (isMap ? "m" : "r") + 
+                     "_00000" + taskNumber + "_0";
+    TaskAttemptID taskId = null;
+    try {
+      taskId = TaskAttemptID.forName(attempt);
+    } catch (IllegalArgumentException iae) {
+      Assert.fail("Invalid task attempt id string " + iae);
+    }
+    return taskId;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=818674&r1=818673&r2=818674&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Fri Sep 25 00:25:28 2009
@@ -85,7 +85,7 @@
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
   private ExecutorService threadPool;
   private int numThreads;
-  private TaskTrackerManager ttm;
+  TaskTrackerManager ttm;
   
   public EagerTaskInitializationListener(Configuration conf) {
     numThreads = 



Mime
View raw message