hadoop-mapreduce-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r814122 [2/2] - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/zombie/ src/tools/org/apache/hadoop/tools/rumen/
Date: Sat, 12 Sep 2009 09:31:36 GMT
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RackNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RackNode.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RackNode.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RackNode.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+
+/**
+ * {@link RackNode} represents a rack node in the cluster topology.
+ */
+public final class RackNode extends Node {
+  public RackNode(String name, int level) {
+    // Hack: ensuring rack name starts with "/".
+    super(name.startsWith("/") ? name : "/" + name, level);
+  }
+  
+  @Override
+  public synchronized boolean addChild(Node child) {
+    if (!(child instanceof MachineNode)) {
+      throw new IllegalArgumentException(
+          "Only MachineNode can be added to RackNode");
+    }
+    return super.addChild(child);
+  }
+  
+  /**
+   * Get the machine nodes that belong to the rack.
+   * @return The machine nodes that belong to the rack.
+   */
+  @SuppressWarnings({ "cast", "unchecked" })
+  public Set<MachineNode> getMachinesInRack() {
+    return (Set<MachineNode>)(Set)getChildren();
+  }
+}
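
For context, a minimal usage sketch of the new RackNode class follows. It is not part of this commit; the MachineNode.Builder calls mirror how ZombieCluster (below) constructs machine nodes, the names and level values are illustrative, and the usual java.util.Set and org.apache.hadoop.tools.rumen imports are assumed.

    // Sketch only: a rack at level 1 holding machine nodes at level 2.
    static Set<MachineNode> rackNodeSketch() {
      RackNode rack = new RackNode("rack-0", 1);   // name is normalized to "/rack-0"
      MachineNode host =
          new MachineNode.Builder("node-17.example.com", 2).build();
      rack.addChild(host);                         // non-MachineNode children are rejected
      return rack.getMachinesInRack();             // the machines under this rack
    }
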

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java Sat Sep 12 09:31:34 2009
@@ -19,8 +19,11 @@
 
 import org.apache.hadoop.mapred.TaskStatus.State;
 
+/**
+ * {@link ReduceTaskAttemptInfo} represents the information for a reduce task
+ * attempt.
+ */
 public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
-
   private long shuffleTime;
   private long mergeTime;
   private long reduceTime;
@@ -62,8 +65,6 @@
 
   @Override
   public long getRuntime() {
-    // XXX Does this make sense? Should TaskAttemptInfo.getRuntime() return
-    // total reduce runtime, rather than reduce phase time?
     return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
   }
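
With the XXX comment removed, getRuntime() is now defined as the sum of the shuffle, merge, and reduce phase times. A sketch, not part of this commit, with made-up millisecond values; TaskInfo and TaskStatus.State are used as elsewhere in rumen.

    // Sketch only: total runtime is shuffle + merge + reduce.
    static long reduceRuntimeSketch() {
      TaskInfo info = new TaskInfo(0, 0, 0, 0, 0);
      ReduceTaskAttemptInfo attempt =
          new ReduceTaskAttemptInfo(State.SUCCEEDED, info, 4000L, 1500L, 2500L);
      return attempt.getRuntime();   // 4000 + 1500 + 2500 = 8000
    }
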
 

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Sat Sep 12 09:31:34 2009
@@ -18,7 +18,6 @@
 package org.apache.hadoop.tools.rumen;
 
 public class TaskInfo {
-
   private final long bytesIn;
   private final int recsIn;
   private final long bytesOut;

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java Sat Sep 12 09:31:34 2009
@@ -48,6 +48,7 @@
     this.index = index;
   }
 
+  @Override
   public String toString() {
     String mySegment = fieldName + (index == -1 ? "" : ("[" + index + "]"));
 

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link ZombieCluster} rebuilds the cluster topology using the information
+ * obtained from job history logs.
+ */
+public class ZombieCluster extends AbstractClusterStory {
+  private Node root;
+
+  /**
+   * Construct a homogeneous cluster. We assume that the leaves of the topology
+   * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+   * {@link RackNode}s. We also expect all leaf nodes to be on the same level.
+   * 
+   * @param topology
+   *          The network topology.
+   * @param defaultNode
+   *          The default node setting.
+   */
+  ZombieCluster(LoggedNetworkTopology topology, MachineNode defaultNode) {
+    buildCluster(topology, defaultNode);
+  }
+
+  /**
+   * Construct a homogeneous cluster. We assume that the leaves of the topology
+   * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+   * {@link RackNode}s. We also expect all leaf nodes to be on the same level.
+   * 
+   * @param path Path to the JSON-encoded topology file.
+   * @param conf The configuration.
+   * @param defaultNode
+   *          The default node setting.
+   * @throws IOException 
+   */
+  public ZombieCluster(Path path, MachineNode defaultNode, Configuration conf) throws IOException {
+    this(new ClusterTopologyReader(path, conf).get(), defaultNode);
+  }
+  
+  /**
+   * Construct a homogeneous cluster. We assume that the leaves of the topology
+   * are {@link MachineNode}s, and the parents of {@link MachineNode}s are
+   * {@link RackNode}s. We also expect all leaf nodes to be on the same level.
+   * 
+   * @param input The input stream for the JSON-encoded topology file.
+   * @param defaultNode
+   *          The default node setting.
+   * @throws IOException 
+   */
+  public ZombieCluster(InputStream input, MachineNode defaultNode) throws IOException {
+    this(new ClusterTopologyReader(input).get(), defaultNode);
+  }
+
+  @Override
+  public Node getClusterTopology() {
+    return root;
+  }
+
+  private final void buildCluster(LoggedNetworkTopology topology,
+      MachineNode defaultNode) {
+    Map<LoggedNetworkTopology, Integer> levelMapping = 
+      new IdentityHashMap<LoggedNetworkTopology, Integer>();
+    Deque<LoggedNetworkTopology> unvisited = 
+      new ArrayDeque<LoggedNetworkTopology>();
+    unvisited.add(topology);
+    levelMapping.put(topology, 0);
+    
+    // build levelMapping and determine leafLevel
+    int leafLevel = -1; // -1 means leafLevel unknown.
+    for (LoggedNetworkTopology n = unvisited.poll(); n != null; 
+      n = unvisited.poll()) {
+      int level = levelMapping.get(n);
+      List<LoggedNetworkTopology> children = n.getChildren();
+      if (children == null || children.isEmpty()) {
+        if (leafLevel == -1) {
+          leafLevel = level;
+        } else if (leafLevel != level) {
+          throw new IllegalArgumentException(
+              "Leaf nodes are not on the same level");
+        }
+      } else {
+        for (LoggedNetworkTopology child : children) {
+          levelMapping.put(child, level + 1);
+          unvisited.addFirst(child);
+        }
+      }
+    }
+
+    /*
+     * A second-pass DFS traversal of the topology tree. path[i] contains the
+     * parent of the node at level i+1.
+     */
+    Node[] path = new Node[leafLevel];
+    unvisited.add(topology);
+    for (LoggedNetworkTopology n = unvisited.poll(); n != null; 
+      n = unvisited.poll()) {
+      int level = levelMapping.get(n);
+      Node current;
+      if (level == leafLevel) { // a machine node
+        MachineNode.Builder builder = new MachineNode.Builder(n.getName(), level);
+        if (defaultNode != null) {
+          builder.cloneFrom(defaultNode);
+        }
+        current = builder.build();
+      } else {
+        current = (level == leafLevel - 1) 
+          ? new RackNode(n.getName(), level) : 
+            new Node(n.getName(), level);
+        path[level] = current;
+        // Add all children to the front of the queue.
+        for (LoggedNetworkTopology child : n.getChildren()) {
+          unvisited.addFirst(child);
+        }
+      }
+      if (level != 0) {
+        path[level - 1].addChild(current);
+      }
+    }
+
+    root = path[0];
+  }
+}
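
A sketch of building a ZombieCluster from a JSON topology file follows. It is not part of this commit; the file name is hypothetical, passing null for the default node is allowed because buildCluster only clones it when non-null, and getRandomMachines/getMachineByName are ClusterStory methods used by ZombieJob below. The Configuration, Path, and IOException imports are assumed.

    // Sketch only: rebuild a topology and query it.
    static void zombieClusterSketch() throws IOException {
      Configuration conf = new Configuration();
      ZombieCluster cluster =
          new ZombieCluster(new Path("cluster-topology.json"), null, conf);
      Node root = cluster.getClusterTopology();           // root of the rebuilt tree
      MachineNode[] sample = cluster.getRandomMachines(3); // random machines for made-up splits
      MachineNode node = cluster.getMachineByName("node-17.example.com");
    }
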

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Sat Sep 12 09:31:34 2009
@@ -23,6 +23,8 @@
 import java.util.Random;
 import java.util.HashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobConf;
@@ -38,29 +40,25 @@
 /**
  * {@link ZombieJob} is a layer above {@link LoggedJob} raw JSON objects.
  * 
- * Each <code>ZombieJob</code> object represents a job in job history. For
- * everything that exists in job history, contents are returned unchanged
- * faithfully. To get input splits of a non-exist task, a non-exist task
- * attempt, or an ill-formed task attempt, proper objects are made up from
- * statistical sketch.
+ * Each {@link ZombieJob} object represents a job in job history. For everything
+ * that exists in job history, contents are returned unchanged faithfully. To
+ * get input splits of a nonexistent task, a nonexistent task attempt, or an
+ * ill-formed task attempt, proper objects are made up from statistical
+ * sketches.
  */
+@SuppressWarnings("deprecation")
 public class ZombieJob implements JobStory {
-
+  static final Log LOG = LogFactory.getLog(ZombieJob.class);
   private final LoggedJob job;
-
-  private final Map<TaskID, LoggedTask> loggedTaskMap = new HashMap<TaskID, LoggedTask>();
-
-  private final Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>();
-
+  private Map<TaskID, LoggedTask> loggedTaskMap;
+  private Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap;
   private final Random random;
-
-  private FileSplit[] splits;
-
-  private final LoggedNetworkTopology topology;
+  private InputSplit[] splits;
+  private final ClusterStory cluster;
+  private JobConf jobConf;
 
   // TODO: Fix ZombieJob to initialize this correctly from observed data
   double rackLocalOverNodeLocal = 1.5;
-  // TODO: Fix ZombieJob to initialize this correctly from observed data
   double rackRemoteOverNodeLocal = 3.0;
 
   /**
@@ -69,18 +67,19 @@
    * 
    * @param job
    *          The dead job this ZombieJob instance is based on.
-   * @param topology
-   *          The topology of the network where the dead job ran on.
+   * @param cluster
+   *          The cluster topology on which the dead job ran. This argument can
+   *          be null if we do not have knowledge of the cluster topology.
    * @param seed
    *          Seed for the random number generator for filling in information
    *          not available from the ZombieJob.
    */
-  public ZombieJob(LoggedJob job, LoggedNetworkTopology topology, long seed) {
-    if (job == null || topology == null) {
-      throw new IllegalArgumentException("job or topology is null");
+  public ZombieJob(LoggedJob job, ClusterStory cluster, long seed) {
+    if (job == null) {
+      throw new IllegalArgumentException("job is null");
     }
     this.job = job;
-    this.topology = topology;
+    this.cluster = cluster;
     random = new Random(seed);
   }
 
@@ -90,14 +89,15 @@
    * 
    * @param job
    *          The dead job this ZombieJob instance is based on.
-   * @param topology
-   *          The topology of the network where the dead job ran on.
+   * @param cluster
+   *          The cluster topology on which the dead job ran. This argument can
+   *          be null if we do not have knowledge of the cluster topology.
    */
-  public ZombieJob(LoggedJob job, LoggedNetworkTopology topology) {
-    this(job, topology, System.nanoTime());
+  public ZombieJob(LoggedJob job, ClusterStory cluster) {
+    this(job, cluster, System.nanoTime());
   }
 
-  private State convertState(JobHistory.Values status) {
+  private static State convertState(JobHistory.Values status) {
     if (status == JobHistory.Values.SUCCESS) {
       return State.SUCCEEDED;
     } else if (status == JobHistory.Values.FAILED) {
@@ -109,70 +109,85 @@
     }
   }
 
-  public JobConf getJobConf() {
-
-    // TODO : add more to jobConf ?
-    JobConf jobConf = new JobConf();
-    jobConf.setJobName(getName());
-    jobConf.setUser(getUser());
-    jobConf.setNumMapTasks(getNumberMaps());
-    jobConf.setNumReduceTasks(getNumberReduces());
+  @Override
+  public synchronized JobConf getJobConf() {
+    if (jobConf == null) {
+      // TODO : add more to jobConf ?
+      jobConf = new JobConf();
+      jobConf.setJobName(getName());
+      jobConf.setUser(getUser());
+      jobConf.setNumMapTasks(getNumberMaps());
+      jobConf.setNumReduceTasks(getNumberReduces());
+    }
     return jobConf;
-
   }
-
+  
   @Override
   public InputSplit[] getInputSplits() {
     if (splits == null) {
-      List<FileSplit> splitsList = new ArrayList<FileSplit>();
+      List<InputSplit> splitsList = new ArrayList<InputSplit>();
       Path emptyPath = new Path("/");
+      int totalHosts = 0; // used to determine avg # of hosts per split.
       for (LoggedTask mapTask : job.getMapTasks()) {
-        ArrayList<LoggedLocation> locations = mapTask.getPreferredLocations();
-        String[] hosts = new String[locations.size()];
-        int i = 0;
-        for (LoggedLocation location : locations) {
-          List<String> layers = location.getLayers();
-          if (layers.size() == 0) {
-            continue;
+        List<LoggedLocation> locations = mapTask.getPreferredLocations();
+        List<String> hostList = new ArrayList<String>();
+        if (locations != null) {
+          for (LoggedLocation location : locations) {
+            List<String> layers = location.getLayers();
+            if (layers.size() == 0) {
+              LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
+              continue;
+            }
+            String host = layers.get(layers.size() - 1);
+            if (host == null) {
+              LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
+              continue;
+            }
+            hostList.add(host);
           }
-          String host = "";
-          /*
-           * for (String layer: location.getLayers()) { host +=
-           * (File.separatorChar + layer); }
-           */
-          host = layers.get(layers.size() - 1);
-          hosts[i++] = host;
-        }
-        long mapInputBytes = mapTask.getInputBytes();
-        splitsList.add(new FileSplit(emptyPath, 0,
-            ((mapInputBytes > 0) ? mapInputBytes : 0), hosts));
+        }
+        String[] hosts = hostList.toArray(new String[hostList.size()]);
+        totalHosts += hosts.length;
+        long mapInputBytes = getTaskInfo(mapTask).getInputBytes();
+        if (mapInputBytes < 0) {
+          LOG.warn("InputBytes for task "+mapTask.getTaskID()+" is not defined.");
+          mapInputBytes = 0;
+        }
+       
+        splitsList.add(new FileSplit(emptyPath, 0, mapInputBytes, hosts));
       }
 
       // If not all map tasks are in job trace, should make up some splits
       // for missing map tasks.
       int totalMaps = job.getTotalMaps();
+      if (totalMaps < splitsList.size()) {
+        LOG.warn("TotalMaps for job " + job.getJobID()
+            + " is less than the total number of map task descriptions ("
+            + totalMaps + "<" + splitsList.size() + ").");
+      }
+
+      int avgHostPerSplit;
+      if (splitsList.size() == 0) avgHostPerSplit = 3;
+      else {
+        avgHostPerSplit = totalHosts / splitsList.size();
+        if (avgHostPerSplit == 0) avgHostPerSplit = 3;
+      }
+      
       for (int i = splitsList.size(); i < totalMaps; i++) {
-        ArrayList<String> hosts = new ArrayList<String>();
-        // assume replication factor is 3.
-        while (hosts.size() < 3) {
-          List<LoggedNetworkTopology> racks = topology.getChildren();
-          LoggedNetworkTopology rack = racks.get(random.nextInt(racks.size()));
-          List<LoggedNetworkTopology> nodes = rack.getChildren();
-          String host = nodes.get(random.nextInt(nodes.size())).getName();
-          if (!hosts.contains(host)) {
-            hosts.add(host);
+        if (cluster == null) {
+          splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
+        } else {
+          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit);
+          String[] hosts = new String[mNodes.length];
+          for (int j = 0; j < hosts.length; ++j) {
+            hosts[j] = mNodes[j].getName();
           }
+          // TODO: set the size of a split to 0 for now.
+          splitsList.add(new FileSplit(emptyPath, 0, 0, hosts));
         }
-        String[] hostsArray = hosts.toArray(new String[hosts.size()]);
-        // TODO set size of a split to 0 now.
-        splitsList.add(new FileSplit(emptyPath, 0, 0, hostsArray));
-      }
-
-      if (splitsList.size() == 0) {
-        System.err.println(job.getMapTasks().size());
       }
 
-      splits = splitsList.toArray(new FileSplit[splitsList.size()]);
+      splits = splitsList.toArray(new InputSplit[splitsList.size()]);
     }
     return splits;
   }
@@ -181,7 +196,7 @@
   public String getName() {
     String jobName = job.getJobName();
     if (jobName == null) {
-      return "";
+      return "(name unknown)";
     } else {
       return jobName;
     }
@@ -192,14 +207,27 @@
     return JobID.forName(getLoggedJob().getJobID());
   }
 
+  private int sanitizeValue(int oldVal, int defaultVal, String name, String id) {
+    if (oldVal == -1) {
+      LOG.warn(name +" not defined for "+id);
+      return defaultVal;
+    }
+    return oldVal;
+  }
+  
   @Override
   public int getNumberMaps() {
-    return job.getTotalMaps();
+    return sanitizeValue(job.getTotalMaps(), 0, "NumberMaps", job.getJobID());
   }
 
   @Override
   public int getNumberReduces() {
-    return job.getTotalReduces();
+    return sanitizeValue(job.getTotalReduces(), 0, "NumberReduces", job.getJobID());
+  }
+
+  @Override
+  public JobHistory.Values getOutcome() {
+    return job.getOutcome();
   }
 
   @Override
@@ -235,34 +263,78 @@
         taskId.getId(), attemptId.getId());
   }
 
+
+  private LoggedTask sanitizeLoggedTask(LoggedTask task) {
+    if (task == null) return null;
+    if (task.getTaskType() == null) {
+      LOG.warn("Task " + task.getTaskID() + " has nulll TaskType");
+      return null;
+    }
+    if (task.getTaskStatus() == null) {
+      LOG.warn("Task " + task.getTaskID() + " has nulll TaskStatus");
+      return null;
+    }
+    return task;
+  }
+  
+  private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) {
+    if (attempt == null) return null;
+    if (attempt.getResult() == null) {
+      LOG.warn("TaskAttempt " + attempt.getResult() + " has nulll Result");
+      return null;
+    }
+    
+    return attempt;
+  }
+  
   /**
    * Build task mapping and task attempt mapping, to be later used to find
    * information of a particular {@link TaskID} or {@link TaskAttemptID}.
    */
-  private void buildMaps() {
-    for (LoggedTask map : job.getMapTasks()) {
-      loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map);
-
-      for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
-        TaskAttemptID id = TaskAttemptID.forName(mapAttempt.getAttemptID());
-        loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
+  private synchronized void buildMaps() {
+    if (loggedTaskMap == null) {
+      loggedTaskMap = new HashMap<TaskID, LoggedTask>();
+      loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>();
+      
+      for (LoggedTask map : job.getMapTasks()) {
+        map = sanitizeLoggedTask(map);
+        if (map != null) {
+          loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map);
+
+          for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
+            mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
+            if (mapAttempt != null) {
+              TaskAttemptID id = TaskAttemptID.forName(mapAttempt
+                  .getAttemptID());
+              loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
+            }
+          }
+        }
       }
-    }
-    for (LoggedTask reduce : job.getReduceTasks()) {
-      loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce);
-
-      for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
-        TaskAttemptID id = TaskAttemptID.forName(reduceAttempt.getAttemptID());
-        loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
+      for (LoggedTask reduce : job.getReduceTasks()) {
+        reduce = sanitizeLoggedTask(reduce);
+        if (reduce != null) {
+          loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce);
+
+          for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
+            reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
+            if (reduceAttempt != null) {
+              TaskAttemptID id = TaskAttemptID.forName(reduceAttempt
+                  .getAttemptID());
+              loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
+            }
+          }
+        }
       }
-    }
 
-    // do not care about "other" tasks, "setup" or "clean"
+      // TODO: do not care about "other" tasks, "setup" or "clean"
+    }
   }
 
   @Override
   public String getUser() {
-    return job.getUser();
+    String retval = job.getUser();
+    return (retval == null) ? "(unknown)" : retval;
   }
 
   /**
@@ -293,7 +365,8 @@
     if (loggedTask == null) {
       // TODO insert parameters
       TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
-      return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+          taskNumber, locality);
     }
 
     LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
@@ -301,18 +374,16 @@
     if (loggedAttempt == null) {
       // Task exists, but attempt is missing.
       TaskInfo taskInfo = getTaskInfo(loggedTask);
-      return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+          taskNumber, locality);
     } else {
-      // Task and TaskAttempt both exist.
-      try {
-        return getInfo(loggedTask, loggedAttempt);
-      } catch (IllegalArgumentException e) {
-        if (e.getMessage().startsWith("status cannot be")) {
-          TaskInfo taskInfo = getTaskInfo(loggedTask);
-          return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
-        } else {
-          throw e;
-        }
+      // TODO should we handle killed attempts later?
+      if (loggedAttempt.getResult()==JobHistory.Values.KILLED) {
+        TaskInfo taskInfo = getTaskInfo(loggedTask);
+        return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+            taskNumber, locality);
+      } else {
+        return getTaskAttemptInfo(loggedTask, loggedAttempt);
       }
     }
   }
@@ -341,7 +412,8 @@
     if (loggedTask == null) {
       // TODO insert parameters
       TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
-      return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+          taskNumber, locality);
     }
 
     LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
@@ -349,21 +421,25 @@
     if (loggedAttempt == null) {
       // Task exists, but attempt is missing.
       TaskInfo taskInfo = getTaskInfo(loggedTask);
-      return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+          taskNumber, locality);
     } else {
       // Task and TaskAttempt both exist.
       if (loggedAttempt.getResult() == Values.KILLED) {
         TaskInfo taskInfo = getTaskInfo(loggedTask);
-        return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+        return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
+            taskNumber, locality);
       } else if (loggedAttempt.getResult() == Values.FAILED) {
-        // FAILED attempt is not affected by locality
-        // XXX however, made-up FAILED attempts ARE affected by locality, since
-        // statistics are present for attempts of different locality.
-        return getInfo(loggedTask, loggedAttempt);
+        /**
+       * A FAILED attempt is not affected by locality; however, made-up FAILED
+         * attempts ARE affected by locality, since statistics are present for
+         * attempts of different locality.
+         */
+        return getTaskAttemptInfo(loggedTask, loggedAttempt);
       } else if (loggedAttempt.getResult() == Values.SUCCESS) {
         int loggedLocality = getLocality(loggedTask, loggedAttempt);
         if (locality == loggedLocality) {
-          return getInfo(loggedTask, loggedAttempt);
+          return getTaskAttemptInfo(loggedTask, loggedAttempt);
         } else {
           // attempt succeeded in trace. It is scheduled in simulation with
           // a different locality.
@@ -378,6 +454,15 @@
     }
   }
 
+  private long sanitizeTaskRuntime(long time, String id) {
+    if (time < 0) {
+      LOG.warn("Negative running time for task "+id+": "+time);
+      return 100L; // set default to 100ms.
+    }
+    return time;
+  }
+  
+  @SuppressWarnings("hiding") 
   private TaskAttemptInfo scaleInfo(LoggedTask loggedTask,
       LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality,
       double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) {
@@ -393,53 +478,49 @@
       } else {
         taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
       }
+      taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
       taskTime *= scaleFactor;
       return new MapTaskAttemptInfo(state, taskInfo, taskTime);
-    } else if (loggedTask.getTaskType() == Values.REDUCE) {
-      /*
-       * long shuffleTime = (loggedAttempt.getShuffleFinished() -
-       * loggedAttempt.getStartTime()); long sortTime =
-       * (loggedAttempt.getSortFinished() - loggedAttempt.getShuffleFinished());
-       * long reduceTime = (loggedAttempt.getFinishTime() -
-       * loggedAttempt.getSortFinished()); reduceTime *= scaleFactor; return new
-       * ReduceTaskAttemptInfo(convertState(loggedAttempt.getResult()),
-       * taskInfo, shuffleTime, sortTime, reduceTime);
-       */
-      throw new IllegalArgumentException("taskType cannot be REDUCE");
     } else {
-      throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
+      throw new IllegalArgumentException("taskType can only be MAP: "
           + loggedTask.getTaskType());
     }
   }
 
   private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
-    ParsedHost host = getParsedHost(loggedAttempt.getHostName());
-    int distance = 2;
-    for (LoggedLocation location : loggedTask.getPreferredLocations()) {
-      ParsedHost dataNode = new ParsedHost(location);
-      distance = Math.min(distance, host.distance(dataNode));
+    int distance = cluster.getMaximumDistance();
+    String rackHostName = loggedAttempt.getHostName();
+    if (rackHostName == null) return distance;
+    MachineNode mn = getMachineNode(rackHostName);
+    if (mn == null) return distance;
+    List<LoggedLocation> locations = loggedTask.getPreferredLocations();
+    if (locations != null) {
+      for (LoggedLocation location : locations) {
+        List<String> layers = location.getLayers();
+        if ((layers == null) || (layers.isEmpty())) {
+          continue;
+        }
+        String dataNodeName = layers.get(layers.size()-1);
+        MachineNode dataNode = cluster.getMachineByName(dataNodeName);
+        if (dataNode != null) {
+          distance = Math.min(distance, cluster.distance(mn, dataNode));
+        }
+      }
     }
     return distance;
   }
 
-  private ParsedHost getParsedHost(String hostName) {
-    try {
-      return new ParsedHost(hostName);
-    } catch (IllegalArgumentException e) {
-      // look up the host in topology, return a correct hostname
-      for (LoggedNetworkTopology rack : topology.getChildren()) {
-        for (LoggedNetworkTopology host : rack.getChildren()) {
-          if (host.getName().equals(hostName)) {
-            return new ParsedHost(
-                new String[] { rack.getName(), host.getName() });
-          }
-        }
-      }
-      return new ParsedHost(new String[] { "default-rack", hostName });
+  private MachineNode getMachineNode(String rackHostName) {
+    ParsedHost parsedHost = ParsedHost.parse(rackHostName);
+    String hostName = (parsedHost == null) ? rackHostName 
+                                           : parsedHost.getNodeName();
+    if (hostName == null) {
+      return null;
     }
+    return (cluster == null) ? null : cluster.getMachineByName(hostName);
   }
 
-  private TaskAttemptInfo getInfo(LoggedTask loggedTask,
+  private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
       LoggedTaskAttempt loggedAttempt) {
     TaskInfo taskInfo = getTaskInfo(loggedTask);
     State state = convertState(loggedAttempt.getResult());
@@ -451,10 +532,7 @@
       } else {
         taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
       }
-      if (taskTime < 0) {
-        throw new IllegalStateException(loggedAttempt.getAttemptID()
-            + " taskTime<0: " + taskTime);
-      }
+      taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
       return new MapTaskAttemptInfo(state, taskInfo, taskTime);
     } else if (loggedTask.getTaskType() == Values.REDUCE) {
       long startTime = loggedAttempt.getStartTime();
@@ -477,30 +555,72 @@
         long shuffleTime = shuffleDone - startTime;
         long mergeTime = mergeDone - shuffleDone;
         long reduceTime = finishTime - mergeDone;
-        if (reduceTime < 0) {
-          throw new IllegalStateException(loggedAttempt.getAttemptID()
-              + " reduceTime<0: " + reduceTime);
-        }
+        reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());
+        
         return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
             mergeTime, reduceTime);
       }
     } else {
-      throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
+      throw new IllegalArgumentException("taskType for "
+          + loggedTask.getTaskID() + " is neither MAP nor REDUCE: "
           + loggedTask.getTaskType());
     }
   }
 
   private TaskInfo getTaskInfo(LoggedTask loggedTask) {
-    // TODO insert maxMemory
-    TaskInfo taskInfo = new TaskInfo(loggedTask.getInputBytes(),
-        (int) (loggedTask.getInputBytes() / loggedTask.getInputRecords()),
-        loggedTask.getOutputBytes(),
-        (int) (loggedTask.getOutputBytes() / loggedTask.getOutputRecords()), 0);
+    List<LoggedTaskAttempt> attempts = loggedTask.getAttempts();
+
+    long inputBytes = -1;
+    long inputRecords = -1;
+    long outputBytes = -1;
+    long outputRecords = -1;
+    long heapMegabytes = -1;
+
+    JobHistory.Values type = loggedTask.getTaskType();
+    if ((type != JobHistory.Values.MAP) && (type != JobHistory.Values.REDUCE)) {
+      throw new IllegalArgumentException(
+          "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString());
+    }
+
+    for (LoggedTaskAttempt attempt : attempts) {
+      attempt = sanitizeLoggedTaskAttempt(attempt);
+      // ignore bad attempts or unsuccessful attempts.
+      if ((attempt == null)
+          || (attempt.getResult() != JobHistory.Values.SUCCESS)) {
+        continue;
+      }
+
+      if (type == JobHistory.Values.MAP) {
+        inputBytes = attempt.getHdfsBytesRead();
+        inputRecords = attempt.getMapInputRecords();
+        outputBytes = attempt.getMapOutputBytes();
+        outputRecords = attempt.getMapOutputRecords();
+        heapMegabytes = (job.getJobMapMB() > 0) ? job.getJobMapMB() 
+                                                : job.getHeapMegabytes();
+      } else {
+        inputBytes = attempt.getReduceShuffleBytes();
+        inputRecords = attempt.getReduceInputRecords();
+        outputBytes = attempt.getHdfsBytesWritten();
+        outputRecords = attempt.getReduceOutputRecords();
+        heapMegabytes = (job.getJobReduceMB() > 0) ? job.getJobReduceMB() 
+                                                   : job.getHeapMegabytes();
+      }
+      break;
+    }
+
+    TaskInfo taskInfo = new TaskInfo(inputBytes, (int) inputRecords,
+        outputBytes, (int) outputRecords, (int) heapMegabytes);
     return taskInfo;
   }
 
-  private TaskAttemptInfo makeUpInfo(TaskType taskType, TaskInfo taskInfo,
-      int taskAttemptNumber, int locality) {
+  private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
+      int taskAttemptNumber) {
+    return new TaskAttemptID(new TaskID(JobID.forName(job.getJobID()),
+        taskType, taskNumber), taskAttemptNumber);
+  }
+  
+  private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
+      int taskAttemptNumber, int taskNumber, int locality) {
     if (taskType == TaskType.MAP) {
       State state = State.SUCCEEDED;
       long runtime = 0;
@@ -508,7 +628,8 @@
       // make up state
       state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
       runtime = makeUpMapRuntime(state, locality);
-
+      runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
+          taskNumber, taskAttemptNumber).toString());
       TaskAttemptInfo tai = new MapTaskAttemptInfo(state, taskInfo, runtime);
       return tai;
     } else if (taskType == TaskType.REDUCE) {
@@ -538,7 +659,6 @@
       }
     }
     return 0;
-    // throw new IllegalStateException(" made-up reduceTime<0: " + reduceTime);
   }
 
   private long doMakeUpReduceRuntime(State state) {
@@ -565,47 +685,46 @@
     long runtime;
     // make up runtime
     if (state == State.SUCCEEDED) {
-      // XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
-      // the last group is "distance cannot be determined". All pig jobs
-      // would have only the 4th group, and pig tasks usually do not have
-      // any locality, so this group should count as "distance=2".
-      // However, setup/cleanup tasks are also counted in the 4th group.
-      // These tasks do not make sense.
-      try {
-        runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs().get(locality));
-      } catch (IllegalArgumentException e) {
-        if (e.getMessage() == "no value to use to make up runtime") {
-          runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs());
-        } else {
-          throw e;
-        }
+      /**
+       * MapCDFs is an ArrayList of 4 possible groups: distance=0, 1, 2, and
+       * the last group is "distance cannot be determined". All pig jobs would
+       * have only the 4th group, and pig tasks usually do not have any
+       * locality, so this group should count as "distance=2". However,
+       * setup/cleanup tasks are also counted in the 4th group. These tasks do
+       * not make sense.
+       */
+      runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs().get(locality));
+      if (runtime < 0) {
+        runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs());
       }
     } else if (state == State.FAILED) {
-      try {
-        runtime = makeUpRuntime(job.getFailedMapAttemptCDFs().get(locality));
-      } catch (IllegalArgumentException e) {
-        if (e.getMessage() == "no value to use to make up runtime") {
-          runtime = makeUpRuntime(job.getFailedMapAttemptCDFs());
-        } else {
-          throw e;
-        }
-      }
+      runtime = makeUpRuntime(job.getFailedMapAttemptCDFs().get(locality));
+      if (runtime < 0) {
+        runtime = makeUpRuntime(job.getFailedMapAttemptCDFs());
+      }  
     } else {
       throw new IllegalArgumentException(
           "state is neither SUCCEEDED nor FAILED: " + state);
     }
+    
     return runtime;
   }
 
+  /**
+   * Perform a weighted random selection on a list of CDFs, and produce a random
+   * variable using the selected CDF.
+   * 
+   * @param mapAttemptCDFs
+   *          A list of CDFs for the distribution of runtime for the 1st, 2nd,
+   *          ... map attempts for the job.
+   */
   private long makeUpRuntime(ArrayList<LoggedDiscreteCDF> mapAttemptCDFs) {
     int total = 0;
     for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
       total += cdf.getNumberValues();
     }
     if (total == 0) {
-      // consider no value to use to make up runtime.
-      return 0;
-      // throw new IllegalStateException("No task attempt statistics");
+      return -1;
     }
     int index = random.nextInt(total);
     for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
@@ -621,11 +740,12 @@
     throw new IllegalStateException("not possible to get here");
   }
 
+  // Return -1 if we fail to make up a runtime with the available info.
   private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
     ArrayList<LoggedSingleRelativeRanking> rankings = new ArrayList<LoggedSingleRelativeRanking>(
         loggedDiscreteCDF.getRankings());
     if (loggedDiscreteCDF.getNumberValues() == 0) {
-      throw new IllegalArgumentException("no value to use to make up runtime");
+      return -1;
     }
 
     LoggedSingleRelativeRanking ranking = new LoggedSingleRelativeRanking();
@@ -675,19 +795,16 @@
   }
 
   private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) {
-    if (loggedTaskMap.isEmpty()) {
-      buildMaps();
-    }
+    buildMaps();
     return loggedTaskMap.get(getMaskedTaskID(taskType, taskNumber));
   }
 
   private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
       int taskNumber, int taskAttemptNumber) {
+    buildMaps();
     TaskAttemptID id = new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
         taskAttemptNumber);
-    if (loggedTaskAttemptMap.isEmpty()) {
-      buildMaps();
-    }
     return loggedTaskAttemptMap.get(id);
   }
+
 }
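
A sketch of consuming the reworked ZombieJob API follows. It is not part of this commit; loggedJob stands for a LoggedJob read from a trace, cluster for a ClusterStory such as ZombieCluster (it may be null), and the seed is arbitrary.

    // Sketch only: query a single dead job.
    static void zombieJobSketch(LoggedJob loggedJob, ClusterStory cluster) {
      ZombieJob zombie = new ZombieJob(loggedJob, cluster, 42L); // fixed seed for repeatability
      JobConf jobConf = zombie.getJobConf();         // lazily built and cached
      InputSplit[] splits = zombie.getInputSplits(); // made up for maps missing from the trace
      System.out.println(zombie.getName() + " by " + zombie.getUser() + ": "
          + zombie.getNumberMaps() + " maps, " + zombie.getNumberReduces() + " reduces");
    }
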

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Produces {@link JobStory}s from a job trace.
+ */
+public class ZombieJobProducer implements JobStoryProducer {
+  private final JobTraceReader reader;
+  private final ZombieCluster cluster;
+
+  private ZombieJobProducer(JobTraceReader reader, ZombieCluster cluster) {
+    this.reader = reader;
+    this.cluster = cluster;
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param path
+   *          Path to the JSON trace file, possibly compressed.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @param conf The configuration.
+   * @throws IOException
+   */
+  public ZombieJobProducer(Path path, ZombieCluster cluster, Configuration conf)
+      throws IOException {
+    this(new JobTraceReader(path, conf), cluster);
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param input
+   *          The input stream for the JSON trace.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @throws IOException
+   */
+  public ZombieJobProducer(InputStream input, ZombieCluster cluster)
+      throws IOException {
+    this(new JobTraceReader(input), cluster);
+  }
+
+  @Override
+  public JobStory getNextJob() throws IOException {
+    LoggedJob job = reader.getNext();
+    return (job == null) ? null : new ZombieJob(job, cluster);
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}
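
A sketch of replaying every job in a trace through the new producer follows. It is not part of this commit; the file names are hypothetical, and the cluster argument may be null when no topology is available. The Configuration, Path, and IOException imports are assumed.

    // Sketch only: iterate all JobStory objects in a trace.
    static void replayTraceSketch() throws IOException {
      Configuration conf = new Configuration();
      ZombieCluster cluster =
          new ZombieCluster(new Path("cluster-topology.json"), null, conf);
      ZombieJobProducer producer =
          new ZombieJobProducer(new Path("job-trace.json"), cluster, conf);
      try {
        for (JobStory job = producer.getNextJob(); job != null;
             job = producer.getNextJob()) {
          System.out.println(job.getName() + ": " + job.getNumberMaps() + " maps");
        }
      } finally {
        producer.close();
      }
    }
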


