hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r818674 [2/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/ src/contrib/mumak/ src/contrib/mumak/bin/ src/contrib/mumak/conf/ src/contrib/mumak/ivy/ src/contrib/mumak/src/ src/contrib/mumak/src/java/ src/contrib/mumak/src/java/org/ src/cont...
Date Fri, 25 Sep 2009 00:25:31 GMT
Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+
+public class SimulatorJobInProgress extends JobInProgress {
+  static final Log LOG = LogFactory.getLog(SimulatorJobInProgress.class);
+  
+  // JobStory that contains all information that should be read from the
+  // cache
+  private final JobStory jobStory;
+
+  RawSplit[] splits;
+
+  @SuppressWarnings("deprecation")
+  public SimulatorJobInProgress(JobID jobid, JobTracker jobtracker,
+      JobConf default_conf, JobStory jobStory) {
+    super();
+    // jobSetupCleanupNeeded is set to false in the parent constructor,
+    // though the default is true
+
+    this.jobId = jobid;
+    String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
+    this.jobtracker = jobtracker;
+    this.conf = jobStory.getJobConf();
+    this.priority = conf.getJobPriority();
+    Path jobDir = jobtracker.getSystemDirectoryForJob(jobid);
+    this.jobFile = new Path(jobDir, "job.xml");
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus.PREP,
+        priority, conf.getUser(), conf.getJobName(), jobFile.toString(), url);
+    this.profile = new JobProfile(jobStory.getUser(), jobid, this.jobFile
+        .toString(), url, jobStory.getName(), conf.getQueueName());
+    this.startTime = JobTracker.getClock().getTime();
+    status.setStartTime(startTime);
+    this.resourceEstimator = new ResourceEstimator(this);
+
+    this.numMapTasks = jobStory.getNumberMaps();
+    this.numReduceTasks = jobStory.getNumberReduces();
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(numMapTasks
+        + numReduceTasks + 10);
+
+    this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+    this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+    MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+    this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
+    this.jobMetrics.setTag("user", conf.getUser());
+    this.jobMetrics.setTag("sessionId", conf.getSessionId());
+    this.jobMetrics.setTag("jobName", conf.getJobName());
+    this.jobMetrics.setTag("jobId", jobid.toString());
+
+    this.maxLevel = jobtracker.getNumTaskCacheLevels();
+    this.anyCacheLevel = this.maxLevel + 1;
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
+        "mapred.speculative.execution.slowTaskThreshold", 1.0f));
+    this.speculativeCap = conf.getFloat(
+        "mapred.speculative.execution.speculativeCap", 0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        "mapred.speculative.execution.slowNodeThreshold", 1.0f);
+
+    this.jobStory = jobStory;
+    this.jobHistory = this.jobtracker.getJobHistory();
+  }
+
+  // for initTasks, update information from JobStory object
+  @Override
+  public synchronized void initTasks() throws IOException {
+    boolean loggingEnabled = LOG.isDebugEnabled();
+    if (loggingEnabled) {
+      LOG.debug("(initTasks@SJIP) Starting Initialization for " + jobId);
+    }
+    numMapTasks = jobStory.getNumberMaps();
+    numReduceTasks = jobStory.getNumberReduces();
+
+    logSubmissionToJobHistory();
+    if (loggingEnabled) {
+      LOG.debug("(initTasks@SJIP) Logged to job history for " + jobId);
+    }
+
+    checkTaskLimits();
+
+    if (loggingEnabled) {
+      LOG.debug("(initTasks@SJIP) Checked task limits for " + jobId);
+    }
+
+    final String jobFile = "default";
+    splits = getRawSplits(jobStory.getInputSplits());
+    if (loggingEnabled) {
+      LOG.debug("(initTasks@SJIP) Created splits for job = " + jobId
+          + " number of splits = " + splits.length);
+    }
+
+    createMapTasks(jobFile, splits);
+
+    if (numMapTasks > 0) {
+      nonRunningMapCache = createCache(splits, maxLevel);
+      if (loggingEnabled) {
+        LOG.debug("initTasks:numMaps=" + numMapTasks
+            + " Size of nonRunningMapCache=" + nonRunningMapCache.size()
+            + " for " + jobId);
+      }
+    }
+
+    // set the launch time
+    this.launchTime = JobTracker.getClock().getTime();
+
+    createReduceTasks(jobFile);
+    // Calculate the minimum number of maps to be complete before
+    // we should start scheduling reduces
+    completedMapsForReduceSlowstart = (int) Math.ceil((conf.getFloat(
+        "mapred.reduce.slowstart." + "completed.maps",
+        DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * numMapTasks));
+
+    tasksInited.set(true);
+    if (loggingEnabled) {
+      LOG.debug("Initializing job, nowstatus = "
+          + JobStatus.getJobRunState(getStatus().getRunState()));
+    }
+    setupComplete();
+
+    if (loggingEnabled) {
+      LOG.debug("Initializing job, inited-status = "
+          + JobStatus.getJobRunState(getStatus().getRunState()));
+    }
+  }
+
+  RawSplit[] getRawSplits(InputSplit[] splits) throws IOException {
+    if (splits == null || splits.length != numMapTasks) {
+      throw new IllegalArgumentException("Input split size mismatch: expected="
+          + numMapTasks + ", actual=" + ((splits == null) ? -1 : splits.length));
+    }
+
+    RawSplit[] rawSplits = new RawSplit[splits.length];
+    for (int i = 0; i < splits.length; i++) {
+      try {
+        rawSplits[i] = new RawSplit();
+        rawSplits[i].setClassName(splits[i].getClass().getName());
+        rawSplits[i].setDataLength(splits[i].getLength());
+        rawSplits[i].setLocations(splits[i].getLocations());
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+
+    return rawSplits;
+  }
+
+  /**
+   * Given a map taskAttemptID, returns its TaskAttemptInfo. Deconstructs the
+   * taskAttemptID and looks up the jobStory using its components: the task
+   * type, the task id, and the task attempt id.
+   * 
+   * @param taskTracker
+   *          tasktracker
+   * @param taskAttemptID
+   *          task-attempt
+   * @return TaskAttemptInfo for the map task-attempt
+   */
+  @SuppressWarnings("deprecation")
+  private synchronized TaskAttemptInfo getMapTaskAttemptInfo(
+      TaskTracker taskTracker, TaskAttemptID taskAttemptID) {
+    assert (taskAttemptID.getTaskType() == TaskType.MAP);
+
+    JobID jobid = (JobID) taskAttemptID.getJobID();
+    assert (jobid == getJobID());
+
+    // Get splits for the TaskAttempt
+    RawSplit split = splits[taskAttemptID.getTaskID().getId()];
+    int locality = getClosestLocality(taskTracker, split);
+
+    TaskID taskId = taskAttemptID.getTaskID();
+    TaskType taskType = taskAttemptID.getTaskType();
+    if (taskId.getTaskType() != TaskType.MAP) {
+      assert false : "Task " + taskId + " is not MAP :" + taskId.getTaskType();
+    }
+    
+    TaskAttemptInfo taskAttemptInfo = jobStory.getMapTaskAttemptInfoAdjusted(
+        taskId.getId(), taskAttemptID.getId(), locality);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get an attempt: "
+          + taskAttemptID.toString()
+          + ", state="
+          + taskAttemptInfo.getRunState()
+          + ", runtime="
+          + ((taskType == TaskType.MAP) ? taskAttemptInfo.getRuntime()
+              : ((ReduceTaskAttemptInfo) taskAttemptInfo).getReduceRuntime()));
+    }
+    return taskAttemptInfo;
+  }
+
+  private int getClosestLocality(TaskTracker taskTracker, RawSplit split) {
+    int locality = 2;
+
+    Node taskTrackerNode = jobtracker
+        .getNode(taskTracker.getStatus().getHost());
+    if (taskTrackerNode == null) {
+      throw new IllegalArgumentException(
+          "Cannot determine network topology node for TaskTracker "
+              + taskTracker.getTrackerName());
+    }
+    for (String location : split.getLocations()) {
+      Node dataNode = jobtracker.getNode(location);
+      if (dataNode == null) {
+        throw new IllegalArgumentException(
+            "Cannot determine network topology node for split location "
+                + location);
+      }
+      locality = Math.min(locality, jobtracker.clusterMap.getDistance(
+          taskTrackerNode, dataNode));
+    }
+    return locality;
+  }
+
+  @SuppressWarnings("deprecation")
+  public TaskAttemptInfo getTaskAttemptInfo(TaskTracker taskTracker,
+      TaskAttemptID taskAttemptId) {
+    JobID jobid = (JobID) taskAttemptId.getJobID();
+    assert (jobid == getJobID());
+
+    return (taskAttemptId.getTaskType() == TaskType.MAP) ? getMapTaskAttemptInfo(
+        taskTracker, taskAttemptId)
+        : getReduceTaskAttemptInfo(taskTracker, taskAttemptId);
+  }
+
+  /**
+   * Given a reduce taskAttemptID, returns its TaskAttemptInfo. Deconstructs
+   * the taskAttemptID and looks up the jobStory using its components: the
+   * task type, the task id, and the task attempt id.
+   * 
+   * @param taskTracker
+   *          tasktracker
+   * @param taskAttemptID
+   *          task-attempt
+   * @return TaskAttemptInfo for the reduce task-attempt
+   */
+  private TaskAttemptInfo getReduceTaskAttemptInfo(TaskTracker taskTracker,
+      TaskAttemptID taskAttemptID) {
+    assert (taskAttemptID.getTaskType() == TaskType.REDUCE);
+    TaskID taskId = taskAttemptID.getTaskID();
+    TaskType taskType = taskAttemptID.getTaskType();
+
+    TaskAttemptInfo taskAttemptInfo = jobStory.getTaskAttemptInfo(taskType,
+        taskId.getId(), taskAttemptID.getId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get an attempt: "
+          + taskAttemptID.toString()
+          + ", state="
+          + taskAttemptInfo.getRunState()
+          + ", runtime="
+          + ((taskType == TaskType.MAP) ? taskAttemptInfo.getRuntime()
+              : ((ReduceTaskAttemptInfo) taskAttemptInfo).getReduceRuntime()));
+    }
+    return taskAttemptInfo;
+  }
+}
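
The getClosestLocality() method above boils down to taking the minimum topology distance between the tracker's node and any replica location of the split, starting from a worst case of 2. A minimal standalone sketch of that pattern, assuming the simulator's encoding of 0 = data-local, 1 = rack-local, 2 = off-rack (the replicaDistances array is a hypothetical stand-in for the per-location clusterMap.getDistance() results):

    // Hypothetical stand-in for getClosestLocality(): replicaDistances holds
    // clusterMap.getDistance(trackerNode, dataNode) for each split location.
    static int closestLocality(int[] replicaDistances) {
      int locality = 2;                    // worst case: off-rack
      for (int d : replicaDistances) {
        locality = Math.min(locality, d);  // keep the closest replica
      }
      return locality;
    }

The result is passed to jobStory.getMapTaskAttemptInfoAdjusted(), presumably so the traced map runtime can be adjusted for the locality actually achieved in the simulation.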

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStory.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStory.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+/**
+ * This class is a proxy for a JobStory/ZombieJob that supplies a customized
+ * submission time. Because submission times are reproduced entirely by the
+ * simulator, the original submission time recorded in the job trace must be
+ * ignored.
+ */
+public class SimulatorJobStory implements JobStory {
+  private JobStory job;
+  private long submissionTime;
+
+  public SimulatorJobStory(JobStory job, long time) {
+    this.job = job;
+    this.submissionTime = time;
+  }
+
+  @Override
+  public long getSubmissionTime() {
+    return submissionTime;
+  }
+
+  @Override
+  public InputSplit[] getInputSplits() {
+    return job.getInputSplits();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public JobConf getJobConf() {
+    return job.getJobConf();
+  }
+
+  @Override
+  public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+      int taskAttemptNumber, int locality) {
+    return job.getMapTaskAttemptInfoAdjusted(taskNumber, taskAttemptNumber,
+        locality);
+  }
+
+  @Override
+  public String getName() {
+    return job.getName();
+  }
+
+  @Override
+  public org.apache.hadoop.mapreduce.JobID getJobID() {
+    return job.getJobID();
+  }
+
+  @Override
+  public int getNumberMaps() {
+    return job.getNumberMaps();
+  }
+
+  @Override
+  public int getNumberReduces() {
+    return job.getNumberReduces();
+  }
+
+  @Override
+  public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+      int taskAttemptNumber) {
+    return job.getTaskAttemptInfo(taskType, taskNumber, taskAttemptNumber);
+  }
+
+  @Override
+  public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+    return job.getTaskInfo(taskType, taskNumber);
+  }
+
+  @Override
+  public String getUser() {
+    return job.getUser();
+  }
+
+  @Override
+  public Pre21JobHistoryConstants.Values getOutcome() {
+    return job.getOutcome();
+  }
+}
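
SimulatorJobStory is a pure delegation proxy: every accessor forwards to the wrapped JobStory, except getSubmissionTime(), which returns the simulator-assigned time. A minimal sketch of the same shape, using a tiny stand-in interface rather than the real JobStory API:

    // Illustrative stand-in: delegate everything, override one field.
    interface Story { long submissionTime(); int numberMaps(); }

    final class TimeShiftedStory implements Story {
      private final Story inner;
      private final long newTime;
      TimeShiftedStory(Story inner, long newTime) {
        this.inner = inner;
        this.newTime = newTime;
      }
      public long submissionTime() { return newTime; }        // overridden
      public int numberMaps() { return inner.numberMaps(); }  // delegated
    }

Keeping the proxy stateless apart from the one overridden value means any new JobStory accessor only needs a one-line forwarding method.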

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.ZombieCluster;
+import org.apache.hadoop.tools.rumen.ZombieJob;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+
+/**
+ * This class creates {@link JobStory} objects from a trace file in Rumen
+ * format. It is a proxy over {@link ZombieJobProducer} that adjusts
+ * submission times to align with simulation time.
+ */
+public class SimulatorJobStoryProducer implements JobStoryProducer {
+  private final ZombieJobProducer producer;
+  private final long firstJobStartTime;
+  private long relativeTime = -1;
+  private boolean firstJob = true;
+
+  public SimulatorJobStoryProducer(Path path, ZombieCluster cluster,
+      long firstJobStartTime, Configuration conf) throws IOException {
+    producer = new ZombieJobProducer(path, cluster, conf);
+    this.firstJobStartTime = firstJobStartTime;
+  }
+
+  /**
+   * Filters the jobs fed to the simulator. For now, we skip killed jobs and
+   * jobs with no (logged) map tasks, to facilitate debugging.
+   * 
+   * @throws IOException
+   */
+  private JobStory getNextJobFiltered() throws IOException {
+    while (true) {
+      ZombieJob job = producer.getNextJob();
+      if (job == null) {
+        return null;
+      }
+      if (job.getOutcome() == Pre21JobHistoryConstants.Values.KILLED) {
+        continue;
+      }
+      if (job.getNumberMaps() == 0) {
+        continue;
+      }
+      if (job.getNumLoggedMaps() == 0) {
+        continue;
+      }
+      return job;
+    }
+  }
+
+  @Override
+  public JobStory getNextJob() throws IOException {
+    JobStory job = getNextJobFiltered();
+    if (job == null)
+      return null;
+    if (firstJob) {
+      firstJob = false;
+      relativeTime = job.getSubmissionTime() - firstJobStartTime;
+    }
+
+    return new SimulatorJobStory(job, job.getSubmissionTime() - relativeTime);
+  }
+
+  @Override
+  public void close() throws IOException {
+    producer.close();
+  }
+}
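
The adjustment in getNextJob() above shifts every traced submission time by a constant offset, fixed by the first job, so that the first job is submitted exactly at firstJobStartTime and later jobs keep their original spacing. A worked sketch of the arithmetic with made-up trace times:

    // relativeTime is fixed once, from the first job in the trace.
    long firstJobStartTime = 1000L;           // desired simulation start (ms)
    long[] traceTimes = { 50000L, 53000L };   // submission times in the trace

    long relativeTime = traceTimes[0] - firstJobStartTime;  // 49000
    long first  = traceTimes[0] - relativeTime;             // 1000
    long second = traceTimes[1] - relativeTime;             // 4000: still 3s after the first job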

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,667 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobInProgress;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.mapred.SimulatorJobInProgress;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link SimulatorJobTracker} extends {@link JobTracker}. It implements the
+ * {@link JobSubmissionProtocol} and the {@link InterTrackerProtocol} protocols.
+ */
+@SuppressWarnings("deprecation")
+public class SimulatorJobTracker extends JobTracker {
+  // A queue for cleaning up jobs from memory. The length of this queue
+  // is kept at or below the constant JOBS_IN_MUMAK_MEMORY.
+  private LinkedList<JobID> cleanupQueue;
+
+  // The simulatorClock maintains the current simulation time 
+  // and should always be synchronized with the time maintained by the engine.
+  private static SimulatorClock clock = null;
+
+  public static final Log LOG = LogFactory.getLog(SimulatorJobTracker.class);
+
+  // This constant is used to specify how many jobs should be maintained in the
+  // memory of the mumak simulator. 
+  private static final int JOBS_IN_MUMAK_MEMORY = 50;
+
+  // The SimulatorEngine is the engine that drives the simulation.
+  private static SimulatorEngine engine = null;
+
+  private static synchronized void resetEngineClock(SimulatorEngine engine, SimulatorClock clock) {
+    SimulatorJobTracker.engine = engine;
+    SimulatorJobTracker.clock = clock;
+  }
+  
+  /**
+   * In addition to the standard JobConf object, the constructor for
+   * SimulatorJobTracker requires a simulation clock and a reference to the
+   * SimulatorEngine object. The JobTracker's clock is set to this clock.
+   * @param conf the starting configuration of the SimulatorJobTracker.
+   * @param clock the SimulatorClock object that we use to maintain simulator time.
+   * @param simulatorEngine the simulatorEngine that is running the simulation.
+   */
+  SimulatorJobTracker(JobConf conf, SimulatorClock clock,
+                      SimulatorEngine simulatorEngine) throws IOException {
+    // Invoke the super constructor with a flag that 
+    // indicates simulation
+    super(conf, clock, true);
+    resetEngineClock(simulatorEngine, clock);
+    cleanupQueue = new LinkedList<JobID>();
+  }
+
+  /**
+   * Starts the JobTracker with given configuration and a given time. It also
+   * starts the JobNotifier thread. 
+   * @param conf the starting configuration of the SimulatorJobTracker.
+   * @param startTime the starting time of simulation -- this is used to
+   * initialize the clock.
+   * @param engine the SimulatorEngine that we talk to.
+   * @return the started SimulatorJobTracker, or null if startup failed
+   * @throws IOException
+   */
+  public static SimulatorJobTracker startTracker(JobConf conf, long startTime, SimulatorEngine engine)
+  throws IOException {
+    SimulatorJobTracker result = null;
+    try {
+      SimulatorClock simClock = new SimulatorClock(startTime);
+      result = new SimulatorJobTracker(conf, simClock, engine);
+      result.taskScheduler.setTaskTrackerManager(result);
+    } catch (IOException e) {
+      LOG.warn("Error starting tracker: "
+          + StringUtils.stringifyException(e));
+    }
+    if (result != null) {
+      JobEndNotifier.startNotifier();
+    }
+
+    return result;
+  }
+
+  /**
+   * Start the SimulatorJobTracker with given configuration after
+   * creating its own SimulatorEngine. Used for debugging only.
+   * @param conf the starting configuration of the SimulatorJobTracker
+   * @param startTime the starting time of simulation
+   * @return the started SimulatorJobTracker
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public static SimulatorJobTracker startTracker(JobConf conf, long startTime)
+  throws IOException, InterruptedException {
+    return startTracker(conf, startTime, new SimulatorEngine());
+  }
+
+  @Override
+  public void offerService() throws InterruptedException, IOException {
+    taskScheduler.start();
+    LOG.info("started taskScheduler...");
+    synchronized (this) {
+      state = State.RUNNING;
+    }
+  }
+
+  /**
+   * Returns the simulatorClock, which is a static object in SimulatorJobTracker.
+   * 
+   * @return SimulatorClock object.
+   */
+  static Clock getClock() {
+    assert (engine.getCurrentTime() == clock.getTime()) :
+        " Engine time = " + engine.getCurrentTime() +
+        " JobTracker time = " + clock.getTime();
+    return clock;
+  }
+
+  /**
+   * Overrides the getCleanupTaskReports method of the
+   * original JobTracker since we do not simulate setup and cleanup
+   * tasks.
+   * @param jobid JobID for which we desire cleanup reports.
+   */
+  @Override
+  public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
+    return null;
+  }
+  
+  /**
+   * Overriding since we do not support queue acls.
+   */
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
+    return null;
+  }
+
+  /**
+   * Overriding since we do not simulate setup/cleanup tasks.
+   */
+  @Override
+  public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
+    return null;
+  }
+
+  @Override
+  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+    boolean loggingEnabled = LOG.isDebugEnabled();
+    if (loggingEnabled) {
+      LOG.debug("submitJob for jobname = " + jobId);
+    }
+    if (jobs.containsKey(jobId)) {
+      // job already running, don't start twice
+      if (loggingEnabled) {
+        LOG.debug("Job '" + jobId.getId() + "' already present ");
+      }
+      return jobs.get(jobId).getStatus();
+    }
+    JobStory jobStory = SimulatorJobCache.get(jobId);
+    if (jobStory == null) {
+      throw new IllegalArgumentException("Job not found in SimulatorJobCache: "+jobId);
+    }
+    validateAndSetClock(jobStory.getSubmissionTime());
+    
+    SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, this,
+                                                            this.conf, 
+                                                            jobStory);
+    return addJob(jobId, job);
+  }
+  
+  /**
+   * Returns the SimulatorJobInProgress object for a given jobID.
+   * @param jobid the job id to look up
+   * @return the SimulatorJobInProgress, or null if the job is unknown
+   */
+  private SimulatorJobInProgress getSimulatorJob(JobID jobid) {
+    return (SimulatorJobInProgress)jobs.get(jobid);
+  }
+  
+  /**
+   * Safely clean-up all data structures at the end of the 
+   * job (success/failure/killed). In addition to performing the tasks that the
+   * original finalizeJob does, we also inform the SimulatorEngine about the 
+   * completion of this job. 
+   *  
+   * @param job completed job.
+   */
+  @Override
+  synchronized void finalizeJob(JobInProgress job) {
+
+    // Let the SimulatorEngine know that the job is done
+    JobStatus cloneStatus = (JobStatus)job.getStatus().clone();
+    engine.markCompletedJob(cloneStatus, 
+                            SimulatorJobTracker.getClock().getTime());
+
+    JobID jobId = job.getStatus().getJobID();
+    LOG.info("Finished job " + jobId + " endtime = " +
+              getClock().getTime() + " with status: " +
+              JobStatus.getJobRunState(job.getStatus().getRunState()));
+    
+    // for updating the metrics and JobHistory, invoke the original 
+    // finalizeJob.
+    super.finalizeJob(job);
+    
+    // now placing this job in queue for future nuking
+    cleanupJob(job);
+  }
+
+  /**
+   * The cleanupJob method maintains the cleanupQueue. When a job is finalized,
+   * it is added to the cleanupQueue. Jobs are removed from the cleanupQueue
+   * so that its size stays at or below JOBS_IN_MUMAK_MEMORY.
+   * @param job the JobInProgress object that was just finalized and is
+   * going to be added to the cleanupQueue.
+   */
+  private void cleanupJob(JobInProgress job) {
+   
+    cleanupQueue.add(job.getJobID());
+    
+    while (cleanupQueue.size() > JOBS_IN_MUMAK_MEMORY) {
+      JobID removedJob = cleanupQueue.poll();
+      retireJob(removedJob, "");
+    } 
+  }
+  // //////////////////////////////////////////////////
+  // InterTrackerProtocol
+  // //////////////////////////////////////////////////
+
+  @Override
+  synchronized boolean processHeartbeat(TaskTrackerStatus trackerStatus,
+      boolean initialContact) {
+    boolean loggingEnabled = LOG.isDebugEnabled();
+    String trackerName = trackerStatus.getTrackerName();
+    boolean seenBefore = updateTaskTrackerStatus(trackerName, trackerStatus);
+    TaskTracker taskTracker = getTaskTracker(trackerName);
+    // update the status of the task tracker. Also updates all aggregate
+    // statistics
+    if (loggingEnabled) {
+      LOG.debug("processing heartbeat for " + trackerName);
+      LOG.debug("updating TaskTracker status for " + trackerName);
+    }
+    if (initialContact) {
+      // If it's first contact, then clear out 
+      // any state hanging around
+      if (seenBefore) {
+        lostTaskTracker(taskTracker);
+      }
+    } else {
+      // If not first contact, there should be some record of the tracker
+      if (!seenBefore) {
+        LOG.warn("Status from unknown Tracker : " + trackerName);
+        updateTaskTrackerStatus(trackerName, null);
+        return false;
+      }
+    }
+
+    if (initialContact) {
+      if (loggingEnabled) {
+        LOG.debug("adding new tracker name = " + trackerName);
+      }
+      addNewTracker(taskTracker);
+    }
+
+    if (loggingEnabled) {
+      LOG.debug("updating TaskStatus for " + trackerName);
+    }
+    // update status of all tasks from heartbeat
+    updateTaskStatuses(trackerStatus);
+
+    return true;
+  }
+  
+  /**
+   * Utility to validate and set the current simulation time.
+   * @param newSimulationTime the new simulation time
+   */
+  private void validateAndSetClock(long newSimulationTime) {
+     
+    // We do not use the getClock routine here as 
+    // the Engine and JobTracker clocks are different at
+    // this point.
+    long currentSimulationTime = clock.getTime();   
+    if (newSimulationTime < currentSimulationTime) {
+      // time has gone backwards
+      throw new IllegalArgumentException("Time has gone backwards! " +
+                                 "newSimulationTime: " + newSimulationTime +
+                                 " while currentTime: " + 
+                                 currentSimulationTime);
+    }
+    // the simulation time should also match that in the engine
+    assert (newSimulationTime == engine.getCurrentTime()) :
+        " newTime = " + newSimulationTime +
+        " engineTime = " + engine.getCurrentTime();
+
+    // set the current simulation time
+    clock.setTime(newSimulationTime);
+  }
+
+  @Override
+  public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
+      boolean restarted, boolean initialContact, boolean acceptNewTasks,
+      short responseId) throws IOException {
+    boolean loggingEnabled = LOG.isDebugEnabled();
+    if (loggingEnabled) {
+      LOG.debug("Got heartbeat from: " + status.getTrackerName()
+          + " (restarted: " + restarted + " initialContact: " + initialContact
+          + " acceptNewTasks: " + acceptNewTasks + ")" + " with responseId: "
+          + responseId);
+    }
+    if (!(status instanceof SimulatorTaskTrackerStatus)) {
+      throw new IllegalArgumentException(
+          "Expecting SimulatorTaskTrackerStatus, but got " + status.getClass());
+    }
+    SimulatorTaskTrackerStatus taskTrackerStatus = 
+      (SimulatorTaskTrackerStatus) status;
+    
+    String trackerName = taskTrackerStatus.getTrackerName();
+
+    // validate and set the simulation time
+    // according to the time sent by the tracker
+    validateAndSetClock(taskTrackerStatus.getCurrentSimulationTime());
+
+    HeartbeatResponse prevHeartbeatResponse = 
+      trackerToHeartbeatResponseMap.get(trackerName);
+
+    if (!initialContact) {
+      // If this isn't the 'initial contact' from the tasktracker,
+      // there is something seriously wrong if the JobTracker has
+      // no record of the 'previous heartbeat'; if so, ask the
+      // tasktracker to re-initialize itself.
+      if (prevHeartbeatResponse == null) {
+        // This is the first heartbeat from the old tracker to the newly
+        // started JobTracker. The JobTracker might have restarted, but no
+        // recovery is needed; otherwise this code should not be reached.
+        LOG.warn("Serious problem, cannot find record of 'previous' " +
+                 " heartbeat for '" + trackerName +
+                 "'; reinitializing the tasktracker");
+        return new HeartbeatResponse(responseId,
+            new TaskTrackerAction[] { new ReinitTrackerAction() });
+      } else {
+
+        // It is completely safe not to process a 'duplicate' heartbeat
+        // from a {@link TaskTracker}, since it resends the heartbeat when
+        // RPCs are lost (see {@link TaskTracker.transmitHeartbeat()});
+        // acknowledge it by re-sending the previous response to let the
+        // {@link TaskTracker} go forward.
+        if (prevHeartbeatResponse.getResponseId() != responseId) {
+          if (loggingEnabled) {
+            LOG.debug("Ignoring 'duplicate' heartbeat from '" + trackerName
+                + "'; resending the previous 'lost' response");
+          }
+          return prevHeartbeatResponse;
+        }
+      }
+    }
+
+    if (loggingEnabled) {
+      LOG.debug("processed heartbeat for responseId = " + responseId);
+    }
+    short newResponseId = (short) (responseId + 1);
+    status.setLastSeen(getClock().getTime());
+    
+    // process the incoming heartbeat 
+    if (!processHeartbeat(taskTrackerStatus, initialContact)) {
+      if (prevHeartbeatResponse != null) {
+        trackerToHeartbeatResponseMap.remove(trackerName);
+      }
+      return new HeartbeatResponse(newResponseId,
+          new TaskTrackerAction[] { new ReinitTrackerAction() });
+    }
+
+    
+    // Initialize the response to be sent for the heartbeat
+    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
+    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
+    if (acceptNewTasks) {
+      TaskTracker taskTracker = getTaskTracker(trackerName);
+      // get the list of tasks to be executed on this tasktracker
+      List<Task> tasks = taskScheduler.assignTasks(taskTracker);
+      if (tasks != null) {
+        if (loggingEnabled && tasks.size()>0) {
+          LOG.debug("Tasks found from TaskScheduler: number = " + tasks.size());
+        }
+
+        for (Task task : tasks) {
+          TaskAttemptID taskAttemptID = task.getTaskID();
+          // get the JobID and the JIP object for this taskAttempt
+          JobID jobID = taskAttemptID.getJobID();
+          SimulatorJobInProgress job = getSimulatorJob(jobID);
+
+          if (job == null) {
+            LOG.error("Getting taskAttemptId=" + taskAttemptID +
+                " for job " + jobID +
+                " not present in SimulatorJobTracker");
+            throw new IOException("Getting taskAttemptId=" + taskAttemptID +
+                " for job " + jobID +
+                " not present in SimulatorJobTracker");
+          }
+          // add the launch task action to the list
+          if (loggingEnabled) {
+            LOG.debug("Getting taskAttemptInfo for '" + taskAttemptID
+                + "' for tracker '" + trackerName + "'");
+          }
+          TaskAttemptInfo taskAttemptInfo = 
+                          job.getTaskAttemptInfo(taskTracker, taskAttemptID);
+
+          if (taskAttemptInfo == null) { 
+            throw new RuntimeException("Empty taskAttemptInfo: " +
+                "task information missing");
+          }
+
+          // create the SLTA object using the task attempt information
+          if (loggingEnabled) {
+            LOG.debug("Adding LaunchTaskAction for '" + taskAttemptID
+                + "' for tracker " + trackerName + " taskType="
+                + taskAttemptID.getTaskType() + " time="
+                + getClock().getTime());
+          }
+          SimulatorLaunchTaskAction newlaunchTask =
+              new SimulatorLaunchTaskAction(task, taskAttemptInfo);
+          
+          actions.add(newlaunchTask);
+        }
+      }
+    }
+
+    // Check for tasks to be killed
+    // also get the attemptIDs in a separate set for quick lookup
+    // during the MapCompletion generation
+    List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
+     
+    if (killTasksList != null) {
+      if (loggingEnabled) {
+        for (TaskTrackerAction ttAction : killTasksList) {
+          LOG.debug("Time =" + getClock().getTime() + " tracker=" + trackerName
+              + " KillTaskAction for:"
+              + ((KillTaskAction) ttAction).getTaskID());
+        }
+      }
+      actions.addAll(killTasksList);
+    }
+
+    // Check for tasks whose outputs can be saved
+    // this is currently a no-op
+    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
+    if (commitTasksList != null) {
+      actions.addAll(commitTasksList);
+    }
+
+    // check the reduce tasks in this task-tracker, and add in the
+    // AllMapTasksCompletedTaskAction for each of the reduce tasks
+    // this enables the reduce tasks to move from shuffle to reduce phase
+    List<TaskTrackerAction> mapCompletionTasks = 
+      getMapCompletionTasks(taskTrackerStatus, killTasksList);
+
+    if (mapCompletionTasks != null) {
+      actions.addAll(mapCompletionTasks);
+    }
+
+    if (loggingEnabled) {
+      LOG.debug("Done with collection actions for tracker " + trackerName
+          + " for responseId " + responseId);
+    }
+    // calculate next heartbeat interval and put in heartbeat response
+    int nextInterval = getNextHeartbeatInterval();
+    response.setHeartbeatInterval(nextInterval);
+    response.setActions(actions.toArray(
+        new TaskTrackerAction[actions.size()]));
+    if (loggingEnabled) {
+      LOG.debug("Nextinterval for tracker " + trackerName + " is "
+          + nextInterval);
+    }
+    // Update the trackerToHeartbeatResponseMap
+    trackerToHeartbeatResponseMap.put(trackerName, response);
+
+    // Done processing the heartbeat, now remove 'marked' tasks
+    removeMarkedTasks(trackerName);
+
+    return response;
+  }
+
+  /**
+   * The getMapCompletionTasks method informs taskTrackers when to change the
+   * status of reduce tasks from "shuffle" to "reduce". For each reduce task
+   * on this TaskTracker that is in the shuffle phase, it finds the number of
+   * finished maps for the job from the jobInProgress object. If this number
+   * equals the number of desired maps for the job, it adds an
+   * AllMapsCompletedTaskAction for that reduce task-attempt.
+   * 
+   * @param status
+   *            The status of the task tracker
+   * @param tasksToKill
+   *            KillTaskActions already scheduled for this tracker
+   * @return List of TaskTrackerActions
+   */
+  private List<TaskTrackerAction> getMapCompletionTasks(
+      TaskTrackerStatus status, 
+      List<TaskTrackerAction> tasksToKill) {
+    boolean loggingEnabled = LOG.isDebugEnabled();
+    
+    // Build up the list of tasks about to be killed
+    Set<TaskAttemptID> killedTasks = new HashSet<TaskAttemptID>();
+    if (tasksToKill != null) {
+      for (TaskTrackerAction taskToKill : tasksToKill) {
+        killedTasks.add(((KillTaskAction)taskToKill).getTaskID());
+      }
+    }
+
+    String trackerName = status.getTrackerName();
+
+    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
+
+    // loop through the list of task statuses
+    for (TaskStatus report : status.getTaskReports()) {
+
+      TaskAttemptID taskAttemptId = report.getTaskID();
+      SimulatorJobInProgress job = getSimulatorJob(taskAttemptId.getJobID());
+      
+      if (job == null) {
+        // This job has completed before,
+        // and this is a zombie reduce-task.
+        Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
+        if (jobsToCleanup == null) {
+          jobsToCleanup = new HashSet<JobID>();
+          trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
+        }
+        jobsToCleanup.add(taskAttemptId.getJobID());
+        continue;
+      }   
+      JobStatus jobStatus = job.getStatus();
+      TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);
+
+      // Send AllMapsCompleted only if the job is running, the attempt is
+      // running, no KillTask is being sent for this attempt, and the task is
+      // a reduce attempt in the shuffle phase. This precludes sending both
+      // KillTask and AllMapsCompleted for the same reduce-attempt.
+
+      if (jobStatus.getRunState() == JobStatus.RUNNING &&
+          tip.isRunningTask(taskAttemptId) &&
+          !killedTasks.contains(taskAttemptId) && 
+          !report.getIsMap() &&
+          report.getPhase() == TaskStatus.Phase.SHUFFLE) {
+
+        if (loggingEnabled) {
+          LOG.debug("Need map-completion information for REDUCEattempt "
+              + taskAttemptId + " in tracker " + trackerName);
+
+          LOG.debug("getMapCompletion: job=" + job.getJobID() + " pendingMaps="
+              + job.pendingMaps());
+        }
+        // Check whether the number of finishedMaps equals the
+        // number of maps
+        boolean canSendMapCompletion =
+            (job.finishedMaps() == job.desiredMaps());
+
+        if (canSendMapCompletion) {
+          if (loggingEnabled) {
+            LOG.debug("Adding MapCompletion for taskAttempt " + taskAttemptId
+                + " in tracker " + trackerName);
+
+            LOG.debug("FinishedMaps for job:" + job.getJobID() + " is = "
+                + job.finishedMaps() + "/" + job.desiredMaps());
+
+            LOG.debug("AllMapsCompleted for task " + taskAttemptId + " time="
+                + getClock().getTime());
+          }
+          actions.add(new AllMapsCompletedTaskAction(taskAttemptId));
+        }
+      }
+    }
+    return actions;
+  }
+
+ 
+  @Override
+  void updateTaskStatuses(TaskTrackerStatus status) {
+    boolean loggingEnabled = LOG.isDebugEnabled();
+    String trackerName = status.getTrackerName();
+    // loop through the list of task statuses
+    if (loggingEnabled) {
+      LOG.debug("Updating task statuses for tracker " + trackerName);
+    }
+    for (TaskStatus report : status.getTaskReports()) {
+      report.setTaskTracker(trackerName);
+      TaskAttemptID taskAttemptId = report.getTaskID();
+      JobID jobid = taskAttemptId.getJobID();
+      if (loggingEnabled) {
+        LOG.debug("Updating status for job " + jobid + " for task = "
+            + taskAttemptId + " status=" + report.getProgress()
+            + " for tracker: " + trackerName);
+      }
+      SimulatorJobInProgress job = 
+        getSimulatorJob(taskAttemptId.getJobID());
+
+      if (job == null) {
+        // This job has completed before.
+        Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
+        if (jobsToCleanup == null) {
+          jobsToCleanup = new HashSet<JobID>();
+          trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
+        }
+        jobsToCleanup.add(taskAttemptId.getJobID());
+        continue;
+      }
+      TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);
+
+      JobStatus prevStatus = (JobStatus) job.getStatus().clone();
+      job.updateTaskStatus(tip, (TaskStatus) report.clone());
+      JobStatus newStatus = (JobStatus) job.getStatus().clone();
+      if (tip.isComplete()) {
+        if (loggingEnabled) {
+          LOG.debug("Completed task attempt " + taskAttemptId + " tracker:"
+              + trackerName + " time=" + getClock().getTime());
+        }
+      }
+
+      if (prevStatus.getRunState() != newStatus.getRunState()) {
+        if (loggingEnabled) {
+          LOG.debug("Informing Listeners of job " + jobid + " of newStatus "
+              + JobStatus.getJobRunState(newStatus.getRunState()));
+        }
+        JobStatusChangeEvent event = new JobStatusChangeEvent(job,
+            EventType.RUN_STATE_CHANGED, prevStatus, newStatus);
+
+        updateJobInProgressListeners(event);
+      }
+
+    }
+  }
+}
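
The heartbeat() method above leans on the response id to make the RPC idempotent: a tracker echoes the id of the last response it received, and when that id does not match the cached response, the JobTracker replays the cached response instead of reprocessing the heartbeat. A simplified sketch of just that rule (plain types standing in for the real HeartbeatResponse and trackerToHeartbeatResponseMap):

    import java.util.HashMap;
    import java.util.Map;

    // Simplified duplicate-heartbeat handling; not the real protocol classes.
    class HeartbeatIds {
      private final Map<String, Short> lastSent = new HashMap<String, Short>();

      short heartbeat(String tracker, short echoedId) {
        Short prev = lastSent.get(tracker);
        if (prev != null && prev.shortValue() != echoedId) {
          return prev.shortValue();   // tracker missed our reply: replay it
        }
        short next = (short) (echoedId + 1);
        lastSent.put(tracker, next);  // normal case: process and advance
        return next;
      }
    }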

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+
+/**
+ * This class is used to augment {@link LaunchTaskAction} with run time statistics 
+ * and the final task state (successful or failed).
+ */
+class SimulatorLaunchTaskAction extends LaunchTaskAction {
+  /**
+   * Run time resource usage of the task.
+   */
+  private TaskAttemptInfo taskAttemptInfo;
+
+  /**
+   * Constructs a SimulatorLaunchTaskAction object for a {@link Task}.
+   * @param task Task task to be launched
+   * @param taskAttemptInfo resource usage model for task execution
+   */            
+  public SimulatorLaunchTaskAction(Task task,
+                                   TaskAttemptInfo taskAttemptInfo) {
+    super(task);
+    this.taskAttemptInfo = taskAttemptInfo;
+  }
+  
+  /** Get the resource usage model for the task. */
+  public TaskAttemptInfo getTaskAttemptInfo() {
+    return taskAttemptInfo;
+  }
+  
+  @Override
+  public String toString() {
+    return this.getClass().getName() + "[taskID=" + 
+           this.getTask().getTaskID() + "]";
+  }
+}
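
SimulatorLaunchTaskAction simply decorates the standard launch action with the traced resource usage, so the simulated task tracker can decide an attempt's runtime and outcome without executing any user code. A sketch of how a consumer might use the payload (illustrative only; the real handling lives in SimulatorTaskTracker below):

    // Hypothetical consumer: pull the traced runtime off the action and
    // schedule a completion event that far into the simulated future.
    void handleLaunch(SimulatorLaunchTaskAction action, long now) {
      TaskAttemptInfo info = action.getTaskAttemptInfo();
      long finishTime = now + info.getRuntime();  // traced runtime in ms
      // ... enqueue a TaskAttemptCompletionEvent for finishTime ...
    }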

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+// Explicitly use the new api, older o.a.h.mapred.TaskAttemptID is deprecated
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * This class simulates a {@link TaskTracker}. Its main purpose is to call heartbeat()
+ * of the simulated JobTracker with appropriately updated statuses of the
+ * tasks assigned to it.
+ *
+ * The events emitted and consumed are HeartbeatEvent and 
+ * TaskAttemptCompletionEvent .
+ *
+ * Internal naming convention: accept() dispatches simulation events to 
+ * process*Event() methods. heartbeat() dispactches task tracker actions to 
+ * handle*Action() methods.
+ */
+public class SimulatorTaskTracker implements SimulatorEventListener {
+  /** The name of the task tracker. */
+  protected final String taskTrackerName;
+  /** The name of the host the task tracker is running on. */
+  protected final String hostName;
+  /** The http port the simulated task tracker reports to the jobtracker. */  
+  protected final int httpPort = 80;
+  /** Number of map slots. */
+  protected final int maxMapSlots;
+  /** Number of reduce slots. */
+  protected final int maxReduceSlots;
+  /** The job tracker this task tracker is a slave of. */
+  protected final InterTrackerProtocol jobTracker;
+
+  /**
+   * State of and bookkeeping information for all tasks assigned to the task 
+   * tracker. Contains all the information about running or completed but
+   * not yet reported tasks. We manage it in a mark & sweep garbage collector 
+   * manner. We insert tasks on launch, mark them on completion, and remove
+   * completed tasks at heartbeat() reports.
+   */
+  protected Map<TaskAttemptID, SimulatorTaskInProgress> tasks = 
+      new HashMap<TaskAttemptID, SimulatorTaskInProgress>();
+  /** 
+   * Number of map slots allocated to tasks in RUNNING state on this task 
+   * tracker. Must be in sync with the tasks map above. 
+   */
+  private int usedMapSlots = 0;
+  /**
+   * Number of reduce slots allocated to tasks in RUNNING state on this task
+   * tracker. Must be in sync with the tasks map above.
+   */
+  private int usedReduceSlots = 0;  
+  /**
+   * True if the jobTracker.heartbeat() call to be made is the first one.
+   * We need this to mimic the InterTrackerProtocol properly.
+   */
+  private boolean firstHeartbeat = true;
+
+  // last heartbeat response received
+  private short heartbeatResponseId = -1;
+
+  /**
+   * Task attempt ids for which TaskAttemptCompletionEvent was created but the 
+   * task attempt got killed. 
+   */
+  private Set<TaskAttemptID> orphanTaskCompletions = 
+    new HashSet<TaskAttemptID>();
+
+  /** The log object to send our messages to; only used for debugging. */
+  private static final Log LOG = LogFactory.getLog(SimulatorTaskTracker.class);
+  
+  /**
+   * Constructs a task tracker. 
+   *
+   * @param jobTracker the SimulatorJobTracker we talk to
+   * @param taskTrackerName the task tracker name to report, otherwise unused
+   * @param hostName the host name to report, otherwise unused
+   * @param maxMapTasks the number of map slots
+   * @param maxReduceTasks the number of reduce slots
+   */
+  public SimulatorTaskTracker(InterTrackerProtocol jobTracker, 
+                              String taskTrackerName, String hostName, 
+                              int maxMapTasks, int maxReduceTasks) {
+    LOG.debug("SimulatorTaskTracker constructor, taskTrackerName=" +
+              taskTrackerName);
+
+    this.jobTracker = jobTracker;    
+    this.taskTrackerName = taskTrackerName;
+    this.hostName = hostName;
+    this.maxMapSlots = maxMapTasks;
+    this.maxReduceSlots = maxReduceTasks;
+  }
+  
+  /**
+   * Processes a simulation event. 
+   *
+   * @param event the event to process, should be an instance of HeartbeatEvent
+   *        or TaskAttemptCompletionEvent
+   * @return the list of new events generated in response
+   */
+  @Override
+  public List<SimulatorEvent> accept(SimulatorEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Accepted event " + event);
+    }
+
+    if (event instanceof HeartbeatEvent) {
+      return processHeartbeatEvent((HeartbeatEvent)event);
+    } else if (event instanceof TaskAttemptCompletionEvent) {
+      return processTaskAttemptCompletionEvent((TaskAttemptCompletionEvent)
+                                               event);
+    } else {
+      throw new IllegalArgumentException("Unhandled event "+event);
+    }
+  }
+  
+  /**
+   * Called once at the start of the simulation.
+   *
+   * @param when Time when the task tracker starts.
+   * @return the initial HeartbeatEvent for ourselves.
+   */ 
+  public List<SimulatorEvent> init(long when) {
+    LOG.debug("TaskTracker starting up, current simulation time=" + when);
+
+    return Collections.<SimulatorEvent>singletonList(new HeartbeatEvent(this, when));  
+  }
+ 
+  /**
+   * Stops running a task attempt on the task tracker. It also updates the 
+   * number of available slots accordingly.
+   * 
+   * @param finalStatus the TaskStatus containing the task id and final 
+   *        status of the task attempt. This routine asserts a lot of the
+   *        finalStatus params, in case it is coming from a task attempt
+   *        completion event sent to ourselves. Only the run state, finish time,
+   *        and progress fields of the task attempt are updated.
+   * @param now Current simulation time, used for assert only
+   */
+  private void finishRunningTask(TaskStatus finalStatus, long now) {
+    TaskAttemptID taskId = finalStatus.getTaskID();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finishing running task id=" + taskId + ", now=" + now);
+    }
+
+    SimulatorTaskInProgress tip = tasks.get(taskId);
+    if (tip == null) {
+      throw new IllegalArgumentException("Unknown task attempt " + taskId
+          + " completed");
+    }
+    TaskStatus currentStatus = tip.getTaskStatus();
+    if (currentStatus.getRunState() != State.RUNNING) {
+      throw new IllegalArgumentException(
+          "Task attempt to finish is not running: " + tip);
+    }
+
+    // Check that finalStatus describes a task attempt that has just been
+    // completed
+    State finalRunState = finalStatus.getRunState();
+    if (finalRunState != State.SUCCEEDED && finalRunState != State.FAILED
+        && finalRunState != State.KILLED) {
+      throw new IllegalArgumentException(
+          "Final run state for completed task can't be : " + finalRunState
+              + " " + tip);
+    }
+
+    if (now != finalStatus.getFinishTime()) {
+      throw new IllegalArgumentException(
+          "Current time does not match task finish time: now=" + now
+              + ", finish=" + finalStatus.getFinishTime());
+    }
+
+    if (currentStatus.getIsMap() != finalStatus.getIsMap()
+        || currentStatus.getNumSlots() != finalStatus.getNumSlots()
+        || currentStatus.getPhase() != finalStatus.getPhase()
+        || currentStatus.getStartTime() != finalStatus.getStartTime()) {
+      throw new IllegalArgumentException(
+          "Current status does not match final status");
+    }
+
+    // We can't assert getShuffleFinishTime() and getSortFinishTime() for
+    // reduces as those were unknown when the task attempt completion event
+    // was created. We have not called setMapFinishTime() for maps either.
+    // If we were really thorough we could update the progress of the task
+    // and check if it is consistent with finalStatus.
+
+    // If we've got this far it is safe to update the task status
+    currentStatus.setRunState(finalStatus.getRunState());
+    currentStatus.setFinishTime(finalStatus.getFinishTime());
+    currentStatus.setProgress(finalStatus.getProgress());
+
+    // and update the free slots
+    int numSlots = currentStatus.getNumSlots();
+    if (tip.isMapTask()) {
+      usedMapSlots -= numSlots;
+      if (usedMapSlots < 0) {
+        throw new IllegalStateException(
+            "TaskTracker reaches negative map slots: " + usedMapSlots);
+      }
+    } else {
+      usedReduceSlots -= numSlots;
+      if (usedReduceSlots < 0) {
+        throw new IllegalStateException(
+            "TaskTracker reaches negative reduce slots: " + usedReduceSlots);
+      }
+    }
+  }
+
+  /**
+   * Records that a task attempt has completed. Ignores the event for tasks
+   * that got killed after the creation of the completion event.
+   * 
+   * @param event the TaskAttemptCompletionEvent the tracker sent to itself
+   * @return the list of response events, empty
+   */ 
+  private List<SimulatorEvent> processTaskAttemptCompletionEvent(
+      TaskAttemptCompletionEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing task attempt completion event" + event);
+    }
+    
+    long now = event.getTimeStamp();  
+    TaskStatus finalStatus = event.getStatus();
+    TaskAttemptID taskID = finalStatus.getTaskID();
+    boolean killedEarlier = orphanTaskCompletions.remove(taskID);
+    if (!killedEarlier) {
+      finishRunningTask(finalStatus, now);
+    }
+    return SimulatorEngine.EMPTY_EVENTS;
+  }
+  
+ 
+  /** 
+   * Creates a signal for itself marking the completion of a task attempt. 
+   * It assumes that the task attempt hasn't made any progress in the user 
+   * space code so far, i.e. it is called right at launch for map tasks and
+   * immediately after all maps have completed for reduce tasks.
+   *
+   * @param tip the simulator task in progress
+   * @param now the current simulation time
+   * @return the TaskAttemptCompletionEvent we are sending to ourselves
+   */
+  private TaskAttemptCompletionEvent createTaskAttemptCompletionEvent(
+                                       SimulatorTaskInProgress tip, long now) {
+    // We need to clone() status as we modify and it goes into an Event
+    TaskStatus status = (TaskStatus)tip.getTaskStatus().clone();
+    long delta = tip.getUserSpaceRunTime();
+    assert delta >= 0 : "TaskAttempt " + tip.getTaskStatus().getTaskID()
+        + " has negative UserSpaceRunTime = " + delta;
+    long finishTime = now + delta;
+    status.setFinishTime(finishTime);
+    if (tip.isMapTask()) {
+      status.setMapFinishTime(finishTime);
+    }
+    status.setProgress(1.0f);
+    status.setRunState(tip.getFinalRunState());
+    TaskAttemptCompletionEvent event = 
+        new TaskAttemptCompletionEvent(this, status);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created task attempt completion event " + event);
+    }
+    return event;
+  }
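+
+  // Worked example (illustrative): a map attempt launched at now=1000 with
+  // userSpaceRunTime=5000 gets finishTime=6000 in its cloned status; since
+  // TaskAttemptCompletionEvent timestamps itself with status.getFinishTime(),
+  // the event is delivered back to this tracker at simulation time 6000.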
+  
+  /**
+   * Launches a task on the simulated task tracker. 
+   * 
+   * @param action SimulatorLaunchTaskAction sent by the job tracker
+   * @param now current simulation time
+   * @return new events generated, a TaskAttemptCompletionEvent for map
+   *         tasks, empty otherwise
+   */
+  private List<SimulatorEvent> handleSimulatorLaunchTaskAction(
+                         SimulatorLaunchTaskAction action, long now) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling launch task action " + action);
+    }
+    // First, create statuses and update used slots for map and reduce 
+    // task separately
+    Task task = action.getTask();
+    TaskAttemptID taskId = task.getTaskID();
+    if (tasks.containsKey(taskId)) {
+      throw new IllegalArgumentException("Multiple launch of task id =" + taskId);
+    }
+
+    // Ctor of MapTaskStatus and ReduceTaskStatus need deprecated 
+    // o.a.h.mapred.TaskAttemptID, hence the downgrade
+    org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
+        org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
+    TaskStatus status;
+    int numSlotsRequired = task.getNumSlotsRequired();    
+    Counters emptyCounters = new Counters();
+    if (task.isMapTask()) {     
+      status = new MapTaskStatus(taskIdOldApi, 0f, numSlotsRequired,
+                                 State.RUNNING, "", "", taskTrackerName, 
+                                 Phase.MAP, emptyCounters);
+      usedMapSlots += numSlotsRequired;
+      if (usedMapSlots > maxMapSlots) {
+        throw new IllegalStateException("usedMapSlots exceeds maxMapSlots: " + 
+          usedMapSlots + " > " + maxMapSlots);
+      }
+    } else {
+      status = new ReduceTaskStatus(taskIdOldApi, 0f, numSlotsRequired, 
+                                    State.RUNNING, "", "", taskTrackerName,
+                                    Phase.SHUFFLE, emptyCounters);
+      usedReduceSlots += numSlotsRequired;
+      if (usedReduceSlots > maxReduceSlots) {
+        throw new IllegalStateException("usedReduceSlots exceeds usedReduceSlots: " + 
+            usedReduceSlots + " > " + usedReduceSlots);
+      }
+    }
+    
+    //  Second, create and store a TIP
+    status.setStartTime(now);
+    SimulatorTaskInProgress tip = 
+      new SimulatorTaskInProgress(action, status, now);
+    tasks.put(taskId, tip);
+
+    // Third, schedule events for ourselves
+    if (task.isMapTask()) {
+      // we know when this task attempt ends iff it is a map 
+      TaskAttemptCompletionEvent e = createTaskAttemptCompletionEvent(tip, now);
+      return Collections.<SimulatorEvent>singletonList(e);
+    } else { 
+      // reduce, completion time can only be determined when all maps are done
+      return SimulatorEngine.EMPTY_EVENTS;
+    }
+  }
+  
+  /** 
+   * Kills a task attempt.
+   *
+   * @param action contains the task attempt to kill
+   * @param now current simulation time
+   * @return new events generated in response, empty
+   */
+  private List<SimulatorEvent> handleKillTaskAction(KillTaskAction action, long now) {
+    TaskAttemptID taskId = action.getTaskID();
+    // we don't have a nice(r) toString() in Hadoop's TaskActions 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling kill task action, taskId=" + taskId + ", now=" + now);
+    }
+    
+    SimulatorTaskInProgress tip = tasks.get(taskId);
+    
+    // Safety check: We might get a KillTaskAction even for completed reduces
+    if (tip == null) {
+      return SimulatorEngine.EMPTY_EVENTS;
+    }
+    
+    progressTaskStatus(tip, now); // make progress up to date
+    TaskStatus finalStatus = (TaskStatus)tip.getTaskStatus().clone();
+    finalStatus.setFinishTime(now);
+    finalStatus.setRunState(State.KILLED);
+    finishRunningTask(finalStatus, now);
+   
+    if (finalStatus.getIsMap() || finalStatus.getPhase() == Phase.REDUCE) {
+      // if we have already created a task attempt completion event we remember
+      // the task id, so that we can safely ignore the event when it is delivered
+      orphanTaskCompletions.add(taskId);
+    }
+    return SimulatorEngine.EMPTY_EVENTS;
+  }  
+
+  /** 
+   * Starts "running" the REDUCE phase of reduce upon being notified that 
+   * all map tasks are (successfully) done.
+   *
+   * @param action contains the notification for one of the reduce tasks
+   * @param now current simulation time
+   * @return new events generated, a single TaskAttemptCompletionEvent for the
+   *         reduce
+   */
+  private List<SimulatorEvent> handleAllMapsCompletedTaskAction(
+                        AllMapsCompletedTaskAction action, long now) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling all maps completed task action " + action);
+    }
+    
+    TaskAttemptID taskId = action.getTaskID();
+    SimulatorTaskInProgress tip = tasks.get(taskId);
+    if (tip == null) {
+      // The task attempt to be notified is unknown to this TaskTracker;
+      // fail loudly instead of hitting a NullPointerException below.
+      throw new IllegalArgumentException(
+          "AllMapsCompletedTaskAction for unknown task attempt " + taskId);
+    }
+    TaskStatus status = tip.getTaskStatus();
+    if (status.getIsMap()) {
+      throw new IllegalStateException(
+          "Invalid AllMapsCompletedTaskAction, task attempt "
+              + "to be notified is a map: " + taskId + " " + status);
+    }
+    if (status.getPhase() != Phase.SHUFFLE) {
+      throw new IllegalArgumentException(
+          "Reducer task attempt already notified: " + taskId + " " + status);
+    }
+           
+    // Warning: setPhase() uses System.currentTimeMillis() internally to
+    // set shuffle and sort times, but we overwrite that manually anyway
+    status.setPhase(Phase.REDUCE);
+    status.setShuffleFinishTime(now);
+    status.setSortFinishTime(now);
+    
+    // Forecast the completion of this reduce
+    TaskAttemptCompletionEvent e = createTaskAttemptCompletionEvent(tip, now);
+    return Collections.<SimulatorEvent>singletonList(e);
+  }
+  
+  /** 
+   * Updates the progress indicator of a task if it is running.
+   * 
+   * @param tip simulator task in progress whose progress is to be updated
+   * @param now current simulation time
+   */
+  private void progressTaskStatus(SimulatorTaskInProgress tip, long now) {
+    TaskStatus status = tip.getTaskStatus();
+    if (status.getRunState() != State.RUNNING) {
+      return; // nothing to be done
+    }
+
+    boolean isMap = tip.isMapTask();
+    // Time when the user space code started
+    long startTime = -1;
+    // Time spent in map or just in the REDUCE phase of a reduce task
+    long runTime = tip.getUserSpaceRunTime();
+    float progress = 0.0f;
+    if (isMap) {
+      // We linearly estimate the progress of maps since their start 
+      startTime = status.getStartTime();
+      progress = ((float)(now - startTime)) / runTime;
+    } else {
+      // We don't model reduce progress in the SHUFFLE or SORT phases.
+      // We use a linear estimate for the third, REDUCE, phase.
+      Phase reducePhase = status.getPhase();
+      switch (reducePhase) {
+      case SHUFFLE:
+        progress = 0.0f; // 0 phase is done out of 3
+        break;
+      case SORT:
+        progress = 1.0f/3; // 1 phase is done out of 3
+        break;
+      case REDUCE: {
+        // REDUCE phase with the user code started when sort finished
+        startTime = status.getSortFinishTime();
+        // 2.0f/3 : 2 phases are done out of 3
+        progress = 2.0f/3 + (((float) (now - startTime)) / runTime) / 3.0f;
+      }
+        break;
+      default:
+        // should never get here
+        throw new IllegalArgumentException("Invalid reducePhase=" + reducePhase);
+      }
+    }
+    
+    final float EPSILON = 0.0001f;
+    if (progress < -EPSILON || progress > 1 + EPSILON) {
+      throw new IllegalStateException("Task progress out of range: " + progress);
+    }
+    progress = Math.max(Math.min(1.0f, progress), 0.0f);
+    status.setProgress(progress);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updated task progress, taskId=" + status.getTaskID()
+          + ", progress=" + status.getProgress());
+    }
+  }
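+
+  // Worked example (illustrative): a reduce in the REDUCE phase with
+  // sortFinishTime=10000, userSpaceRunTime=6000 and now=13000 yields
+  // progress = 2/3 + ((13000 - 10000) / 6000) / 3 = 2/3 + 1/6 ~ 0.83.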
+  
+  /**
+   * Updates the progress indicator of all running tasks. 
+   *
+   * @param now current simulation time
+   */
+  private void progressTaskStatuses(long now) {
+    for (SimulatorTaskInProgress tip : tasks.values()) {
+      progressTaskStatus(tip, now);
+    }
+  }
+
+  /** 
+   * Frees up bookkeeping memory used by completed tasks. 
+   * Has no effect on the events or logs produced by the SimulatorTaskTracker.
+   * We need this so that completed tasks are not reported multiple times and 
+   * to ensure that we do not run out of Java heap memory in larger 
+   * simulations.
+   */
+  private void garbageCollectCompletedTasks() {
+    for (Iterator<TaskAttemptID> iter = tasks.keySet().iterator();
+         iter.hasNext();) {
+      TaskAttemptID taskId = iter.next();
+      SimulatorTaskInProgress tip = tasks.get(taskId);
+      if (tip.getTaskStatus().getRunState() != State.RUNNING) {
+        iter.remove();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Garbage collected SimulatorTIP, taskId=" + taskId);
+        }
+        // We don't have to, and must not, touch usedMapSlots and usedReduceSlots
+        // as those were already updated by processTaskAttemptCompletionEvent() 
+        // when the task switched its state from running
+      }
+    }
+  }
+  
+  /**
+   * Creates a list of task statuses suitable for transmission via heartbeat().
+   * The task statuses are cloned() so that the heartbeat() callee, the job 
+   * tracker, can't mess up the SimulatorTaskTracker's internal data.
+   *
+   * @return the list of running and recently completed task statuses 
+   * on the tracker
+   */
+  private List<TaskStatus> collectAndCloneTaskStatuses() {
+    ArrayList<TaskStatus> statuses = new ArrayList<TaskStatus>();
+    for (SimulatorTaskInProgress tip : tasks.values()) {
+      statuses.add((TaskStatus)tip.getTaskStatus().clone());
+    }
+    return statuses;
+  }
+
+  /**
+   * Handles the HeartbeatResponse received from the job tracker upon 
+   * heartbeat(). Dispatches to handle*Action() methods.
+   *
+   * @param response HeartbeatResponse received from the job tracker
+   * @param now current simulation time
+   * @return list of new events generated in response to the task actions
+   */
+  private List<SimulatorEvent> handleHeartbeatResponse(HeartbeatResponse response,
+                                              long now) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling heartbeat response " + response);
+    }
+
+    List<SimulatorEvent> events = new ArrayList<SimulatorEvent>();  
+    TaskTrackerAction[] actions = response.getActions();
+    for (TaskTrackerAction action : actions) {
+      List<SimulatorEvent> actionEvents;
+      if (action instanceof SimulatorLaunchTaskAction) {
+        actionEvents = handleSimulatorLaunchTaskAction(
+            (SimulatorLaunchTaskAction)action, now);            
+      } else if(action instanceof KillTaskAction) {
+        actionEvents = handleKillTaskAction((KillTaskAction)action, now);
+      } else if(action instanceof AllMapsCompletedTaskAction) {
+        // our extra task action for notifying the reducers
+        actionEvents = handleAllMapsCompletedTaskAction(
+            (AllMapsCompletedTaskAction)action, now);
+      } else {
+        // Should never get here.
+        // CommitTaskAction is not implemented in the simulator
+        // LaunchTaskAction has to be SimulatorLaunchTaskAction
+        throw new UnsupportedOperationException("Unimplemented TaskAction: "
+            + action);
+      }
+      events.addAll(actionEvents);
+    }
+    return events;
+  }
+  
+  /** 
+   * Transmits a heartbeat event to the jobtracker and processes the response.
+   *
+   * @param event HeartbeatEvent to process
+   * @return list of new events generated in response
+   */
+  private List<SimulatorEvent> processHeartbeatEvent(HeartbeatEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing heartbeat event " + event);
+    }
+    
+    long now = event.getTimeStamp();
+    
+    // Create the TaskTrackerStatus to report
+    progressTaskStatuses(now);
+    List<TaskStatus> taskStatuses = collectAndCloneTaskStatuses();
+    boolean askForNewTask = (usedMapSlots < maxMapSlots ||
+                             usedReduceSlots < maxReduceSlots);
+
+    // 0 means failures == 0 here. Undocumented in TaskTracker, but does not 
+  // seem to be used at all in org.apache.hadoop.mapred.
+    TaskTrackerStatus taskTrackerStatus = 
+      new SimulatorTaskTrackerStatus(taskTrackerName, hostName, httpPort, 
+          taskStatuses, 0, 
+          maxMapSlots, maxReduceSlots, now);
+    
+    // This is the right, and only, place to release bookkeeping memory held 
+    // by completed tasks: after collectAndCloneTaskStatuses() and before 
+    // heartbeat().
+    // The status of TIPs to be purged has already been cloned and copied to
+    // taskStatuses for reporting.
+    // We shouldn't run the gc after heartbeat() since KillTaskAction might 
+    // produce new completed tasks that we have not yet reported back and 
+    // don't want to purge immediately.
+    garbageCollectCompletedTasks();
+    
+    // Transmit the heartbeat 
+    HeartbeatResponse response = null;
+    try {
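+      // The second argument is InterTrackerProtocol's tracker-restarted flag
+      // (an assumption based on the 0.21-era heartbeat() signature); the
+      // simulator never restarts trackers, so it is always false here.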
+      response = 
+        jobTracker.heartbeat(taskTrackerStatus, false, firstHeartbeat, 
+                             askForNewTask, heartbeatResponseId);
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Internal error", ioe);
+    }
+    firstHeartbeat = false;
+
+    // The heartbeat got through successfully!
+    heartbeatResponseId = response.getResponseId();
+
+    // Process the heartbeat response
+    List<SimulatorEvent> events = handleHeartbeatResponse(response, now);
+    
+    // Next heartbeat
+    events.add(new HeartbeatEvent(this, now + response.getHeartbeatInterval()));
+
+    return events;
+  }
+  
+  /**
+   * Internal helper class used for storing the current status and other
+   * auxiliary information associated with a task attempt assigned to
+   * a simulator task tracker.
+   * WARNING: This is a completely different inner class than the one with
+   *          the same name in SimulatorJobTracker.
+   */
+  static class SimulatorTaskInProgress {
+  
+    /**
+     * Current status of the task attempt. 
+     * We store the start time, the start times of the reduce phases, and the
+     * run state of the task in this object.
+     */
+    private TaskStatus taskStatus;
+    
+    /** 
+     * Object storing the run time and the final state of the task attempt.
+     * It is never read directly by the SimulatorTaskTracker.
+     */
+    private TaskAttemptInfo taskAttemptInfo;
+
+    /**
+     * Runtime of the user-space code of the task attempt. This is the full
+     * runtime for map tasks, and only that of the REDUCE phase for reduce
+     * tasks.
+     */ 
+    private final long userSpaceRunTime;
+    
+    /** 
+     * Constructs an object by copying most of the fields from a
+     * SimulatorLaunchTaskAction.
+     */
+    public SimulatorTaskInProgress(SimulatorLaunchTaskAction action,
+                                   TaskStatus taskStatus, long now) {
+      this.taskStatus = taskStatus;
+      this.taskAttemptInfo = action.getTaskAttemptInfo();
+      if (taskStatus.getIsMap()) {
+        this.userSpaceRunTime = taskAttemptInfo.getRuntime();
+      } else {
+        this.userSpaceRunTime = 
+          ((ReduceTaskAttemptInfo)taskAttemptInfo).getReduceRuntime();
+      }
+    }
+    
+    /**
+     * Returns whether the task attempt is a map. 
+     * 
+     * @return true iff the task attempt is a map
+     */
+    public boolean isMapTask() {
+      return taskStatus.getIsMap();
+    }
+
+    /**
+     * Returns the current status of the task attempt. 
+     *
+     * @return current task status
+     */
+    public TaskStatus getTaskStatus() {
+      return taskStatus;
+    }
+    
+    /** 
+     * Sets the status of the task attempt.
+     *
+     * @param status the new task status
+     */
+    public void setTaskStatus(TaskStatus status) {
+      this.taskStatus = status;
+    }
+        
+    /** 
+     * Returns the final state of the completed task.
+     * 
+     * @return the final state of the completed task; 
+     *        it is either State.SUCCEEDED or State.FAILED
+     */
+    public State getFinalRunState() {
+      return taskAttemptInfo.getRunState();
+    }
+    
+    /**
+     * Gets the time spent in the user space code of the task attempt.
+     * This is the full runtime for map tasks, and only that of the REDUCE 
+     * phase for reduce tasks.
+     *
+     * @return the user space runtime 
+     */
+    public long getUserSpaceRunTime() {
+      return userSpaceRunTime;
+    }          
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTrackerStatus.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTrackerStatus.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTrackerStatus.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+
+/**
+ * This class only exists to pass the current simulation time to the 
+ * JobTracker in the heartbeat() call.
+ */
+class SimulatorTaskTrackerStatus extends TaskTrackerStatus {
+  /**
+   * The virtual (simulation) time when the heartbeat() call transmitting 
+   * this TaskTrackerStatus occurred. 
+   */
+  private final long currentSimulationTime;
+
+  /**
+   * Constructs a SimulatorTaskTrackerStatus object. All parameters are 
+   * the same as in {@link TaskTrackerStatus}. The only extra parameter is:
+   * @param currentSimulationTime the current time in the simulation when the 
+   *                              heartbeat() call transmitting this 
+   *                              TaskTrackerStatus occurred.
+   */ 
+  public SimulatorTaskTrackerStatus(String trackerName, String host, 
+                                    int httpPort, List<TaskStatus> taskReports, 
+                                    int failures, int maxMapTasks,
+                                    int maxReduceTasks,
+                                    long currentSimulationTime) {
+    super(trackerName, host, httpPort, taskReports,
+          failures, maxMapTasks, maxReduceTasks);
+    this.currentSimulationTime = currentSimulationTime;
+  }
+
+  /** 
+   * Returns the current time in the simulation.
+   */
+  public long getCurrentSimulationTime() {
+    return currentSimulationTime;
+  }
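+
+  /*
+   * Illustrative only: a simulator-side job tracker can recover the virtual
+   * time from the status it receives in heartbeat(); the downcast below is a
+   * sketch, not code from this patch:
+   *
+   *   long now =
+   *       ((SimulatorTaskTrackerStatus) status).getCurrentSimulationTime();
+   */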
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/TaskAttemptCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/TaskAttemptCompletionEvent.java?rev=818674&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/TaskAttemptCompletionEvent.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/TaskAttemptCompletionEvent.java Fri Sep 25 00:25:28 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * This class is used by SimulatorTaskTrackers for signaling themselves when
+ * a task attempt finishes. The rationale for sending this redundant event 
+ * is that (1) it makes it possible to monitor all task completion events
+ * centrally from the Engine, and (2) TTs used to call heartbeat() on the job 
+ * tracker right after a task completed (so-called "crazy heartbeats") without 
+ * waiting for the heartbeat interval. If we want to simulate that, we need 
+ * to decouple task completion monitoring from the periodic heartbeats. 
+ */
+public class TaskAttemptCompletionEvent extends SimulatorEvent {
+
+  /** The final status of the completed task. */  
+  private final TaskStatus status;
+
+  /**
+   * Constructs a task completion event from a task status.
+   * @param listener the SimulatorTaskTracker the task is running on
+   * @param status the final status of the completed task. Precondition: 
+   *                status.getRunState() must be either State.SUCCEEDED or 
+   *                State.FAILED.
+   */
+  public TaskAttemptCompletionEvent(SimulatorEventListener listener,
+                                    TaskStatus status) {
+    super(listener, status.getFinishTime());
+    this.status = status;
+  }
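+
+  // Illustrative only: SimulatorTaskTracker builds these events from a cloned
+  // TaskStatus whose finish time has been set to the forecast completion,
+  // roughly:
+  //
+  //   TaskStatus status = (TaskStatus) tip.getTaskStatus().clone();
+  //   status.setFinishTime(now + userSpaceRunTime);
+  //   new TaskAttemptCompletionEvent(tracker, status);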
+  
+  /** Returns the final status of the task. */
+  public TaskStatus getStatus() {
+    return status;
+  }
+  
+  @Override
+  protected String realToString() {
+    return super.realToString() + ", taskID=" + status.getTaskID();
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/data/19-jobs.topology.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/data/19-jobs.topology.json.gz?rev=818674&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/data/19-jobs.topology.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/data/19-jobs.trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/data/19-jobs.trace.json.gz?rev=818674&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/data/19-jobs.trace.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream


